diff options
Diffstat (limited to 'search-data-service-app/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java')
-rw-r--r-- | search-data-service-app/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java | 1410 |
1 files changed, 1410 insertions, 0 deletions
diff --git a/search-data-service-app/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java b/search-data-service-app/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java new file mode 100644 index 0000000..a4af160 --- /dev/null +++ b/search-data-service-app/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java @@ -0,0 +1,1410 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao; + +import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; + +import com.fasterxml.jackson.annotation.JsonInclude.Include; +import com.fasterxml.jackson.core.JsonParseException; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.ProtocolException; +import java.net.URL; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response.Status; +import javax.ws.rs.core.UriBuilder; +import org.eclipse.jetty.http.HttpStatus; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; +import org.onap.aai.cl.api.LogFields; +import org.onap.aai.cl.api.LogLine; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.cl.mdc.MdcContext; +import org.onap.aai.cl.mdc.MdcOverride; +import org.onap.aai.sa.rest.AnalysisConfiguration; +import org.onap.aai.sa.rest.ApiUtils; +import org.onap.aai.sa.rest.BulkRequest; +import org.onap.aai.sa.rest.BulkRequest.OperationType; +import org.onap.aai.sa.rest.DocumentSchema; +import org.onap.aai.sa.rest.SettingConfiguration; +import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig; +import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException; +import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException.ErrorMessage; +import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult; +import org.onap.aai.sa.searchdbabstraction.entity.AggregationResults; +import org.onap.aai.sa.searchdbabstraction.entity.Document; +import org.onap.aai.sa.searchdbabstraction.entity.DocumentOperationResult; +import org.onap.aai.sa.searchdbabstraction.entity.ErrorResult; +import org.onap.aai.sa.searchdbabstraction.entity.OperationResult; +import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder; +import org.onap.aai.sa.searchdbabstraction.entity.OperationResultBuilder.Type; +import org.onap.aai.sa.searchdbabstraction.entity.SearchHit; +import org.onap.aai.sa.searchdbabstraction.entity.SearchHits; +import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult; +import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit; +import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits; +import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs; +import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil; +import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil; +import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator; +import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants; + +/** + * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface. + */ +public class ElasticSearchHttpController implements DocumentStoreInterface { + + private static ElasticSearchHttpController instance = null; + + private static final Logger logger = + LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName()); + private static final Logger metricsLogger = + LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName()); + + private static final String URL_QUERY_VERSION = "version="; + + private static final String JSON_ATTR_VERSION = "_version"; + private static final String JSON_ATTR_ERROR = "error"; + private static final String JSON_ATTR_REASON = "reason"; + + private static final String DEFAULT_TYPE = "default"; + + private static final String MSG_RESOURCE_MISSING = "Specified resource does not exist: "; + private static final String MSG_RESPONSE_CODE = "Response Code : "; + private static final String MSG_INVALID_DOCUMENT_URL = "Invalid document URL: "; + + private static final String FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE = "Failed to parse Elastic Search response."; + + private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE = + "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n"; + private static final String BULK_CREATE_WITH_INDEX_TEMPLATE = + "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n"; + private static final String BULK_IMPORT_INDEX_TEMPLATE = + "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n"; + private static final String BULK_DELETE_TEMPLATE = + "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n"; + public final static String APPLICATION_XND_JSON_TYPE = new MediaType("application", "x-ndjson").toString(); + + private final ElasticSearchConfig config; + + protected AnalysisConfiguration analysisConfig; + protected SettingConfiguration settingConfig; + + public ElasticSearchHttpController(ElasticSearchConfig config) { + this.config = config; + analysisConfig = new AnalysisConfiguration(); + settingConfig = new SettingConfiguration(); + + String rootUrl = null; + try { + if ("https".equals(config.getUriScheme())) { + new ElasticSearchHttpsController(config); + } + rootUrl = buildUrl(createUriBuilder("")).toString(); + logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, rootUrl); + checkConnection(); + logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, rootUrl); + } catch (Exception e) { + logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, rootUrl, e.getMessage()); + } + } + + public static ElasticSearchHttpController getInstance() { + synchronized (ElasticSearchHttpController.class) { + if (instance == null) { + Properties properties = new Properties(); + File file = new File(SearchDbConstants.ES_CONFIG_FILE); + try { + properties.load(new FileInputStream(file)); + } catch (Exception e) { + logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance", + e.getLocalizedMessage()); + } + + ElasticSearchConfig config = new ElasticSearchConfig(properties); + instance = new ElasticSearchHttpController(config); + } + } + + return instance; + } + + public AnalysisConfiguration getAnalysisConfig() { + return analysisConfig; + } + + public ElasticSearchConfig getElasticSearchConfig() { + return config; + } + + @Override + public OperationResult createIndex(String index, DocumentSchema documentSchema) { + try { + // Submit the request to ElasticSearch to create the index using a default document type. + OperationResult result = createTable(index, DEFAULT_TYPE, analysisConfig, + DocumentSchemaUtil.generateDocumentMappings(documentSchema), settingConfig); + + // ElasticSearch will return us a 200 code on success when we + // want to report a 201, so translate the result here. + if (result.getResultCode() == Status.OK.getStatusCode()) { + result.setResultCode(Status.CREATED.getStatusCode()); + } + + if (isSuccess(result)) { + result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}"); + } + return result; + } catch (DocumentStoreOperationException | IOException e) { + return new OperationResultBuilder().useDefaults() + .failureCause("Document store operation failure. Cause: " + e.getMessage()).build(); + } + } + + @Override + public OperationResult createDynamicIndex(String index, String dynamicSchema) { + try { + OperationResult result = createTable(index, dynamicSchema); + + // ElasticSearch will return us a 200 code on success when we + // want to report a 201, so translate the result here. + if (result.getResultCode() == Status.OK.getStatusCode()) { + result.setResultCode(Status.CREATED.getStatusCode()); + } + if (isSuccess(result)) { + result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}"); + } + return result; + } catch (DocumentStoreOperationException e) { + return new OperationResultBuilder().useDefaults() + .failureCause("Document store operation failure. Cause: " + e.getMessage()).build(); + } + } + + @Override + public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException { + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.DELETE); + OperationResult opResult = handleResponse(conn); + logMetricsInfo(override, SearchDbMsgs.DELETE_INDEX_TIME, opResult, indexName); + shutdownConnection(conn); + + return opResult; + } + + // @Override + protected OperationResult createTable(String indexName, String typeName, AnalysisConfiguration ac, + String indexMappings, SettingConfiguration sc) throws DocumentStoreOperationException { + if (ac.getEsIndexSettings() == null) { + logger.debug("No analysis settings provided."); + } + + if (indexMappings == null) { + logger.debug("No mappings provided."); + } + + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT); + + StringBuilder sb = new StringBuilder(128); + sb.append("{ \"settings\" : "); + sb.append(sc.getSettingsWithAnalysis(ac)); + sb.append(","); + + sb.append("\"mappings\" : {"); + sb.append("\"" + typeName + "\" :"); + sb.append(indexMappings); + sb.append("}}"); + + try { + attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString())); + } catch (IOException e) { + logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e); + throw new DocumentStoreOperationException(e.getMessage(), e); + } + + logger.debug("Request content: " + sb); + + OperationResult opResult = handleResponse(conn); + shutdownConnection(conn); + logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, opResult, indexName); + + return opResult; + } + + /** + * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the + * index. + * + * @param indexName - The name of the index to be created + * @param settingsAndMappings - The actual JSON object that will define the index + * @return - The operation result of writing into Elasticsearch + * @throws DocumentStoreOperationException + */ + protected OperationResult createTable(String indexName, String settingsAndMappings) + throws DocumentStoreOperationException { + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.PUT); + try { + attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings)); + } catch (IOException e) { + logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e); + throw new DocumentStoreOperationException(e.getMessage()); + } + + OperationResult result = handleResponse(conn); + logMetricsInfo(override, SearchDbMsgs.CREATE_INDEX_TIME, result, indexName); + + return result; + } + + @Override + public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document, + boolean allowImplicitIndexCreation) throws DocumentStoreOperationException { + + if (!allowImplicitIndexCreation) { + // Before we do anything, make sure that the specified index actually exists in the + // document store - we don't want to rely on ElasticSearch to fail the document + // create because it could be configured to implicitly create a non-existent index, + // which can lead to hard-to-debug behaviour with queries down the road. + OperationResult indexExistsResult = checkIndexExistence(indexName); + if (!isSuccess(indexExistsResult)) { + String resultMsg = "Document Index '" + indexName + "' does not exist."; + return (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT).status(Status.NOT_FOUND) + .result(resultMsg).failureCause(resultMsg).build(); + } + } + + if (document.getId() == null || document.getId().isEmpty()) { + return createDocumentWithoutId(indexName, document); + } else { + return createDocumentWithId(indexName, document); + } + } + + @Override + public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document, + boolean allowImplicitIndexCreation) throws DocumentStoreOperationException { + if (!allowImplicitIndexCreation) { + // Before we do anything, make sure that the specified index actually exists in the + // document store - we don't want to rely on ElasticSearch to fail the document + // create because it could be configured to implicitly create a non-existent index, + // which can lead to hard-to-debug behaviour with queries down the road. + OperationResult indexExistsResult = checkIndexExistence(indexName); + if (!isSuccess(indexExistsResult)) { + DocumentOperationResult opResult = new DocumentOperationResult(); + opResult.setResultCode(Status.NOT_FOUND.getStatusCode()); + String resultMsg = "Document Index '" + indexName + "' does not exist."; + opResult.setResult(resultMsg); + opResult.setFailureCause(resultMsg); + return opResult; + } + } + + MdcOverride override = getStartTime(new MdcOverride()); + + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()) + .replaceQuery(URL_QUERY_VERSION + document.getVersion())); + + HttpURLConnection conn = createConnection(url, HttpMethod.PUT); + attachDocument(conn, document); + + DocumentOperationResult opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.UPDATE_DOCUMENT_TIME, opResult, indexName, document.getId()); + + shutdownConnection(conn); + + return opResult; + } + + @Override + public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId()) + .replaceQuery(URL_QUERY_VERSION + document.getVersion())); + + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(url, HttpMethod.DELETE); + + DocumentOperationResult opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); + // supress the etag and url in response for delete as they are not required + if (opResult.getDocument() != null) { + opResult.getDocument().setEtag(null); + opResult.getDocument().setUrl(null); + } + + logMetricsInfo(override, SearchDbMsgs.DELETE_DOCUMENT_TIME, opResult, indexName, document.getId()); + + shutdownConnection(conn); + + return opResult; + } + + @Override + public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + final UriBuilder uriBuilder = createUriBuilder(indexName, DEFAULT_TYPE, document.getId()); + if (document.getVersion() != null) { + uriBuilder.replaceQuery(URL_QUERY_VERSION + document.getVersion()); + } + + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(buildUrl(uriBuilder), "GET"); + + DocumentOperationResult opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, document.getId()); + + shutdownConnection(conn); + + return opResult; + } + + @Override + public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException { + final URL url = buildUrl(createUriBuilder(indexName, "_search").replaceQuery(queryString)); + + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(url, "GET"); + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSearchResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, queryString); + + return opResult; + } + + @Override + public SearchOperationResult searchWithPayload(String indexName, String query) + throws DocumentStoreOperationException { + if (logger.isDebugEnabled()) { + logger.debug("Querying index: " + indexName + " with query string: " + query); + } + final URL url = buildUrl(createUriBuilder(indexName, "_search")); + + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(url, HttpMethod.POST); + attachContent(conn, query); + logger.debug("Request body = Elasticsearch query = " + query); + + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSearchResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query); + + shutdownConnection(conn); + + return opResult; + } + + @Override + public SearchOperationResult suggestionQueryWithPayload(String indexName, String query) + throws DocumentStoreOperationException { + if (logger.isDebugEnabled()) { + logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query); + } + + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName, "_suggest")), HttpMethod.POST); + attachContent(conn, query); + + logger.debug("Request body = Elasticsearch query = " + query); + + SearchOperationResult opResult = getSearchOperationResult(conn); + buildSuggestResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.QUERY_DOCUMENT_TIME, opResult, indexName, query); + + shutdownConnection(conn); + + return opResult; + } + + @Override + public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException { + if (logger.isDebugEnabled()) { + StringBuilder dbgString = new StringBuilder("ESController: performBulkOperations - Operations: "); + + for (BulkRequest request : requests) { + dbgString.append("[").append(request).append("] "); + } + + logger.debug(dbgString.toString()); + } + + MdcOverride override = getStartTime(new MdcOverride()); + + // Parse the supplied set of operations. + // Iterate over the list of operations which we were provided and + // translate them into a format that ElasticSearh understands. + int opCount = 0; + StringBuilder esOperationSet = new StringBuilder(128); + List<ElasticSearchResultItem> rejected = new ArrayList<>(); + for (BulkRequest request : requests) { + + // Convert the request to the syntax ElasticSearch likes. + if (buildEsOperation(request, esOperationSet, rejected)) { + opCount++; + } + } + + ElasticSearchBulkOperationResult opResult = null; + if (opCount > 0) { + HttpURLConnection conn; + try { + conn = (HttpURLConnection) buildUrl(createUriBuilder("_bulk")).openConnection(); + conn.setRequestMethod(HttpMethod.PUT); + conn.setDoOutput(true); + conn.setRequestProperty(CONTENT_TYPE, APPLICATION_XND_JSON_TYPE); + if(config.useAuth()){ + conn.setRequestProperty("Authorization", config.getAuthValue()); + } + conn.setRequestProperty("Connection", "Close"); + + } catch (IOException e) { + + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug(Throwables.getStackTraceAsString(e)); + } + + throw new DocumentStoreOperationException( + "Failed to open connection to document store. Cause: " + e.getMessage(), e); + } + + StringBuilder bulkResult = new StringBuilder(128); + try { + // Create an output stream to write our request to. + OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream()); + + if (logger.isDebugEnabled()) { + logger.debug("ESController: Sending 'BULK' request to " + conn.getURL()); + logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n")); + } + + // Write the resulting request string to our output stream. (this sends the request to ES?) + out.write(esOperationSet.toString()); + out.close(); + + // Open an input stream on our connection in order to read back the results. + InputStream is = conn.getInputStream(); + InputStreamReader inputstreamreader = new InputStreamReader(is); + BufferedReader bufferedreader = new BufferedReader(inputstreamreader); + + // Read the contents of the input stream into our result string... + String esResponseString = null; + + while ((esResponseString = bufferedreader.readLine()) != null) { + bulkResult.append(esResponseString).append("\n"); + } + + } catch (IOException e) { + + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + logger.debug(sw.toString()); + } + + throw new DocumentStoreOperationException( + "Failure interacting with document store. Cause: " + e.getMessage(), e); + } + + if (logger.isDebugEnabled()) { + logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString()); + } + + // ...and marshal the resulting string into a Java object. + try { + opResult = marshallEsBulkResult(bulkResult.toString()); + + } catch (IOException e) { + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug(Throwables.getStackTraceAsString(e)); + } + + throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(), + e); + } + } + + OperationResult result = new OperationResultBuilder() // + .resultCode(HttpStatus.MULTI_STATUS_207) // + .result(buildGenericBulkResultSet(opResult, rejected)) // + .build(); + + // In the success case we don't want the entire result string to be dumped into the metrics log, so concatenate + // it. + String resultStringForMetricsLog = result.getResult(); + if (isSuccess(result)) { + resultStringForMetricsLog = + resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "..."; + } + + metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME, + new LogFields() // + .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog), + override); + + return result; + } + + + /** + * This method queryies ElasticSearch to determine if the supplied index is present in the document store. + * + * @param indexName - The index to look for. + * @return - An operation result indicating the success or failure of the check. + * @throws DocumentStoreOperationException + */ + private OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException { + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder(indexName)), HttpMethod.HEAD); + int resultCode; + try { + resultCode = conn.getResponseCode(); + } catch (IOException ex) { + shutdownConnection(conn); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); + } + logger.debug(MSG_RESPONSE_CODE + resultCode); + + OperationResult opResult = new OperationResultBuilder().useDefaults().resultCode(resultCode).build(); + logMetricsInfo(override, SearchDbMsgs.CHECK_INDEX_TIME, opResult, indexName); + shutdownConnection(conn); + + return opResult; + } + + private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + // check if the document already exists + DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId()); + + if (opResult.getResultCode() != Status.NOT_FOUND.getStatusCode()) { + if (opResult.getResultCode() == Status.CONFLICT.getStatusCode()) { + opResult.setFailureCause("A document with the same id already exists."); + } else { + opResult.setFailureCause("Failed to verify a document with the specified id does not already exist."); + } + opResult.setResultCode(Status.CONFLICT.getStatusCode()); + return opResult; + } + + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, document.getId())); + MdcOverride override = getStartTime(new MdcOverride()); + + HttpURLConnection conn = createConnection(url, HttpMethod.PUT); + attachDocument(conn, document); + + opResult = getOperationResult(conn); + buildDocumentResult(opResult, indexName); + + logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, opResult, indexName); + + shutdownConnection(conn); + + return opResult; + } + + private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + final URL url = buildUrl(createUriBuilder(indexName, DEFAULT_TYPE)); + + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = createConnection(url, HttpMethod.POST); + attachDocument(conn, document); + + DocumentOperationResult response = getOperationResult(conn); + buildDocumentResult(response, indexName); + + logMetricsInfo(override, SearchDbMsgs.CREATE_DOCUMENT_TIME, response, indexName); + + shutdownConnection(conn); + + return response; + } + + private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc) + throws DocumentStoreOperationException { + conn.setRequestProperty("Connection", "Close"); + attachContent(conn, doc.getContentInJson()); + } + + private DocumentOperationResult checkDocumentExistence(String indexName, String docId) + throws DocumentStoreOperationException { + MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = + createConnection(buildUrl(createUriBuilder(indexName, DEFAULT_TYPE, docId)), HttpMethod.HEAD); + int resultCode; + try { + resultCode = conn.getResponseCode(); + } catch (IOException ex) { + shutdownConnection(conn); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); + } + + logger.debug(MSG_RESPONSE_CODE + resultCode); + + DocumentOperationResult opResult = (DocumentOperationResult) new OperationResultBuilder(Type.DOCUMENT) + .useDefaults().resultCode(resultCode).build(); + + logMetricsInfo(override, SearchDbMsgs.GET_DOCUMENT_TIME, opResult, indexName, docId); + shutdownConnection(conn); + + return opResult; + } + + private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException { + OutputStream outputStream = null; + OutputStreamWriter out = null; + + try { + outputStream = conn.getOutputStream(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to get connection output stream.", e); + } + + out = new OutputStreamWriter(outputStream); + + try { + out.write(content); + out.close(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to write to the output stream.", e); + } + } + + private HttpURLConnection initializeConnection(URL url) throws DocumentStoreOperationException { + HttpURLConnection conn = null; + try { + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestProperty(CONTENT_TYPE, APPLICATION_JSON); + conn.setDoOutput(true); + if (config.useAuth()) { + conn.setRequestProperty("Authorization", config.getAuthValue()); + } + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e); + } + + return conn; + } + + private OperationResult handleResponse(HttpURLConnection conn) throws DocumentStoreOperationException { + return handleResponse(conn, new OperationResultBuilder().useDefaults()); + } + + private OperationResult handleResponse(HttpURLConnection conn, OperationResultBuilder rb) + throws DocumentStoreOperationException { + int resultCode; + + try { + resultCode = conn.getResponseCode(); + } catch (IOException ex) { + shutdownConnection(conn); + throw new DocumentStoreOperationException(ErrorMessage.NO_RESPONSE_CODE, ex); + } + + logger.debug(MSG_RESPONSE_CODE + resultCode); + + InputStream inputStream = null; + + if (!ApiUtils.isSuccessStatusCode(resultCode)) { + inputStream = conn.getErrorStream(); + } else { + try { + inputStream = conn.getInputStream(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to get the response input stream.", e); + } + } + + InputStreamReader inputstreamreader = new InputStreamReader(inputStream); + BufferedReader bufferedreader = new BufferedReader(inputstreamreader); + + StringBuilder result = new StringBuilder(128); + String string = null; + + try { + while ((string = bufferedreader.readLine()) != null) { + result.append(string).append("\n"); + } + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed getting the response body payload.", e); + } + + if (resultCode == Status.CONFLICT.getStatusCode()) { + rb.resultCode(Status.PRECONDITION_FAILED.getStatusCode()); + } else { + rb.resultCode(resultCode); + } + if (logger.isDebugEnabled()) { + logger.debug("Raw result string from ElasticSearch = " + result.toString()); + } + rb.result(result.toString()); + rb.resultVersion(extractVersion(result.toString())); + return rb.build(); + } + + private String extractVersion(String result) { + JSONParser parser = new JSONParser(); + String version = null; + try { + JSONObject root = (JSONObject) parser.parse(result); + if (root.get(JSON_ATTR_VERSION) != null) { + version = root.get(JSON_ATTR_VERSION).toString(); + } + } catch (ParseException e) { + // Not all responses from ElasticSearch include a version, so + // if we don't get one back, just return an empty string rather + // than trigger a false failure. + version = ""; + } + return version; + } + + /** + * This convenience method gets the current system time and stores it in an attribute in the supplied + * {@link MdcOverride} object so that it can be used later by the metrics logger. + * + * @param override - The {@link MdcOverride} object to update. + * @return - The supplied {@link MdcOverride} object. + */ + private MdcOverride getStartTime(MdcOverride override) { + + // Grab the current time... + long startTimeInMs = System.currentTimeMillis(); + + // ...and add it as an attribute to the supplied MDC Override object. + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); + + // Return the MdcOverride object that we were passed. + // This looks odd, but it allows us to do stuff like: + // + // MdcOverride ov = getStartTime(new MdcOverride()) + // + // which is quite handy, but also allows us to pass in an existing + // MdcOverride object which already has some attributes set. + return override; + } + + private boolean isSuccess(OperationResult result) { + return ApiUtils.isSuccessStatusCode(result.getResultCode()); + } + + private UriBuilder createUriBuilder(String path, String... paths) { + UriBuilder builder = UriBuilder.fromPath(path); + for (String other : paths) { + builder.path(other); + } + builder.host(config.getIpAddress()); + String port = Optional.ofNullable(config.getHttpPort()).orElse("0"); + builder.port(Integer.valueOf(port)); + builder.scheme(config.getUriScheme()); + return builder; + } + + private URL buildUrl(UriBuilder builder) throws DocumentStoreOperationException { + try { + return builder.build().toURL(); + } catch (MalformedURLException e) { + logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "buildUrl", e.getLocalizedMessage()); + throw new DocumentStoreOperationException("Error building a URL with " + builder.toString(), e); + } + } + + private HttpURLConnection createConnection(final URL url, final String method) + throws DocumentStoreOperationException { + HttpURLConnection conn = initializeConnection(url); + try { + logger.debug("\nSending '" + method + "' request to URL : " + conn.getURL()); + conn.setRequestMethod(method); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException(ErrorMessage.SET_REQUEST_METHOD_FAILED, e, method); + } + return conn; + } + + private OperationResult checkConnection() throws IOException, DocumentStoreOperationException { + HttpURLConnection conn = createConnection(buildUrl(createUriBuilder("_cluster/health")), HttpMethod.GET); + int resultCode = conn.getResponseCode(); + logger.debug("getClusterHealth() response Code : " + resultCode); + shutdownConnection(conn); + return new OperationResultBuilder().resultCode(resultCode).build(); + } + + private void shutdownConnection(HttpURLConnection connection) { + if (connection == null) { + return; + } + + final String methodName = "shutdownConnection"; + InputStream inputstream = null; + OutputStream outputstream = null; + + try { + inputstream = connection.getInputStream(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); + } finally { + if (inputstream != null) { + try { + inputstream.close(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); + } + } + } + + try { + outputstream = connection.getOutputStream(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); + } finally { + if (outputstream != null) { + try { + outputstream.close(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, methodName, e.getLocalizedMessage()); + } + } + } + + connection.disconnect(); + } + + /** + * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch. + * + * @param request - The request to be performed. + * @param sb - The string builder to append the json data to + * @throws DocumentStoreOperationException + */ + private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails) + throws DocumentStoreOperationException { + + boolean retVal = true; + // What kind of operation are we performing? + switch (request.getOperationType()) { + + // Create a new document. + case CREATE: + + // Make sure that we were supplied a document payload. + if (request.getOperation().getDocument() == null) { + + fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload", + request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Make sure that the supplied document URL is formatted + // correctly. + if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) { + fails.add(generateRejectionEntry(request.getOperationType(), + MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(), + request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the specified index actually exists before we + // try to perform the create. + if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) { + + fails.add(generateRejectionEntry(request.getOperationType(), + MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(), + request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // If we were supplied an id for the new document, then + // include it in the bulk operation to Elastic Search + if (request.getId() == null) { + + sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE)); + + // Otherwise, we just leave that parameter off and ElasticSearch + // will generate one for us. + } else { + sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, + request.getId())); + } + + try { + // Append the document that we want to create. + sb.append(request.getOperation().getDocument().toJson()).append("\n"); + } catch (JsonProcessingException e) { + throw new DocumentStoreOperationException("Failure parsing document to json", e); + } + + break; + + // Update an existing document. + case UPDATE: + + // Make sure that we were supplied a document payload. + if (request.getOperation().getDocument() == null) { + + fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload", + request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Make sure that the supplied document URL is formatted + // correctly. + if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) { + fails.add(generateRejectionEntry(request.getOperationType(), + MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(), + request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the specified index actually exists before we + // try to perform the update. + if (!indexExists(request.getIndex())) { + + fails.add(generateRejectionEntry(request.getOperationType(), + MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(), + request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the document we are trying to update actually + // exists before we try to perform the update. + if (!documentExists(request.getIndex(), request.getId())) { + + fails.add(generateRejectionEntry(request.getOperationType(), + MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(), + request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // It is mandatory that a version be supplied for an update operation, + // so validate that now. + if (request.getOperation().getMetaData().getEtag() == null) { + + fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field", + request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Generate the update request... + sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(), + request.getOperation().getMetaData().getEtag())); + + // ...and append the document that we want to update. + try { + sb.append(request.getOperation().getDocument().toJson()).append("\n"); + } catch (JsonProcessingException e) { + throw new DocumentStoreOperationException("Failure parsing document to json", e); + } + break; + + // Delete an existing document. + case DELETE: + + // Make sure that the supplied document URL is formatted + // correctly. + if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) { + fails.add(generateRejectionEntry(request.getOperationType(), + MSG_INVALID_DOCUMENT_URL + request.getOperation().getMetaData().getUrl(), + request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the specified index actually exists before we + // try to perform the delete. + if (!indexExists(request.getIndex())) { + + fails.add(generateRejectionEntry(request.getOperationType(), + MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(), + request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the document we are trying to update actually + // exists before we try to perform the delete. + if (!documentExists(request.getIndex(), request.getId())) { + + fails.add(generateRejectionEntry(request.getOperationType(), + MSG_RESOURCE_MISSING + request.getOperation().getMetaData().getUrl(), request.getIndex(), + request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // It is mandatory that a version be supplied for a delete operation, + // so validate that now. + if (request.getOperation().getMetaData().getEtag() == null) { + + fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field", + request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Generate the delete request. + sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(), + request.getOperation().getMetaData().getEtag())); + break; + default: + } + + return retVal; + } + + private boolean indexExists(String index) throws DocumentStoreOperationException { + return isSuccess(checkIndexExistence(index)); + } + + private boolean documentExists(String index, String id) throws DocumentStoreOperationException { + return isSuccess(checkDocumentExistence(index, id)); + } + + /** + * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the + * document store. + * + * @param rejectReason - A message describing why the operation was rejected. + * @param anId - The identifier associated with the document being acted on. + * @param statusCode - An HTTP status code. + * @return - A result set item. + */ + private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index, + String anId, int statusCode, String originalUrl) { + + ElasticSearchError err = new ElasticSearchError(); + err.setReason(rejectReason); + + ElasticSearchOperationStatus op = new ElasticSearchOperationStatus(); + op.setIndex(index); + op.setId(anId); + op.setStatus(statusCode); + op.setError(err); + op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl); + + ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem(); + + switch (opType) { + case CREATE: + rejectionResult.setCreate(op); + break; + case UPDATE: + rejectionResult.setIndex(op); + break; + case DELETE: + rejectionResult.setDelete(op); + break; + default: + } + + return rejectionResult; + } + + /** + * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and + * marshals it into a Java object. + * + * @param jsonResult - The bulk operations response returned from ElasticSearch. + * @return - The marshalled response. + * @throws JsonParseException + * @throws JsonMappingException + * @throws IOException + */ + private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException { + if (jsonResult != null) { + if (logger.isDebugEnabled()) { + logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", "")); + } + + ObjectMapper mapper = new ObjectMapper(); + mapper.setSerializationInclusion(Include.NON_EMPTY); + + return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class); + } + + return null; + } + + /** + * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload. + * + * @param esResult - ElasticSearch bulk operations response. + * @return - A generic result set. + */ + private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult, + List<ElasticSearchResultItem> rejectedOps) { + int totalOps = 0; + int totalSuccess = 0; + int totalFails = 0; + + if (logger.isDebugEnabled()) { + + logger.debug("ESController: Build generic result set. ES Results: " + + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString()); + } + + // Build a combined list of result items from the results returned + // from ElasticSearch and the list of operations that we rejected + // without sending to ElasticSearch. + List<ElasticSearchResultItem> combinedResults = new ArrayList<>(); + if (esResult != null) { + combinedResults.addAll(Arrays.asList(esResult.getItems())); + } + combinedResults.addAll(rejectedOps); + + // Iterate over the individual results in the resulting result set. + StringBuilder resultsBuilder = new StringBuilder(); + AtomicBoolean firstItem = new AtomicBoolean(true); + for (ElasticSearchResultItem item : combinedResults) { + + // Increment the operation counts. + totalOps++; + if (ApiUtils.isSuccessStatusCode(item.operationStatus().getStatus())) { + totalSuccess++; + } else { + totalFails++; + } + + // Prepend a comma to our response string unless this it the + // first result in the set. + if (!firstItem.compareAndSet(true, false)) { + resultsBuilder.append(", "); + } + + // Append the current result as a generic json structure. + resultsBuilder.append(item.toJson()); + } + + return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", " + + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}"; + } + + private DocumentOperationResult getOperationResult(HttpURLConnection conn) throws DocumentStoreOperationException { + return (DocumentOperationResult) handleResponse(conn, new OperationResultBuilder(Type.DOCUMENT).useDefaults()); + } + + private SearchOperationResult getSearchOperationResult(HttpURLConnection conn) + throws DocumentStoreOperationException { + return (SearchOperationResult) handleResponse(conn, new OperationResultBuilder(Type.SEARCH).useDefaults()); + } + + private void buildDocumentResult(DocumentOperationResult result, String index) + throws DocumentStoreOperationException { + + JSONParser parser = new JSONParser(); + JSONObject root; + try { + root = (JSONObject) parser.parse(result.getResult()); + if (isSuccess(result)) { + // Success response object + Document doc = new Document(); + doc.setEtag(result.getResultVersion()); + doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString())); + + doc.setContent((JSONObject) root.get("_source")); + result.setDocument(doc); + + } else { + // Error response object + JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR); + if (error != null) { + result.setError( + new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString())); + } + + } + } catch (Exception e) { + throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult()); + } + } + + private String buildDocumentResponseUrl(String index, String id) { + return ApiUtils.buildDocumentUri(index, id); + } + + private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException { + JSONParser parser = new JSONParser(); + JSONObject root; + + try { + root = (JSONObject) parser.parse(result.getResult()); + if (isSuccess(result)) { + JSONObject hits = (JSONObject) root.get("hits"); + JSONArray hitArray = (JSONArray) hits.get("hits"); + SearchHits searchHits = new SearchHits(); + searchHits.setTotalHits(hits.get("total").toString()); + ArrayList<SearchHit> searchHitArray = new ArrayList<>(); + + for (int i = 0; i < hitArray.size(); i++) { + JSONObject hit = (JSONObject) hitArray.get(i); + SearchHit searchHit = new SearchHit(); + searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : ""); + Document doc = new Document(); + if (hit.get(JSON_ATTR_VERSION) != null) { + doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : ""); + } + + doc.setUrl( + buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : "")); + doc.setContent((JSONObject) hit.get("_source")); + searchHit.setDocument(doc); + searchHitArray.add(searchHit); + } + searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()])); + result.setSearchResult(searchHits); + + JSONObject aggregations = (JSONObject) root.get("aggregations"); + if (aggregations != null) { + AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations); + AggregationResults aggs = new AggregationResults(); + aggs.setAggregations(aggResults); + result.setAggregationResult(aggs); + } + + // success + } else { + JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR); + if (error != null) { + result.setError( + new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString())); + } + } + } catch (Exception e) { + throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult()); + } + } + + private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException { + JSONParser parser = new JSONParser(); + JSONObject root; + try { + root = (JSONObject) parser.parse(result.getResult()); + if (isSuccess(result)) { + JSONArray hitArray = (JSONArray) root.get("suggest-vnf"); + JSONObject hitdata = (JSONObject) hitArray.get(0); + JSONArray optionsArray = (JSONArray) hitdata.get("options"); + SuggestHits suggestHits = new SuggestHits(); + suggestHits.setTotalHits(String.valueOf(optionsArray.size())); + + ArrayList<SuggestHit> suggestHitArray = new ArrayList<>(); + + for (int i = 0; i < optionsArray.size(); i++) { + JSONObject hit = (JSONObject) optionsArray.get(i); + + SuggestHit suggestHit = new SuggestHit(); + suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : ""); + suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : ""); + Document doc = new Document(); + if (hit.get(JSON_ATTR_VERSION) != null) { + doc.setEtag((hit.get(JSON_ATTR_VERSION) != null) ? hit.get(JSON_ATTR_VERSION).toString() : ""); + } + doc.setUrl( + buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : "")); + + doc.setContent((JSONObject) hit.get("payload")); + suggestHit.setDocument(doc); + suggestHitArray.add(suggestHit); + } + suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()])); + result.setSuggestResult(suggestHits); + + JSONObject aggregations = (JSONObject) root.get("aggregations"); + if (aggregations != null) { + AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations); + AggregationResults aggs = new AggregationResults(); + aggs.setAggregations(aggResults); + result.setAggregationResult(aggs); + } + + // success + } else { + JSONObject error = (JSONObject) root.get(JSON_ATTR_ERROR); + if (error != null) { + result.setError( + new ErrorResult(error.get("type").toString(), error.get(JSON_ATTR_REASON).toString())); + } + } + } catch (Exception e) { + throw new DocumentStoreOperationException(FAILED_TO_PARSE_ELASTIC_SEARCH_RESPONSE + result.getResult()); + } + } + + /** + * Record the timing of the operation in the metrics log. + * + */ + private void logMetricsInfo(MdcOverride override, SearchDbMsgs message, OperationResult operationResult, + String... args) { + metricsLogger.info(message, + new LogFields() // + .setField(LogLine.DefinedFields.RESPONSE_CODE, operationResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, operationResult.getResult()) + .setField(LogLine.DefinedFields.SERVER_IP, "ElasticHost-"+config.getIpAddress()), + override, args); + } +} |