diff options
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java | 2800 |
2 files changed, 1332 insertions, 1473 deletions
@@ -1,8 +1,9 @@ +/src/main/java-gen/ /target/ /bin/ .project .settings/ -src/main/java-gen/ -logs/ +.classpath .idea/ +logs/ debug-logs/ diff --git a/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java b/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java index 98a254c..841f477 100644 --- a/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java +++ b/src/main/java/org/onap/aai/sa/searchdbabstraction/elasticsearch/dao/ElasticSearchHttpController.java @@ -18,22 +18,50 @@ * limitations under the License. * ============LICENSE_END========================================================= */ + package org.onap.aai.sa.searchdbabstraction.elasticsearch.dao; -import com.google.common.base.Throwables; import com.fasterxml.jackson.annotation.JsonInclude.Include; import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Throwables; +import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.ProtocolException; +import java.net.URL; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import org.onap.aai.cl.api.LogFields; +import org.onap.aai.cl.api.LogLine; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.cl.mdc.MdcContext; +import org.onap.aai.cl.mdc.MdcOverride; import org.onap.aai.sa.rest.AnalysisConfiguration; import org.onap.aai.sa.rest.ApiUtils; import org.onap.aai.sa.rest.BulkRequest; import org.onap.aai.sa.rest.BulkRequest.OperationType; +import org.onap.aai.sa.rest.DocumentSchema; import org.onap.aai.sa.searchdbabstraction.elasticsearch.config.ElasticSearchConfig; import org.onap.aai.sa.searchdbabstraction.elasticsearch.exception.DocumentStoreOperationException; import org.onap.aai.sa.searchdbabstraction.entity.AggregationResult; @@ -45,1776 +73,1606 @@ import org.onap.aai.sa.searchdbabstraction.entity.OperationResult; import org.onap.aai.sa.searchdbabstraction.entity.SearchHit; import org.onap.aai.sa.searchdbabstraction.entity.SearchHits; import org.onap.aai.sa.searchdbabstraction.entity.SearchOperationResult; +import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit; +import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits; import org.onap.aai.sa.searchdbabstraction.logging.SearchDbMsgs; import org.onap.aai.sa.searchdbabstraction.util.AggregationParsingUtil; import org.onap.aai.sa.searchdbabstraction.util.DocumentSchemaUtil; import org.onap.aai.sa.searchdbabstraction.util.ElasticSearchPayloadTranslator; import org.onap.aai.sa.searchdbabstraction.util.SearchDbConstants; -import org.onap.aai.cl.api.LogFields; -import org.onap.aai.cl.api.LogLine; -import org.onap.aai.cl.api.Logger; -import org.onap.aai.cl.eelf.LoggerFactory; -import org.onap.aai.cl.mdc.MdcContext; -import org.onap.aai.cl.mdc.MdcOverride; -import org.onap.aai.sa.rest.DocumentSchema; -import org.onap.aai.sa.searchdbabstraction.entity.SuggestHit; -import org.onap.aai.sa.searchdbabstraction.entity.SuggestHits; - -import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.ProtocolException; -import java.net.URL; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicBoolean; - import org.springframework.http.HttpStatus; - - /** - * This class has the Elasticsearch implementation of the - * DB operations defined in DocumentStoreInterface. + * This class has the Elasticsearch implementation of the DB operations defined in DocumentStoreInterface. */ public class ElasticSearchHttpController implements DocumentStoreInterface { - private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE = - "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n"; - private static final String BULK_CREATE_WITH_INDEX_TEMPLATE = - "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n"; - private static final String BULK_IMPORT_INDEX_TEMPLATE = - "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n"; - private static final String BULK_DELETE_TEMPLATE = - "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n"; + private static ElasticSearchHttpController instance = null; - private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT = - "Internal Error: ElasticSearch operation fault occurred"; - private static final Logger logger = LoggerFactory.getInstance() - .getLogger(ElasticSearchHttpController.class.getName()); - private static final Logger metricsLogger = LoggerFactory.getInstance() - .getMetricsLogger(ElasticSearchHttpController.class.getName()); - private final ElasticSearchConfig config; + private static final Logger logger = + LoggerFactory.getInstance().getLogger(ElasticSearchHttpController.class.getName()); + private static final Logger metricsLogger = + LoggerFactory.getInstance().getMetricsLogger(ElasticSearchHttpController.class.getName()); - private static final String DEFAULT_TYPE = "default"; + private static final String BULK_CREATE_WITHOUT_INDEX_TEMPLATE = + "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\"} }\n"; + private static final String BULK_CREATE_WITH_INDEX_TEMPLATE = + "{\"create\":{\"_index\" : \"%s\", \"_type\" : \"%s\", \"_id\" : \"%s\" } }\n"; + private static final String BULK_IMPORT_INDEX_TEMPLATE = + "{\"index\":{\"_index\":\"%s\",\"_type\":\"%s\",\"_id\":\"%s\", \"_version\":\"%s\"}}\n"; + private static final String BULK_DELETE_TEMPLATE = + "{ \"delete\": { \"_index\": \"%s\", \"_type\": \"%s\", \"_id\": \"%s\", \"_version\":\"%s\"}}\n"; - private static ElasticSearchHttpController instance = null; + private static final String INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT = + "Internal Error: ElasticSearch operation fault occurred"; + private final ElasticSearchConfig config; - protected AnalysisConfiguration analysisConfig; + private static final String DEFAULT_TYPE = "default"; - public static ElasticSearchHttpController getInstance() { + protected AnalysisConfiguration analysisConfig; - synchronized (ElasticSearchHttpController.class) { + public static ElasticSearchHttpController getInstance() { - if (instance == null) { + synchronized (ElasticSearchHttpController.class) { - Properties properties = new Properties(); - File file = new File(SearchDbConstants.ES_CONFIG_FILE); - try { - properties.load(new FileInputStream(file)); - } catch (Exception e) { - logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, - "ElasticSearchHTTPController.getInstance", - e.getLocalizedMessage()); + if (instance == null) { + + Properties properties = new Properties(); + File file = new File(SearchDbConstants.ES_CONFIG_FILE); + try { + properties.load(new FileInputStream(file)); + } catch (Exception e) { + logger.error(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "ElasticSearchHTTPController.getInstance", + e.getLocalizedMessage()); + } + + ElasticSearchConfig config = new ElasticSearchConfig(properties); + instance = new ElasticSearchHttpController(config); + } } - ElasticSearchConfig config = new ElasticSearchConfig(properties); - instance = new ElasticSearchHttpController(config); - } + return instance; } - return instance; - } - - public ElasticSearchHttpController(ElasticSearchConfig config) { - this.config = config; - analysisConfig = new AnalysisConfiguration(); + public ElasticSearchHttpController(ElasticSearchConfig config) { + this.config = config; + analysisConfig = new AnalysisConfiguration(); - try { - logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false)); - checkConnection(); - logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false)); - } catch (Exception e) { - logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, - getFullUrl("", false), e.getMessage()); + try { + logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_ATTEMPT, getFullUrl("", false)); + checkConnection(); + logger.info(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_SUCCESS, getFullUrl("", false)); + } catch (Exception e) { + logger.error(SearchDbMsgs.ELASTIC_SEARCH_CONNECTION_FAILURE, null, e, getFullUrl("", false), + e.getMessage()); + } } - } - public AnalysisConfiguration getAnalysisConfig() { - return analysisConfig; - } + public AnalysisConfiguration getAnalysisConfig() { + return analysisConfig; + } - @Override - public OperationResult createIndex(String index, DocumentSchema documentSchema) { + @Override + public OperationResult createIndex(String index, DocumentSchema documentSchema) { - OperationResult result = new OperationResult(); - result.setResultCode(500); + OperationResult result = new OperationResult(); + result.setResultCode(500); - try { + try { - // Submit the request to ElasticSearch to create the index using a - // default document type. - result = createTable(index, - DEFAULT_TYPE, - analysisConfig.getEsIndexSettings(), - DocumentSchemaUtil.generateDocumentMappings(documentSchema)); + // Submit the request to ElasticSearch to create the index using a + // default document type. + result = createTable(index, DEFAULT_TYPE, analysisConfig.getEsIndexSettings(), + DocumentSchemaUtil.generateDocumentMappings(documentSchema)); - // ElasticSearch will return us a 200 code on success when we - // want to report a 201, so translate the result here. - result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode()); - if (isSuccess(result)) { - result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}"); - //result.setResult("{\"index\": \"" + index + ", \"type\": \"" + DEFAULT_TYPE + "\"}"); - } + // ElasticSearch will return us a 200 code on success when we + // want to report a 201, so translate the result here. + result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode()); + if (isSuccess(result)) { + result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}"); + } - } catch (DocumentStoreOperationException | IOException e) { + } catch (DocumentStoreOperationException | IOException e) { - result.setFailureCause("Document store operation failure. Cause: " + e.getMessage()); - } + result.setFailureCause("Document store operation failure. Cause: " + e.getMessage()); + } - return result; - } - - @Override - public OperationResult createDynamicIndex(String index, String dynamicSchema) { - OperationResult result = new OperationResult(); - result.setResultCode(500); - - try { - result = createTable(index, dynamicSchema); - - // ElasticSearch will return us a 200 code on success when we - // want to report a 201, so translate the result here. - result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode()); - if (isSuccess(result)) { - result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}"); - } - } catch (DocumentStoreOperationException e) { - result.setFailureCause("Document store operation failure. Cause: " + e.getMessage()); + return result; } - return result; - } + @Override + public OperationResult createDynamicIndex(String index, String dynamicSchema) { + OperationResult result = new OperationResult(); + result.setResultCode(500); + try { + result = createTable(index, dynamicSchema); + + // ElasticSearch will return us a 200 code on success when we + // want to report a 201, so translate the result here. + result.setResultCode((result.getResultCode() == 200) ? 201 : result.getResultCode()); + if (isSuccess(result)) { + result.setResult("{\"url\": \"" + ApiUtils.buildIndexUri(index) + "\"}"); + } + } catch (DocumentStoreOperationException e) { + result.setFailureCause("Document store operation failure. Cause: " + e.getMessage()); + } - @Override - public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException { - - //Initialize operation result with a failure codes / fault string - OperationResult opResult = new OperationResult(); - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + return result; + } - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); + @Override + public OperationResult deleteIndex(String indexName) throws DocumentStoreOperationException { - logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL()); + // Initialize operation result with a failure codes / fault string + OperationResult opResult = new OperationResult(); + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - try { - conn.setRequestMethod("DELETE"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e); - } + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - handleResponse(conn, opResult); + String fullUrl = getFullUrl("/" + indexName + "/", false); + HttpURLConnection conn = initializeConnection(fullUrl); - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.DELETE_INDEX_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, - indexName); + logger.debug("\nSending 'DELETE' request to URL : " + conn.getURL()); - shutdownConnection(conn); + try { + conn.setRequestMethod("DELETE"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e); + } - return opResult; - } + handleResponse(conn, opResult); + // Generate a metrics log so we can track how long the operation took. + metricsLogger + .info(SearchDbMsgs.DELETE_INDEX_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), + override, indexName); - private OperationResult checkConnection() throws Exception { + shutdownConnection(conn); - String fullUrl = getFullUrl("/_cluster/health", false); - URL url = null; - HttpURLConnection conn = null; + return opResult; + } - url = new URL(fullUrl); - conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("GET"); - conn.setDoOutput(true); - logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url); - int resultCode = conn.getResponseCode(); - logger.debug("getClusterHealth() response Code : " + resultCode); - OperationResult opResult = new OperationResult(); - opResult.setResultCode(resultCode); + private OperationResult checkConnection() throws Exception { - shutdownConnection(conn); + String fullUrl = getFullUrl("/_cluster/health", false); + URL url = null; + HttpURLConnection conn = null; - return opResult; - } + url = new URL(fullUrl); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("GET"); + conn.setDoOutput(true); + logger.debug("getClusterHealth(), Sending 'GET' request to URL : " + url); - private String getFullUrl(String resourceUrl, boolean isSecure) { + int resultCode = conn.getResponseCode(); + logger.debug("getClusterHealth() response Code : " + resultCode); + OperationResult opResult = new OperationResult(); + opResult.setResultCode(resultCode); - final String host = config.getIpAddress(); - final String port = config.getHttpPort(); + shutdownConnection(conn); - if (isSecure) { - return String.format("https://%s:%s%s", host, port, resourceUrl); - } else { - return String.format("http://%s:%s%s", host, port, resourceUrl); + return opResult; } - } - private void shutdownConnection(HttpURLConnection connection) { - if (connection == null) { - return; + private String getFullUrl(String resourceUrl, boolean isSecure) { + + final String host = config.getIpAddress(); + final String port = config.getHttpPort(); + + if (isSecure) { + return String.format("https://%s:%s%s", host, port, resourceUrl); + } else { + return String.format("http://%s:%s%s", host, port, resourceUrl); + } } - InputStream inputstream = null; - OutputStream outputstream = null; + private void shutdownConnection(HttpURLConnection connection) { + if (connection == null) { + return; + } + + InputStream inputstream = null; + OutputStream outputstream = null; - try { - inputstream = connection.getInputStream(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage()); - } finally { - if (inputstream != null) { try { - inputstream.close(); + inputstream = connection.getInputStream(); } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", - e.getLocalizedMessage()); + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage()); + } finally { + if (inputstream != null) { + try { + inputstream.close(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", + e.getLocalizedMessage()); + } + } } - } - } - try { - outputstream = connection.getOutputStream(); - } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage()); - } finally { - if (outputstream != null) { try { - outputstream.close(); + outputstream = connection.getOutputStream(); } catch (IOException e) { - logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", - e.getLocalizedMessage()); + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", e.getLocalizedMessage()); + } finally { + if (outputstream != null) { + try { + outputstream.close(); + } catch (IOException e) { + logger.debug(SearchDbMsgs.EXCEPTION_DURING_METHOD_CALL, "shutdownConnection", + e.getLocalizedMessage()); + } + } } - } + + connection.disconnect(); } - connection.disconnect(); - } + // @Override + protected OperationResult createTable(String indexName, String typeName, String indexSettings, String indexMappings) + throws DocumentStoreOperationException { - //@Override - protected OperationResult createTable(String indexName, String typeName, - String indexSettings, String indexMappings) - throws DocumentStoreOperationException { + if (indexSettings == null) { + logger.debug("No settings provided."); + } - if (indexSettings == null) { - logger.debug("No settings provided."); - } + if (indexMappings == null) { + logger.debug("No mappings provided."); + } - if (indexMappings == null) { - logger.debug("No mappings provided."); - } + OperationResult opResult = new OperationResult(); - OperationResult opResult = new OperationResult(); + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + String fullUrl = getFullUrl("/" + indexName + "/", false); + HttpURLConnection conn = initializeConnection(fullUrl); - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); + try { + conn.setRequestMethod("PUT"); + conn.setRequestProperty("Content-Type", "application/json"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e); + } - try { - conn.setRequestMethod("PUT"); - conn.setRequestProperty("Content-Type", "application/json"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e); - } + StringBuilder sb = new StringBuilder(128); + sb.append("{ \"settings\" : "); + sb.append(indexSettings); + sb.append(","); - StringBuilder sb = new StringBuilder(128); - sb.append("{ \"settings\" : "); - sb.append(indexSettings); - sb.append(","); - - sb.append("\"mappings\" : {"); - sb.append("\"" + typeName + "\" :"); - sb.append(indexMappings); - sb.append("}}"); - - try { - attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString())); - } catch(IOException e) { - logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e); - throw new DocumentStoreOperationException(e.getMessage(), e); - } + sb.append("\"mappings\" : {"); + sb.append("\"" + typeName + "\" :"); + sb.append(indexMappings); + sb.append("}}"); - logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL()); - logger.debug("Request content: " + sb.toString()); - - handleResponse(conn, opResult); - - shutdownConnection(conn); - - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), - override, - indexName); - - return opResult; - } - - /** - * Will send the passed in JSON payload to Elasticsearch using the - * provided index name in an attempt to create the index. - * - * @param indexName - The name of the index to be created - * @param settingsAndMappings - The actual JSON object that will define the index - * @return - The operation result of writing into Elasticsearch - * @throws DocumentStoreOperationException - */ - protected OperationResult createTable(String indexName, String settingsAndMappings) throws DocumentStoreOperationException { - OperationResult result = new OperationResult(); - result.setResultCode(500); - result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - String fullUrl = getFullUrl("/" + indexName + "/", false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("PUT"); - conn.setRequestProperty("Content-Type", "application/json"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e); - } + try { + attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(sb.toString())); + } catch (IOException e) { + logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e); + throw new DocumentStoreOperationException(e.getMessage(), e); + } - try { - attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings)); - } catch(IOException e) { - logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e); - throw new DocumentStoreOperationException(e.getMessage()); - } - handleResponse(conn, result); - - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.CREATE_INDEX_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()), - override, - indexName); - - return result; - } - - @Override - public DocumentOperationResult createDocument(String indexName, - DocumentStoreDataEntity document, - boolean allowImplicitIndexCreation) - throws DocumentStoreOperationException { - - if(!allowImplicitIndexCreation) { - - // Before we do anything, make sure that the specified index actually exists in the - // document store - we don't want to rely on ElasticSearch to fail the document - // create because it could be configured to implicitly create a non-existent index, - // which can lead to hard-to-debug behaviour with queries down the road. - OperationResult indexExistsResult = checkIndexExistence(indexName); - if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) { + logger.debug("\ncreateTable(), Sending 'PUT' request to URL : " + conn.getURL()); + logger.debug("Request content: " + sb.toString()); + + handleResponse(conn, opResult); + + shutdownConnection(conn); + + // Generate a metrics log so we can track how long the operation took. + metricsLogger + .info(SearchDbMsgs.CREATE_INDEX_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), + override, indexName); - DocumentOperationResult opResult = new DocumentOperationResult(); - opResult.setResultCode(HttpStatus.NOT_FOUND.value()); - opResult.setResult("Document Index '" + indexName + "' does not exist."); - opResult.setFailureCause("Document Index '" + indexName + "' does not exist."); return opResult; - } } - if (document.getId() == null || document.getId().isEmpty()) { - return createDocumentWithoutId(indexName, document); - } else { - return createDocumentWithId(indexName, document); - } - } - - private DocumentOperationResult createDocumentWithId(String indexName, - DocumentStoreDataEntity document) - throws DocumentStoreOperationException { - // check if the document already exists - DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId()); - - - if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) { - if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) { - opResult.setFailureCause("A document with the same id already exists."); - } else { - opResult.setFailureCause("Failed to verify a document with the specified id does not already exist."); - } - opResult.setResultCode(HttpStatus.CONFLICT.value()); - return opResult; - } + /** + * Will send the passed in JSON payload to Elasticsearch using the provided index name in an attempt to create the + * index. + * + * @param indexName - The name of the index to be created + * @param settingsAndMappings - The actual JSON object that will define the index + * @return - The operation result of writing into Elasticsearch + * @throws DocumentStoreOperationException + */ + protected OperationResult createTable(String indexName, String settingsAndMappings) + throws DocumentStoreOperationException { + OperationResult result = new OperationResult(); + result.setResultCode(500); + result.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); + + String fullUrl = getFullUrl("/" + indexName + "/", false); + HttpURLConnection conn = initializeConnection(fullUrl); - opResult = new DocumentOperationResult(); - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + try { + conn.setRequestMethod("PUT"); + conn.setRequestProperty("Content-Type", "application/json"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e); + } - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + try { + attachContent(conn, ElasticSearchPayloadTranslator.translateESPayload(settingsAndMappings)); + } catch (IOException e) { + logger.error(SearchDbMsgs.INDEX_CREATE_FAILURE, e); + throw new DocumentStoreOperationException(e.getMessage()); + } + handleResponse(conn, result); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE - + "/" + document.getId(), false); - HttpURLConnection conn = initializeConnection(fullUrl); + // Generate a metrics log so we can track how long the operation took. + metricsLogger + .info(SearchDbMsgs.CREATE_INDEX_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, result.getResultCode()), + override, indexName); - try { - conn.setRequestMethod("PUT"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e); + return result; } - attachDocument(conn, document); + @Override + public DocumentOperationResult createDocument(String indexName, DocumentStoreDataEntity document, + boolean allowImplicitIndexCreation) throws DocumentStoreOperationException { + + if (!allowImplicitIndexCreation) { + + // Before we do anything, make sure that the specified index actually exists in the + // document store - we don't want to rely on ElasticSearch to fail the document + // create because it could be configured to implicitly create a non-existent index, + // which can lead to hard-to-debug behaviour with queries down the road. + OperationResult indexExistsResult = checkIndexExistence(indexName); + if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) { + + DocumentOperationResult opResult = new DocumentOperationResult(); + opResult.setResultCode(HttpStatus.NOT_FOUND.value()); + opResult.setResult("Document Index '" + indexName + "' does not exist."); + opResult.setFailureCause("Document Index '" + indexName + "' does not exist."); + return opResult; + } + } - logger.debug("Sending 'PUT' request to: " + conn.getURL()); + if (document.getId() == null || document.getId().isEmpty()) { + return createDocumentWithoutId(indexName, document); + } else { + return createDocumentWithId(indexName, document); + } + } - handleResponse(conn, opResult); - buildDocumentResult(opResult, indexName); + private DocumentOperationResult createDocumentWithId(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + // check if the document already exists + DocumentOperationResult opResult = checkDocumentExistence(indexName, document.getId()); - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, - indexName); - shutdownConnection(conn); + if (opResult.getResultCode() != HttpStatus.NOT_FOUND.value()) { + if (opResult.getResultCode() == HttpStatus.CONFLICT.value()) { + opResult.setFailureCause("A document with the same id already exists."); + } else { + opResult.setFailureCause("Failed to verify a document with the specified id does not already exist."); + } + opResult.setResultCode(HttpStatus.CONFLICT.value()); + return opResult; + } - return opResult; + opResult = new DocumentOperationResult(); + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - } + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - private DocumentOperationResult createDocumentWithoutId(String indexName, - DocumentStoreDataEntity document) - throws DocumentStoreOperationException { + String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false); + HttpURLConnection conn = initializeConnection(fullUrl); - DocumentOperationResult response = new DocumentOperationResult(); - // Initialize operation result with a failure codes / fault string - response.setResultCode(500); - response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + try { + conn.setRequestMethod("PUT"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e); + } - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + attachDocument(conn, document); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false); - HttpURLConnection conn = initializeConnection(fullUrl); + logger.debug("Sending 'PUT' request to: " + conn.getURL()); - try { - conn.setRequestMethod("POST"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e); - } + handleResponse(conn, opResult); + buildDocumentResult(opResult, indexName); - attachDocument(conn, document); + // Generate a metrics log so we can track how long the operation took. + metricsLogger + .info(SearchDbMsgs.CREATE_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), + override, indexName); + + shutdownConnection(conn); - logger.debug("Sending 'POST' request to: " + conn.getURL()); + return opResult; - handleResponse(conn, response); - buildDocumentResult(response, indexName); + } - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.CREATE_DOCUMENT_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()), - override, - indexName); + private DocumentOperationResult createDocumentWithoutId(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { - shutdownConnection(conn); + DocumentOperationResult response = new DocumentOperationResult(); + // Initialize operation result with a failure codes / fault string + response.setResultCode(500); + response.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - return response; - } + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc) - throws DocumentStoreOperationException { -// conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); - conn.setRequestProperty("Content-Type", "application/json"); - conn.setRequestProperty("Connection", "Close"); - attachContent(conn, doc.getContentInJson()); - } + String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE, false); + HttpURLConnection conn = initializeConnection(fullUrl); - private DocumentOperationResult checkDocumentExistence(String indexName, - String docId) - throws DocumentStoreOperationException { - DocumentOperationResult opResult = new DocumentOperationResult(); + try { + conn.setRequestMethod("POST"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e); + } - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); + attachDocument(conn, document); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + logger.debug("Sending 'POST' request to: " + conn.getURL()); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false); - HttpURLConnection conn = initializeConnection(fullUrl); + handleResponse(conn, response); + buildDocumentResult(response, indexName); - try { - conn.setRequestMethod("HEAD"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e); - } + // Generate a metrics log so we can track how long the operation took. + metricsLogger + .info(SearchDbMsgs.CREATE_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, response.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, response.getResult()), + override, indexName); - logger.debug("Sending 'HEAD' request to: " + conn.getURL()); + shutdownConnection(conn); - int resultCode; - try { - resultCode = conn.getResponseCode(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e); + return response; } - logger.debug("Response Code : " + resultCode); - - opResult.setResultCode(resultCode); + private void attachDocument(HttpURLConnection conn, DocumentStoreDataEntity doc) + throws DocumentStoreOperationException { + conn.setRequestProperty("Content-Type", "application/json"); + conn.setRequestProperty("Connection", "Close"); + attachContent(conn, doc.getContentInJson()); + } - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, - indexName, - docId); + private DocumentOperationResult checkDocumentExistence(String indexName, String docId) + throws DocumentStoreOperationException { + DocumentOperationResult opResult = new DocumentOperationResult(); - shutdownConnection(conn); + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); - return opResult; - } + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - @Override - public DocumentOperationResult updateDocument(String indexName, - DocumentStoreDataEntity document, - boolean allowImplicitIndexCreation) - throws DocumentStoreOperationException { + String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + docId, false); + HttpURLConnection conn = initializeConnection(fullUrl); - if(!allowImplicitIndexCreation) { + try { + conn.setRequestMethod("HEAD"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e); + } - // Before we do anything, make sure that the specified index actually exists in the - // document store - we don't want to rely on ElasticSearch to fail the document - // create because it could be configured to implicitly create a non-existent index, - // which can lead to hard-to-debug behaviour with queries down the road. - OperationResult indexExistsResult = checkIndexExistence(indexName); - if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) { + logger.debug("Sending 'HEAD' request to: " + conn.getURL()); - DocumentOperationResult opResult = new DocumentOperationResult(); - opResult.setResultCode(HttpStatus.NOT_FOUND.value()); - opResult.setResult("Document Index '" + indexName + "' does not exist."); - opResult.setFailureCause("Document Index '" + indexName + "' does not exist."); - return opResult; - } - } + int resultCode; + try { + resultCode = conn.getResponseCode(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e); + } - DocumentOperationResult opResult = new DocumentOperationResult(); + logger.debug("Response Code : " + resultCode); - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + opResult.setResultCode(resultCode); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + // Generate a metrics log so we can track how long the operation took. + metricsLogger + .info(SearchDbMsgs.GET_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), + override, indexName, docId); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() - + "?version=" + document.getVersion(), false); - HttpURLConnection conn = initializeConnection(fullUrl); + shutdownConnection(conn); - try { - conn.setRequestMethod("PUT"); - conn.setRequestProperty("Content-Type", "application/json"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e); + return opResult; } - attachDocument(conn, document); - - logger.debug("Sending 'PUT' request to: " + conn.getURL()); + @Override + public DocumentOperationResult updateDocument(String indexName, DocumentStoreDataEntity document, + boolean allowImplicitIndexCreation) throws DocumentStoreOperationException { + + if (!allowImplicitIndexCreation) { + // Before we do anything, make sure that the specified index actually exists in the + // document store - we don't want to rely on ElasticSearch to fail the document + // create because it could be configured to implicitly create a non-existent index, + // which can lead to hard-to-debug behaviour with queries down the road. + OperationResult indexExistsResult = checkIndexExistence(indexName); + if ((indexExistsResult.getResultCode() < 200) || (indexExistsResult.getResultCode() >= 300)) { + + DocumentOperationResult opResult = new DocumentOperationResult(); + opResult.setResultCode(HttpStatus.NOT_FOUND.value()); + opResult.setResult("Document Index '" + indexName + "' does not exist."); + opResult.setFailureCause("Document Index '" + indexName + "' does not exist."); + return opResult; + } + } - handleResponse(conn, opResult); - buildDocumentResult(opResult, indexName); + DocumentOperationResult opResult = new DocumentOperationResult(); - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, - indexName, - document.getId()); + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - shutdownConnection(conn); + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - return opResult; - } + String fullUrl = getFullUrl( + "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(), + false); + HttpURLConnection conn = initializeConnection(fullUrl); - @Override - public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document) - throws DocumentStoreOperationException { - DocumentOperationResult opResult = new DocumentOperationResult(); + try { + conn.setRequestMethod("PUT"); + conn.setRequestProperty("Content-Type", "application/json"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to PUT.", e); + } - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + attachDocument(conn, document); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + logger.debug("Sending 'PUT' request to: " + conn.getURL()); - String fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() - + "?version=" + document.getVersion(), false); - HttpURLConnection conn = initializeConnection(fullUrl); + handleResponse(conn, opResult); + buildDocumentResult(opResult, indexName); - try { - conn.setRequestMethod("DELETE"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e); - } + // Generate a metrics log so we can track how long the operation took. + metricsLogger.info(SearchDbMsgs.UPDATE_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), + override, indexName, document.getId()); - logger.debug("\nSending 'DELETE' request to " + conn.getURL()); + shutdownConnection(conn); - handleResponse(conn, opResult); - buildDocumentResult(opResult, indexName); - //supress the etag and url in response for delete as they are not required - if (opResult.getDocument() != null) { - opResult.getDocument().setEtag(null); - opResult.getDocument().setUrl(null); + return opResult; } - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), - override, - indexName, - document.getId()); - - shutdownConnection(conn); - - return opResult; - } - - @Override - public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document) - throws DocumentStoreOperationException { - DocumentOperationResult opResult = new DocumentOperationResult(); - - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - String fullUrl = null; - if (document.getVersion() == null) { - fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false); - } else { - fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() - + "?version=" + document.getVersion(), false); - } - HttpURLConnection conn = initializeConnection(fullUrl); + @Override + public DocumentOperationResult deleteDocument(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + DocumentOperationResult opResult = new DocumentOperationResult(); - logger.debug("\nSending 'GET' request to: " + conn.getURL()); + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - handleResponse(conn, opResult); - buildDocumentResult(opResult, indexName); + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, - indexName, - document.getId()); + String fullUrl = getFullUrl( + "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(), + false); + HttpURLConnection conn = initializeConnection(fullUrl); - shutdownConnection(conn); + try { + conn.setRequestMethod("DELETE"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to DELETE.", e); + } - return opResult; - } + logger.debug("\nSending 'DELETE' request to " + conn.getURL()); - public SearchOperationResult search(String indexName, String queryString) - throws DocumentStoreOperationException { - SearchOperationResult opResult = new SearchOperationResult(); + handleResponse(conn, opResult); + buildDocumentResult(opResult, indexName); + // supress the etag and url in response for delete as they are not required + if (opResult.getDocument() != null) { + opResult.getDocument().setEtag(null); + opResult.getDocument().setUrl(null); + } - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + // Generate a metrics log so we can track how long the operation took. + metricsLogger.info(SearchDbMsgs.DELETE_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResult()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), + override, indexName, document.getId()); - String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false); + shutdownConnection(conn); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + return opResult; + } - HttpURLConnection conn = initializeConnection(fullUrl); + @Override + public DocumentOperationResult getDocument(String indexName, DocumentStoreDataEntity document) + throws DocumentStoreOperationException { + DocumentOperationResult opResult = new DocumentOperationResult(); - try { - conn.setRequestMethod("GET"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e); - } + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL()); + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - handleResponse(conn, opResult); - buildSearchResult(opResult, indexName); + String fullUrl = null; + if (document.getVersion() == null) { + fullUrl = getFullUrl("/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId(), false); + } else { + fullUrl = getFullUrl( + "/" + indexName + "/" + DEFAULT_TYPE + "/" + document.getId() + "?version=" + document.getVersion(), + false); + } + HttpURLConnection conn = initializeConnection(fullUrl); + logger.debug("\nSending 'GET' request to: " + conn.getURL()); - metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, - indexName, - queryString); + handleResponse(conn, opResult); + buildDocumentResult(opResult, indexName); - return opResult; - } + // Generate a metrics log so we can track how long the operation took. + metricsLogger.info(SearchDbMsgs.GET_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), + override, indexName, document.getId()); - public SearchOperationResult searchWithPayload(String indexName, String query) - throws DocumentStoreOperationException { - SearchOperationResult opResult = new SearchOperationResult(); + shutdownConnection(conn); - if (logger.isDebugEnabled()) { - logger.debug("Querying index: " + indexName + " with query string: " + query); + return opResult; } - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + @Override + public SearchOperationResult search(String indexName, String queryString) throws DocumentStoreOperationException { + SearchOperationResult opResult = new SearchOperationResult(); - String fullUrl = getFullUrl("/" + indexName + "/_search", false); + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + String fullUrl = getFullUrl("/" + indexName + "/_search" + "?" + queryString, false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("POST"); - conn.setRequestProperty("Content-Type", "application/json"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e); - } + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - attachContent(conn, query); + HttpURLConnection conn = initializeConnection(fullUrl); - logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL()); - logger.debug("Request body = Elasticsearch query = " + query); + try { + conn.setRequestMethod("GET"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to GET.", e); + } - handleResponse(conn, opResult); - buildSearchResult(opResult, indexName); + logger.debug("\nsearch(), Sending 'GET' request to URL : " + conn.getURL()); - metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, - indexName, - query); + handleResponse(conn, opResult); + buildSearchResult(opResult, indexName); - shutdownConnection(conn); - return opResult; - } + metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), + override, indexName, queryString); + return opResult; + } - public SearchOperationResult suggestionQueryWithPayload(String indexName, String query) - throws DocumentStoreOperationException { + @Override + public SearchOperationResult searchWithPayload(String indexName, String query) + throws DocumentStoreOperationException { + SearchOperationResult opResult = new SearchOperationResult(); - SearchOperationResult opResult = new SearchOperationResult(); + if (logger.isDebugEnabled()) { + logger.debug("Querying index: " + indexName + " with query string: " + query); + } - if (logger.isDebugEnabled()) { - logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query); - } + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - // Initialize operation result with a failure codes / fault string - opResult.setResultCode(500); - opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); + String fullUrl = getFullUrl("/" + indexName + "/_search", false); - String fullUrl = getFullUrl("/" + indexName + "/_suggest", false); + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); + HttpURLConnection conn = initializeConnection(fullUrl); - HttpURLConnection conn = initializeConnection(fullUrl); + try { + conn.setRequestMethod("POST"); + conn.setRequestProperty("Content-Type", "application/json"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e); + } - try { - conn.setRequestMethod("POST"); - conn.setRequestProperty("Content-Type", "application/json"); - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e); - } + attachContent(conn, query); - attachContent(conn, query); + logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL()); + logger.debug("Request body = Elasticsearch query = " + query); - logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL()); - logger.debug("Request body = Elasticsearch query = " + query); + handleResponse(conn, opResult); + buildSearchResult(opResult, indexName); - handleResponse(conn, opResult); - buildSuggestResult(opResult, indexName); + metricsLogger + .info(SearchDbMsgs.QUERY_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), + override, indexName, query); - metricsLogger.info(SearchDbMsgs.QUERY_DOCUMENT_TIME, - new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), - override, indexName, query); + shutdownConnection(conn); - shutdownConnection(conn); + return opResult; + } - return opResult; - } - private void attachContent(HttpURLConnection conn, String content) - throws DocumentStoreOperationException { - OutputStream outputStream = null; - OutputStreamWriter out = null; + @Override + public SearchOperationResult suggestionQueryWithPayload(String indexName, String query) + throws DocumentStoreOperationException { - try { - outputStream = conn.getOutputStream(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to get connection output stream.", e); - } + SearchOperationResult opResult = new SearchOperationResult(); - out = new OutputStreamWriter(outputStream); + if (logger.isDebugEnabled()) { + logger.debug("Querying Suggestion index: " + indexName + " with query string: " + query); + } - try { - out.write(content); - out.close(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to write to the output stream.", e); - } - } + // Initialize operation result with a failure codes / fault string + opResult.setResultCode(500); + opResult.setResult(INTERNAL_SERVER_ERROR_ELASTIC_SEARCH_OPERATION_FAULT); - private HttpURLConnection initializeConnection(String fullUrl) - throws DocumentStoreOperationException { - URL url = null; - HttpURLConnection conn = null; + String fullUrl = getFullUrl("/" + indexName + "/_suggest", false); - try { - url = new URL(fullUrl); - } catch (MalformedURLException e) { - throw new DocumentStoreOperationException("Error building a URL with " + url, e); - } + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - try { - conn = (HttpURLConnection) url.openConnection(); - conn.setDoOutput(true); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e); - } + HttpURLConnection conn = initializeConnection(fullUrl); - return conn; - } + try { + conn.setRequestMethod("POST"); + conn.setRequestProperty("Content-Type", "application/json"); + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to POST.", e); + } - private void handleResponse(HttpURLConnection conn, OperationResult opResult) - throws DocumentStoreOperationException { - int resultCode = 200; + attachContent(conn, query); - try { - resultCode = conn.getResponseCode(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e); - } + logger.debug("\nsearch(), Sending 'POST' request to URL : " + conn.getURL()); + logger.debug("Request body = Elasticsearch query = " + query); - logger.debug("Response Code : " + resultCode); + handleResponse(conn, opResult); + buildSuggestResult(opResult, indexName); - InputStream inputStream = null; + metricsLogger + .info(SearchDbMsgs.QUERY_DOCUMENT_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResult()), + override, indexName, query); - if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success - inputStream = conn.getErrorStream(); - } else { - try { - inputStream = conn.getInputStream(); - } catch (IOException e) { shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to get the response input stream.", e); - } + + return opResult; } - InputStreamReader inputstreamreader = new InputStreamReader(inputStream); - BufferedReader bufferedreader = new BufferedReader(inputstreamreader); + private void attachContent(HttpURLConnection conn, String content) throws DocumentStoreOperationException { + OutputStream outputStream = null; + OutputStreamWriter out = null; - StringBuilder result = new StringBuilder(128); - String string = null; + try { + outputStream = conn.getOutputStream(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to get connection output stream.", e); + } - try { - while ((string = bufferedreader.readLine()) != null) { - result.append(string).append("\n"); - } - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed getting the response body payload.", e); - } + out = new OutputStreamWriter(outputStream); - if (resultCode == HttpStatus.CONFLICT.value()) { - opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value()); - } else { - opResult.setResultCode(resultCode); - } - if (logger.isDebugEnabled()) { - logger.debug("Raw result string from ElasticSearch = " + result.toString()); - } - opResult.setResult(result.toString()); - opResult.setResultVersion(extractVersion(result.toString())); - } - - private String extractVersion(String result) throws DocumentStoreOperationException { - - JSONParser parser = new JSONParser(); - String version = null; - try { - JSONObject root = (JSONObject) parser.parse(result); - if (root.get("_version") != null) { - version = root.get("_version").toString(); - } - - } catch (ParseException e) { - - // Not all responses from ElasticSearch include a version, so - // if we don't get one back, just return an empty string rather - // than trigger a false failure. - version = ""; + try { + out.write(content); + out.close(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to write to the output stream.", e); + } } - return version; - } - /** - * This convenience method gets the current system time and stores - * it in an attribute in the supplied {@link MdcOverride} object so - * that it can be used later by the metrics logger. - * - * @param override - The {@link MdcOverride} object to update. - * @return - The supplied {@link MdcOverride} object. - */ - private MdcOverride getStartTime(MdcOverride override) { + private HttpURLConnection initializeConnection(String fullUrl) throws DocumentStoreOperationException { + URL url = null; + HttpURLConnection conn = null; - // Grab the current time... - long startTimeInMs = System.currentTimeMillis(); + try { + url = new URL(fullUrl); + } catch (MalformedURLException e) { + throw new DocumentStoreOperationException("Error building a URL with " + url, e); + } - // ...and add it as an attribute to the supplied MDC Override - // object. - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); + try { + conn = (HttpURLConnection) url.openConnection(); + conn.setDoOutput(true); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to open connection to URL " + url, e); + } - // Return the MdcOverride object that we were passed. - // This looks odd, but it allows us to do stuff like: - // - // MdcOverride ov = getStartTime(new MdcOverride()) - // - // which is quite handy, but also allows us to pass in an existing - // MdcOverride object which already has some attributes set. - return override; - } + return conn; + } - private boolean isSuccess(OperationResult result) { + private void handleResponse(HttpURLConnection conn, OperationResult opResult) + throws DocumentStoreOperationException { + int resultCode = 200; - return isSuccessCode(result.getResultCode()); - } + try { + resultCode = conn.getResponseCode(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e); + } + logger.debug("Response Code : " + resultCode); - private boolean isSuccessCode(int statusCode) { - return ((statusCode >= 200) && (statusCode < 300)); - } + InputStream inputStream = null; + if (!(resultCode >= 200 && resultCode <= 299)) { // 2xx response indicates success + inputStream = conn.getErrorStream(); + } else { + try { + inputStream = conn.getInputStream(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to get the response input stream.", e); + } + } - @Override - public OperationResult performBulkOperations(BulkRequest[] requests) - throws DocumentStoreOperationException { + InputStreamReader inputstreamreader = new InputStreamReader(inputStream); + BufferedReader bufferedreader = new BufferedReader(inputstreamreader); - if (logger.isDebugEnabled()) { - String dbgString = "ESController: performBulkOperations - Operations: "; + StringBuilder result = new StringBuilder(128); + String string = null; - for (BulkRequest request : requests) { - dbgString += "[" + request.toString() + "] "; - } + try { + while ((string = bufferedreader.readLine()) != null) { + result.append(string).append("\n"); + } + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed getting the response body payload.", e); + } - logger.debug(dbgString); + if (resultCode == HttpStatus.CONFLICT.value()) { + opResult.setResultCode(HttpStatus.PRECONDITION_FAILED.value()); + } else { + opResult.setResultCode(resultCode); + } + if (logger.isDebugEnabled()) { + logger.debug("Raw result string from ElasticSearch = " + result.toString()); + } + opResult.setResult(result.toString()); + opResult.setResultVersion(extractVersion(result.toString())); } - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - // Parse the supplied set of operations. - // Iterate over the list of operations which we were provided and - // translate them into a format that ElasticSearh understands. - int opCount = 0; - StringBuilder esOperationSet = new StringBuilder(128); - List<ElasticSearchResultItem> rejected = new ArrayList<ElasticSearchResultItem>(); - for (BulkRequest request : requests) { - - // Convert the request to the syntax ElasticSearch likes. - if (buildEsOperation(request, esOperationSet, rejected)) { - opCount++; - } + private String extractVersion(String result) { + JSONParser parser = new JSONParser(); + String version = null; + try { + JSONObject root = (JSONObject) parser.parse(result); + if (root.get("_version") != null) { + version = root.get("_version").toString(); + } + } catch (ParseException e) { + // Not all responses from ElasticSearch include a version, so + // if we don't get one back, just return an empty string rather + // than trigger a false failure. + version = ""; + } + return version; } - ElasticSearchBulkOperationResult opResult = null; - if (opCount > 0) { + /** + * This convenience method gets the current system time and stores it in an attribute in the supplied + * {@link MdcOverride} object so that it can be used later by the metrics logger. + * + * @param override - The {@link MdcOverride} object to update. + * @return - The supplied {@link MdcOverride} object. + */ + private MdcOverride getStartTime(MdcOverride override) { + + // Grab the current time... + long startTimeInMs = System.currentTimeMillis(); + + // ...and add it as an attribute to the supplied MDC Override + // object. + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); + + // Return the MdcOverride object that we were passed. + // This looks odd, but it allows us to do stuff like: + // + // MdcOverride ov = getStartTime(new MdcOverride()) + // + // which is quite handy, but also allows us to pass in an existing + // MdcOverride object which already has some attributes set. + return override; + } - // Open an HTTP connection to the ElasticSearch back end. - String fullUrl = getFullUrl("/_bulk", false); - URL url; - HttpURLConnection conn; - try { + private boolean isSuccess(OperationResult result) { - url = new URL(fullUrl); - conn = (HttpURLConnection) url.openConnection(); - conn.setRequestMethod("PUT"); - conn.setDoOutput(true); - conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); - conn.setRequestProperty("Connection", "Close"); + return isSuccessCode(result.getResultCode()); + } - } catch (IOException e) { - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - logger.debug(Throwables.getStackTraceAsString(e)); - } + private boolean isSuccessCode(int statusCode) { + return ((statusCode >= 200) && (statusCode < 300)); + } - throw new DocumentStoreOperationException("Failed to open connection to document store. Cause: " - + e.getMessage(), e); - } - StringBuilder bulkResult = new StringBuilder(128); - try { - // Create an output stream to write our request to. - OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream()); - ; + @Override + public OperationResult performBulkOperations(BulkRequest[] requests) throws DocumentStoreOperationException { if (logger.isDebugEnabled()) { - logger.debug("ESController: Sending 'BULK' request to " + conn.getURL()); - logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", - "\\n")); - } - - // Write the resulting request string to our output stream. (this sends the request to ES?) - out.write(esOperationSet.toString()); - out.close(); - - // Open an input stream on our connection in order to read back the results. - InputStream is = conn.getInputStream(); - InputStreamReader inputstreamreader = new InputStreamReader(is); - BufferedReader bufferedreader = new BufferedReader(inputstreamreader); + String dbgString = "ESController: performBulkOperations - Operations: "; - // Read the contents of the input stream into our result string... - String esResponseString = null; + for (BulkRequest request : requests) { + dbgString += "[" + request.toString() + "] "; + } - while ((esResponseString = bufferedreader.readLine()) != null) { - bulkResult.append(esResponseString).append("\n"); + logger.debug(dbgString); } - } catch (IOException e) { - - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - StringWriter sw = new StringWriter(); - e.printStackTrace(new PrintWriter(sw)); - logger.debug(sw.toString()); + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); + + // Parse the supplied set of operations. + // Iterate over the list of operations which we were provided and + // translate them into a format that ElasticSearh understands. + int opCount = 0; + StringBuilder esOperationSet = new StringBuilder(128); + List<ElasticSearchResultItem> rejected = new ArrayList<>(); + for (BulkRequest request : requests) { + + // Convert the request to the syntax ElasticSearch likes. + if (buildEsOperation(request, esOperationSet, rejected)) { + opCount++; + } } - throw new DocumentStoreOperationException("Failure interacting with document store. Cause: " - + e.getMessage(), e); - } - - if (logger.isDebugEnabled()) { - logger.debug("ESController: Received result string from ElasticSearch: = " - + bulkResult.toString()); - } - - // ...and marshal the resulting string into a Java object. - try { - opResult = marshallEsBulkResult(bulkResult.toString()); - - } catch (IOException e) { + ElasticSearchBulkOperationResult opResult = null; + if (opCount > 0) { + + // Open an HTTP connection to the ElasticSearch back end. + String fullUrl = getFullUrl("/_bulk", false); + URL url; + HttpURLConnection conn; + try { + + url = new URL(fullUrl); + conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("PUT"); + conn.setDoOutput(true); + conn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded"); + conn.setRequestProperty("Connection", "Close"); + + } catch (IOException e) { + + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug(Throwables.getStackTraceAsString(e)); + } + + throw new DocumentStoreOperationException( + "Failed to open connection to document store. Cause: " + e.getMessage(), e); + } + + StringBuilder bulkResult = new StringBuilder(128); + try { + // Create an output stream to write our request to. + OutputStreamWriter out = new OutputStreamWriter(conn.getOutputStream()); + + if (logger.isDebugEnabled()) { + logger.debug("ESController: Sending 'BULK' request to " + conn.getURL()); + logger.debug("ESController: operations: " + esOperationSet.toString().replaceAll("\n", "\\n")); + } + + // Write the resulting request string to our output stream. (this sends the request to ES?) + out.write(esOperationSet.toString()); + out.close(); + + // Open an input stream on our connection in order to read back the results. + InputStream is = conn.getInputStream(); + InputStreamReader inputstreamreader = new InputStreamReader(is); + BufferedReader bufferedreader = new BufferedReader(inputstreamreader); + + // Read the contents of the input stream into our result string... + String esResponseString = null; + + while ((esResponseString = bufferedreader.readLine()) != null) { + bulkResult.append(esResponseString).append("\n"); + } + + } catch (IOException e) { + + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + StringWriter sw = new StringWriter(); + e.printStackTrace(new PrintWriter(sw)); + logger.debug(sw.toString()); + } + + throw new DocumentStoreOperationException( + "Failure interacting with document store. Cause: " + e.getMessage(), e); + } + + if (logger.isDebugEnabled()) { + logger.debug("ESController: Received result string from ElasticSearch: = " + bulkResult.toString()); + } + + // ...and marshal the resulting string into a Java object. + try { + opResult = marshallEsBulkResult(bulkResult.toString()); + + } catch (IOException e) { + + logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); + if (logger.isDebugEnabled()) { + logger.debug(Throwables.getStackTraceAsString(e)); + } + + throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " + e.getMessage(), + e); + } + } - logger.warn(SearchDbMsgs.BULK_OPERATION_FAILURE, e.getMessage()); - if (logger.isDebugEnabled()) { - logger.debug(Throwables.getStackTraceAsString(e)); + // Finally, build the operation result and return it to the caller. + OperationResult result = new OperationResult(); + result.setResultCode(207); + result.setResult(buildGenericBulkResultSet(opResult, rejected)); + + // In the success case we don't want the entire result string to be + // dumped into the metrics log, so concatenate it. + String resultStringForMetricsLog = result.getResult(); + if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) { + resultStringForMetricsLog = + resultStringForMetricsLog.substring(0, Math.max(resultStringForMetricsLog.length(), 85)) + "..."; } - throw new DocumentStoreOperationException("Failed to marshal response body. Cause: " - + e.getMessage(), e); - } - } + metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog), + override); - // Finally, build the operation result and return it to the caller. - OperationResult result = new OperationResult(); - result.setResultCode(207); - result.setResult(buildGenericBulkResultSet(opResult, rejected)); - - // In the success case we don't want the entire result string to be - // dumped into the metrics log, so concatenate it. - String resultStringForMetricsLog = result.getResult(); - if ((result.getResultCode() >= 200) && (result.getResultCode() < 300)) { - resultStringForMetricsLog = resultStringForMetricsLog.substring(0, - Math.max(resultStringForMetricsLog.length(), 85)) + "..."; + return result; } - metricsLogger.info(SearchDbMsgs.BULK_OPERATIONS_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, result.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, resultStringForMetricsLog), - override); - - return result; - } - - - /** - * This method converts a {@link BulkRequest} object into a json structure - * which can be understood by ElasticSearch. - * - * @param request - The request to be performed. - * @param sb - The string builder to append the json data to - * @throws DocumentStoreOperationException - */ - private boolean buildEsOperation(BulkRequest request, StringBuilder sb, - List<ElasticSearchResultItem> fails) - throws DocumentStoreOperationException { - - boolean retVal = true; - OperationResult indexExistsResult = null; - - // What kind of operation are we performing? - switch (request.getOperationType()) { - - // Create a new document. - case CREATE: - - // Make sure that we were supplied a document payload. - if (request.getOperation().getDocument() == null) { - - fails.add(generateRejectionEntry(request.getOperationType(), - "Missing document payload", - request.getIndex(), - request.getId(), - 400, - request.getOperation().getMetaData().getUrl())); - return false; - } - - // Make sure that the supplied document URL is formatted - // correctly. - if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) { - fails.add(generateRejectionEntry(request.getOperationType(), - "Invalid document URL: " + request.getOperation().getMetaData().getUrl(), - request.getIndex(), - "", - 400, - request.getOperation().getMetaData().getUrl())); - return false; - } - // Validate that the specified index actually exists before we - // try to perform the create. - if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) { - - fails.add(generateRejectionEntry(request.getOperationType(), - "Specified resource does not exist: " - + request.getOperation().getMetaData().getUrl(), - request.getIndex(), - request.getId(), - 404, - request.getOperation().getMetaData().getUrl())); - return false; + /** + * This method converts a {@link BulkRequest} object into a json structure which can be understood by ElasticSearch. + * + * @param request - The request to be performed. + * @param sb - The string builder to append the json data to + * @throws DocumentStoreOperationException + */ + private boolean buildEsOperation(BulkRequest request, StringBuilder sb, List<ElasticSearchResultItem> fails) + throws DocumentStoreOperationException { + + boolean retVal = true; + // What kind of operation are we performing? + switch (request.getOperationType()) { + + // Create a new document. + case CREATE: + + // Make sure that we were supplied a document payload. + if (request.getOperation().getDocument() == null) { + + fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload", + request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Make sure that the supplied document URL is formatted + // correctly. + if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), false)) { + fails.add(generateRejectionEntry(request.getOperationType(), + "Invalid document URL: " + request.getOperation().getMetaData().getUrl(), + request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the specified index actually exists before we + // try to perform the create. + if (!indexExists(ApiUtils.extractIndexFromUri(request.getOperation().getMetaData().getUrl()))) { + + fails.add(generateRejectionEntry(request.getOperationType(), + "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(), + request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // If we were supplied an id for the new document, then + // include it in the bulk operation to Elastic Search + if (request.getId() == null) { + + sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE)); + + // Otherwise, we just leave that parameter off and ElasticSearch + // will generate one for us. + } else { + sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, + request.getId())); + } + + try { + // Append the document that we want to create. + sb.append(request.getOperation().getDocument().toJson()).append("\n"); + } catch (JsonProcessingException e) { + throw new DocumentStoreOperationException("Failure parsing document to json", e); + } + + break; + + // Update an existing document. + case UPDATE: + + // Make sure that we were supplied a document payload. + if (request.getOperation().getDocument() == null) { + + fails.add(generateRejectionEntry(request.getOperationType(), "Missing document payload", + request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Make sure that the supplied document URL is formatted + // correctly. + if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) { + fails.add(generateRejectionEntry(request.getOperationType(), + "Invalid document URL: " + request.getOperation().getMetaData().getUrl(), + request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the specified index actually exists before we + // try to perform the update. + if (!indexExists(request.getIndex())) { + + fails.add(generateRejectionEntry(request.getOperationType(), + "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(), + request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the document we are trying to update actually + // exists before we try to perform the update. + if (!documentExists(request.getIndex(), request.getId())) { + + fails.add(generateRejectionEntry(request.getOperationType(), + "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(), + request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // It is mandatory that a version be supplied for an update operation, + // so validate that now. + if (request.getOperation().getMetaData().getEtag() == null) { + + fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field", + request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Generate the update request... + sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(), + request.getOperation().getMetaData().getEtag())); + + // ...and append the document that we want to update. + try { + sb.append(request.getOperation().getDocument().toJson()).append("\n"); + } catch (JsonProcessingException e) { + throw new DocumentStoreOperationException("Failure parsing document to json", e); + } + break; + + // Delete an existing document. + case DELETE: + + // Make sure that the supplied document URL is formatted + // correctly. + if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) { + fails.add(generateRejectionEntry(request.getOperationType(), + "Invalid document URL: " + request.getOperation().getMetaData().getUrl(), + request.getIndex(), "", 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the specified index actually exists before we + // try to perform the delete. + if (!indexExists(request.getIndex())) { + + fails.add(generateRejectionEntry(request.getOperationType(), + "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(), + request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Validate that the document we are trying to update actually + // exists before we try to perform the delete. + if (!documentExists(request.getIndex(), request.getId())) { + + fails.add(generateRejectionEntry(request.getOperationType(), + "Specified resource does not exist: " + request.getOperation().getMetaData().getUrl(), + request.getIndex(), request.getId(), 404, request.getOperation().getMetaData().getUrl())); + return false; + } + + // It is mandatory that a version be supplied for a delete operation, + // so validate that now. + if (request.getOperation().getMetaData().getEtag() == null) { + + fails.add(generateRejectionEntry(request.getOperationType(), "Missing mandatory ETag field", + request.getIndex(), request.getId(), 400, request.getOperation().getMetaData().getUrl())); + return false; + } + + // Generate the delete request. + sb.append(String.format(BULK_DELETE_TEMPLATE, request.getIndex(), DEFAULT_TYPE, request.getId(), + request.getOperation().getMetaData().getEtag())); + break; + default: } - // If we were supplied an id for the new document, then - // include it in the bulk operation to Elastic Search - if (request.getId() == null) { + return retVal; + } - sb.append(String.format(BULK_CREATE_WITHOUT_INDEX_TEMPLATE, - request.getIndex(), - DEFAULT_TYPE)); + private boolean indexExists(String index) throws DocumentStoreOperationException { - // Otherwise, we just leave that parameter off and ElasticSearch - // will generate one for us. - } else { - sb.append(String.format(BULK_CREATE_WITH_INDEX_TEMPLATE, - request.getIndex(), - DEFAULT_TYPE, - request.getId())); - } + OperationResult indexExistsResult = checkIndexExistence(index); - try { - // Append the document that we want to create. - sb.append(request.getOperation().getDocument().toJson()).append("\n"); - } catch (JsonProcessingException e) { - throw new DocumentStoreOperationException("Failure parsing document to json", e); - } - - break; + return ((indexExistsResult.getResultCode() >= 200) && (indexExistsResult.getResultCode() < 300)); + } - // Update an existing document. - case UPDATE: + private boolean documentExists(String index, String id) throws DocumentStoreOperationException { - // Make sure that we were supplied a document payload. - if (request.getOperation().getDocument() == null) { + OperationResult docExistsResult = checkDocumentExistence(index, id); - fails.add(generateRejectionEntry(request.getOperationType(), - "Missing document payload", - request.getIndex(), - request.getId(), - 400, - request.getOperation().getMetaData().getUrl())); - return false; - } + return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300)); + } - // Make sure that the supplied document URL is formatted - // correctly. - if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) { - fails.add(generateRejectionEntry(request.getOperationType(), - "Invalid document URL: " + request.getOperation().getMetaData().getUrl(), - request.getIndex(), - "", - 400, - request.getOperation().getMetaData().getUrl())); - return false; + /** + * This method constructs a status entry for a bulk operation which has been rejected before even sending it to the + * document store. + * + * @param rejectReason - A message describing why the operation was rejected. + * @param anId - The identifier associated with the document being acted on. + * @param statusCode - An HTTP status code. + * @return - A result set item. + */ + private ElasticSearchResultItem generateRejectionEntry(OperationType opType, String rejectReason, String index, + String anId, int statusCode, String originalUrl) { + + ElasticSearchError err = new ElasticSearchError(); + err.setReason(rejectReason); + + ElasticSearchOperationStatus op = new ElasticSearchOperationStatus(); + op.setIndex(index); + op.setId(anId); + op.setStatus(statusCode); + op.setError(err); + op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl); + + ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem(); + + switch (opType) { + case CREATE: + rejectionResult.setCreate(op); + break; + case UPDATE: + rejectionResult.setIndex(op); + break; + case DELETE: + rejectionResult.setDelete(op); + break; + default: } - // Validate that the specified index actually exists before we - // try to perform the update. - if (!indexExists(request.getIndex())) { - - fails.add(generateRejectionEntry(request.getOperationType(), - "Specified resource does not exist: " - + request.getOperation().getMetaData().getUrl(), - request.getIndex(), - request.getId(), - 404, - request.getOperation().getMetaData().getUrl())); - return false; - } + return rejectionResult; + } - // Validate that the document we are trying to update actually - // exists before we try to perform the update. - if (!documentExists(request.getIndex(), request.getId())) { - - fails.add(generateRejectionEntry(request.getOperationType(), - "Specified resource does not exist: " - + request.getOperation().getMetaData().getUrl(), - request.getIndex(), - request.getId(), - 404, - request.getOperation().getMetaData().getUrl())); - return false; + /** + * This method takes the json structure returned from ElasticSearch in response to a bulk operations request and + * marshals it into a Java object. + * + * @param jsonResult - The bulk operations response returned from ElasticSearch. + * @return - The marshalled response. + * @throws JsonParseException + * @throws JsonMappingException + * @throws IOException + */ + private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) throws IOException { + if (jsonResult != null) { + if (logger.isDebugEnabled()) { + logger.debug("ESController: Marshalling ES result set from json: " + jsonResult.replaceAll("\n", "")); + } + + ObjectMapper mapper = new ObjectMapper(); + mapper.setSerializationInclusion(Include.NON_EMPTY); + + return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class); } - // It is mandatory that a version be supplied for an update operation, - // so validate that now. - if (request.getOperation().getMetaData().getEtag() == null) { - - fails.add(generateRejectionEntry(request.getOperationType(), - "Missing mandatory ETag field", - request.getIndex(), - request.getId(), - 400, - request.getOperation().getMetaData().getUrl())); - return false; - } + return null; + } - // Generate the update request... - sb.append(String.format(BULK_IMPORT_INDEX_TEMPLATE, - request.getIndex(), - DEFAULT_TYPE, - request.getId(), - request.getOperation().getMetaData().getEtag())); + /** + * This method takes the marshalled ElasticSearch bulk response and converts it into a generic response payload. + * + * @param esResult - ElasticSearch bulk operations response. + * @return - A generic result set. + */ + private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult, + List<ElasticSearchResultItem> rejectedOps) { + int totalOps = 0; + int totalSuccess = 0; + int totalFails = 0; - // ...and append the document that we want to update. - try { - sb.append(request.getOperation().getDocument().toJson()).append("\n"); - } catch (JsonProcessingException e) { - throw new DocumentStoreOperationException("Failure parsing document to json", e); - } - break; - - // Delete an existing document. - case DELETE: - - // Make sure that the supplied document URL is formatted - // correctly. - if (!ApiUtils.validateDocumentUri(request.getOperation().getMetaData().getUrl(), true)) { - fails.add(generateRejectionEntry(request.getOperationType(), - "Invalid document URL: " + request.getOperation().getMetaData().getUrl(), - request.getIndex(), - "", - 400, - request.getOperation().getMetaData().getUrl())); - return false; - } + if (logger.isDebugEnabled()) { - // Validate that the specified index actually exists before we - // try to perform the delete. - if (!indexExists(request.getIndex())) { - - fails.add(generateRejectionEntry(request.getOperationType(), - "Specified resource does not exist: " - + request.getOperation().getMetaData().getUrl(), - request.getIndex(), - request.getId(), - 404, - request.getOperation().getMetaData().getUrl())); - return false; + logger.debug("ESController: Build generic result set. ES Results: " + + ((esResult != null) ? esResult.toString() : "[]") + " Rejected Ops: " + rejectedOps.toString()); } - // Validate that the document we are trying to update actually - // exists before we try to perform the delete. - if (!documentExists(request.getIndex(), request.getId())) { - - fails.add(generateRejectionEntry(request.getOperationType(), - "Specified resource does not exist: " - + request.getOperation().getMetaData().getUrl(), - request.getIndex(), - request.getId(), - 404, - request.getOperation().getMetaData().getUrl())); - return false; + // Build a combined list of result items from the results returned + // from ElasticSearch and the list of operations that we rejected + // without sending to ElasticSearch. + List<ElasticSearchResultItem> combinedResults = new ArrayList<>(); + if (esResult != null) { + combinedResults.addAll(Arrays.asList(esResult.getItems())); } - - // It is mandatory that a version be supplied for a delete operation, - // so validate that now. - if (request.getOperation().getMetaData().getEtag() == null) { - - fails.add(generateRejectionEntry(request.getOperationType(), - "Missing mandatory ETag field", - request.getIndex(), - request.getId(), - 400, - request.getOperation().getMetaData().getUrl())); - return false; + combinedResults.addAll(rejectedOps); + + // Iterate over the individual results in the resulting result set. + StringBuilder resultsBuilder = new StringBuilder(); + AtomicBoolean firstItem = new AtomicBoolean(true); + for (ElasticSearchResultItem item : combinedResults) { + + // Increment the operation counts. + totalOps++; + if (isSuccessCode(item.operationStatus().getStatus())) { + totalSuccess++; + } else { + totalFails++; + } + + // Prepend a comma to our response string unless this it the + // first result in the set. + if (!firstItem.compareAndSet(true, false)) { + resultsBuilder.append(", "); + } + + // Append the current result as a generic json structure. + resultsBuilder.append(item.toJson()); } - // Generate the delete request. - sb.append(String.format(BULK_DELETE_TEMPLATE, - request.getIndex(), - DEFAULT_TYPE, - request.getId(), - request.getOperation().getMetaData().getEtag())); - break; - default: + return "{ \"total_operations\": " + totalOps + ", " + "\"total_success\": " + totalSuccess + ", " + + "\"total_fails\": " + totalFails + ", " + "\"results\": [" + resultsBuilder.toString() + "]}"; } - return retVal; - } - - private boolean indexExists(String index) throws DocumentStoreOperationException { - - OperationResult indexExistsResult = checkIndexExistence(index); - - return ((indexExistsResult.getResultCode() >= 200) - && (indexExistsResult.getResultCode() < 300)); - } - - private boolean documentExists(String index, String id) throws DocumentStoreOperationException { - - OperationResult docExistsResult = checkDocumentExistence(index, id); - - return ((docExistsResult.getResultCode() >= 200) && (docExistsResult.getResultCode() < 300)); - } - - /** - * This method constructs a status entry for a bulk operation which has - * been rejected before even sending it to the document store. - * - * @param rejectReason - A message describing why the operation was rejected. - * @param anId - The identifier associated with the document being - * acted on. - * @param statusCode - An HTTP status code. - * @return - A result set item. - */ - private ElasticSearchResultItem generateRejectionEntry(OperationType opType, - String rejectReason, - String index, - String anId, - int statusCode, - String originalUrl) { - - ElasticSearchError err = new ElasticSearchError(); - err.setReason(rejectReason); - - ElasticSearchOperationStatus op = new ElasticSearchOperationStatus(); - op.setIndex(index); - op.setId(anId); - op.setStatus(statusCode); - op.setError(err); - op.setAdditionalProperties(ElasticSearchResultItem.REQUEST_URL, originalUrl); - - ElasticSearchResultItem rejectionResult = new ElasticSearchResultItem(); - - switch (opType) { - case CREATE: - rejectionResult.setCreate(op); - break; - case UPDATE: - rejectionResult.setIndex(op); - break; - case DELETE: - rejectionResult.setDelete(op); - break; - default: - } - return rejectionResult; - } - - - /** - * This method takes the json structure returned from ElasticSearch in - * response to a bulk operations request and marshals it into a Java - * object. - * - * @param jsonResult - The bulk operations response returned from - * ElasticSearch. - * @return - The marshalled response. - * @throws JsonParseException - * @throws JsonMappingException - * @throws IOException - */ - private ElasticSearchBulkOperationResult marshallEsBulkResult(String jsonResult) - throws JsonParseException, JsonMappingException, IOException { - - if (jsonResult != null) { - if (logger.isDebugEnabled()) { - logger.debug("ESController: Marshalling ES result set from json: " - + jsonResult.replaceAll("\n", "")); - } - - ObjectMapper mapper = new ObjectMapper(); - mapper.setSerializationInclusion(Include.NON_EMPTY); - - return mapper.readValue(jsonResult, ElasticSearchBulkOperationResult.class); - } + /** + * This method queryies ElasticSearch to determine if the supplied index is present in the document store. + * + * @param indexName - The index to look for. + * @return - An operation result indicating the success or failure of the check. + * @throws DocumentStoreOperationException + */ + public OperationResult checkIndexExistence(String indexName) throws DocumentStoreOperationException { - return null; - } + // Initialize operation result with a failure codes / fault string + OperationResult opResult = new OperationResult(); + opResult.setResultCode(500); + // Grab the current time so we can use it to generate a metrics log. + MdcOverride override = getStartTime(new MdcOverride()); - /** - * This method takes the marshalled ElasticSearch bulk response and - * converts it into a generic response payload. - * - * @param esResult - ElasticSearch bulk operations response. - * @return - A generic result set. - */ - private String buildGenericBulkResultSet(ElasticSearchBulkOperationResult esResult, - List<ElasticSearchResultItem> rejectedOps) { + String fullUrl = getFullUrl("/" + indexName, false); + HttpURLConnection conn = initializeConnection(fullUrl); - int totalOps = 0; - int totalSuccess = 0; - int totalFails = 0; + try { + conn.setRequestMethod("HEAD"); - if (logger.isDebugEnabled()) { + } catch (ProtocolException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e); + } - logger.debug("ESController: Build generic result set. ES Results: " - + ((esResult != null) ? esResult.toString() : "[]") - + " Rejected Ops: " + rejectedOps.toString()); - } + logger.debug("Sending 'HEAD' request to: " + conn.getURL()); - // Build a combined list of result items from the results returned - // from ElasticSearch and the list of operations that we rejected - // without sending to ElasticSearch. - List<ElasticSearchResultItem> combinedResults = new ArrayList<ElasticSearchResultItem>(); - if (esResult != null) { - combinedResults.addAll(Arrays.asList(esResult.getItems())); - } - combinedResults.addAll(rejectedOps); - - // Iterate over the individual results in the resulting result set. - StringBuilder resultsBuilder = new StringBuilder(); - AtomicBoolean firstItem = new AtomicBoolean(true); - for (ElasticSearchResultItem item : combinedResults) { - - // Increment the operation counts. - totalOps++; - if (isSuccessCode(item.operationStatus().getStatus())) { - totalSuccess++; - } else { - totalFails++; - } - - // Prepend a comma to our response string unless this it the - // first result in the set. - if (!firstItem.compareAndSet(true, false)) { - resultsBuilder.append(", "); - } - - // Append the current result as a generic json structure. - resultsBuilder.append(item.toJson()); - } + int resultCode; + try { + resultCode = conn.getResponseCode(); + } catch (IOException e) { + shutdownConnection(conn); + throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e); + } + logger.debug("Response Code : " + resultCode); - // Now, build the result string and return it. - String responseBody = "{ \"total_operations\": " + totalOps + ", " - + "\"total_success\": " + totalSuccess + ", " - + "\"total_fails\": " + totalFails + ", " - + "\"results\": [" - + resultsBuilder.toString() - + "]}"; - - return responseBody; - } - - - /** - * This method queryies ElasticSearch to determine if the supplied - * index is present in the document store. - * - * @param indexName - The index to look for. - * @return - An operation result indicating the success or failure of - * the check. - * @throws DocumentStoreOperationException - */ - public OperationResult checkIndexExistence(String indexName) - throws DocumentStoreOperationException { - - // Initialize operation result with a failure codes / fault string - OperationResult opResult = new OperationResult(); - opResult.setResultCode(500); - - // Grab the current time so we can use it to generate a metrics log. - MdcOverride override = getStartTime(new MdcOverride()); - - String fullUrl = getFullUrl("/" + indexName, false); - HttpURLConnection conn = initializeConnection(fullUrl); - - try { - conn.setRequestMethod("HEAD"); - - } catch (ProtocolException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to set HTTP request method to HEAD.", e); - } + opResult.setResultCode(resultCode); - logger.debug("Sending 'HEAD' request to: " + conn.getURL()); + // Generate a metrics log so we can track how long the operation took. + metricsLogger + .info(SearchDbMsgs.CHECK_INDEX_TIME, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), + override, indexName); - int resultCode; - try { - resultCode = conn.getResponseCode(); - } catch (IOException e) { - shutdownConnection(conn); - throw new DocumentStoreOperationException("Failed to get the response code from the connection.", e); - } - logger.debug("Response Code : " + resultCode); + shutdownConnection(conn); - opResult.setResultCode(resultCode); + return opResult; + } - // Generate a metrics log so we can track how long the operation took. - metricsLogger.info(SearchDbMsgs.CHECK_INDEX_TIME, - new LogFields() - .setField(LogLine.DefinedFields.RESPONSE_CODE, opResult.getResultCode()) - .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, opResult.getResultCode()), - override, - indexName); - shutdownConnection(conn); + private void buildDocumentResult(DocumentOperationResult result, String index) + throws DocumentStoreOperationException { - return opResult; - } + JSONParser parser = new JSONParser(); + JSONObject root; + try { + root = (JSONObject) parser.parse(result.getResult()); + if (result.getResultCode() >= 200 && result.getResultCode() <= 299) { + // Success response object + Document doc = new Document(); + doc.setEtag(result.getResultVersion()); + doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString())); - private void buildDocumentResult(DocumentOperationResult result, String index) - throws DocumentStoreOperationException { + doc.setContent((JSONObject) root.get("_source")); + result.setDocument(doc); - JSONParser parser = new JSONParser(); - JSONObject root; - try { - root = (JSONObject) parser.parse(result.getResult()); + } else { + // Error response object + JSONObject error = (JSONObject) root.get("error"); + if (error != null) { + result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString())); + } - if (result.getResultCode() >= 200 && result.getResultCode() <= 299) { - // Success response object - Document doc = new Document(); - doc.setEtag(result.getResultVersion()); - doc.setUrl(buildDocumentResponseUrl(index, root.get("_id").toString())); + } + } catch (Exception e) { + throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult()); + } - doc.setContent((JSONObject) root.get("_source")); - result.setDocument(doc); - } else { - // Error response object - JSONObject error = (JSONObject) root.get("error"); - if (error != null) { - result.setError(new ErrorResult(error.get("type").toString(), - error.get("reason").toString())); - } + } - } - } catch (Exception e) { - throw new DocumentStoreOperationException("Failed to parse Elastic Search response." - + result.getResult()); + private String buildDocumentResponseUrl(String index, String id) { + return ApiUtils.buildDocumentUri(index, id); } + private void buildSearchResult(SearchOperationResult result, String index) throws DocumentStoreOperationException { - } - - private String buildDocumentResponseUrl(String index, String id) { - return ApiUtils.buildDocumentUri(index, id); - } - - private void buildSearchResult(SearchOperationResult result, String index) - throws DocumentStoreOperationException { - - JSONParser parser = new JSONParser(); - JSONObject root; - - try { - root = (JSONObject) parser.parse(result.getResult()); - if (result.getResultCode() >= 200 && result.getResultCode() <= 299) { - JSONObject hits = (JSONObject) root.get("hits"); - JSONArray hitArray = (JSONArray) hits.get("hits"); - SearchHits searchHits = new SearchHits(); - searchHits.setTotalHits(hits.get("total").toString()); - ArrayList<SearchHit> searchHitArray = new ArrayList<SearchHit>(); - - for (int i = 0; i < hitArray.size(); i++) { - JSONObject hit = (JSONObject) hitArray.get(i); - SearchHit searchHit = new SearchHit(); - searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : ""); - Document doc = new Document(); - if (hit.get("_version") != null) { - doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : ""); - } - - doc.setUrl(buildDocumentResponseUrl(index, (hit.get("_id") != null) - ? hit.get("_id").toString() : "")); - doc.setContent((JSONObject) hit.get("_source")); - searchHit.setDocument(doc); - searchHitArray.add(searchHit); - } - searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()])); - result.setSearchResult(searchHits); - - JSONObject aggregations = (JSONObject) root.get("aggregations"); - if (aggregations != null) { - AggregationResult[] aggResults = - AggregationParsingUtil.parseAggregationResults(aggregations); - AggregationResults aggs = new AggregationResults(); - aggs.setAggregations(aggResults); - result.setAggregationResult(aggs); - } + JSONParser parser = new JSONParser(); + JSONObject root; - // success - } else { - JSONObject error = (JSONObject) root.get("error"); - if (error != null) { - result.setError(new ErrorResult(error.get("type").toString(), - error.get("reason").toString())); + try { + root = (JSONObject) parser.parse(result.getResult()); + if (result.getResultCode() >= 200 && result.getResultCode() <= 299) { + JSONObject hits = (JSONObject) root.get("hits"); + JSONArray hitArray = (JSONArray) hits.get("hits"); + SearchHits searchHits = new SearchHits(); + searchHits.setTotalHits(hits.get("total").toString()); + ArrayList<SearchHit> searchHitArray = new ArrayList<>(); + + for (int i = 0; i < hitArray.size(); i++) { + JSONObject hit = (JSONObject) hitArray.get(i); + SearchHit searchHit = new SearchHit(); + searchHit.setScore((hit.get("_score") != null) ? hit.get("_score").toString() : ""); + Document doc = new Document(); + if (hit.get("_version") != null) { + doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : ""); + } + + doc.setUrl( + buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : "")); + doc.setContent((JSONObject) hit.get("_source")); + searchHit.setDocument(doc); + searchHitArray.add(searchHit); + } + searchHits.setHits(searchHitArray.toArray(new SearchHit[searchHitArray.size()])); + result.setSearchResult(searchHits); + + JSONObject aggregations = (JSONObject) root.get("aggregations"); + if (aggregations != null) { + AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations); + AggregationResults aggs = new AggregationResults(); + aggs.setAggregations(aggResults); + result.setAggregationResult(aggs); + } + + // success + } else { + JSONObject error = (JSONObject) root.get("error"); + if (error != null) { + result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString())); + } + } + } catch (Exception e) { + throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult()); } - } - } catch (Exception e) { - throw new DocumentStoreOperationException("Failed to parse Elastic Search response." - + result.getResult()); - } - } - - private void buildSuggestResult(SearchOperationResult result, String index) - throws DocumentStoreOperationException { - - JSONParser parser = new JSONParser (); - JSONObject root; - try { - root = (JSONObject) parser.parse ( result.getResult () ); - if (result.getResultCode () >= 200 && result.getResultCode () <= 299) { - JSONArray hitArray = (JSONArray) root.get ( "suggest-vnf" ); - JSONObject hitdata = (JSONObject) hitArray.get ( 0 ); - JSONArray optionsArray = (JSONArray) hitdata.get ( "options" ); - SuggestHits suggestHits = new SuggestHits (); - suggestHits.setTotalHits ( String.valueOf ( optionsArray.size () ) ); - - ArrayList<SuggestHit> suggestHitArray = new ArrayList<SuggestHit> (); - - for (int i = 0; i < optionsArray.size (); i++) { - JSONObject hit = (JSONObject) optionsArray.get ( i ); - - SuggestHit suggestHit = new SuggestHit (); - suggestHit.setScore ( (hit.get ( "score" ) != null) ? hit.get ( "score" ).toString () : "" ); - suggestHit.setText ( (hit.get ( "text" ) != null) ? hit.get ( "text" ).toString () : "" ); - Document doc = new Document (); - if (hit.get ( "_version" ) != null) { - doc.setEtag ( (hit.get ( "_version" ) != null) ? hit.get ( "_version" ).toString () : "" ); - } - doc.setUrl ( buildDocumentResponseUrl ( index, - (hit.get ( "_id" ) != null) ? hit.get ( "_id" ).toString () : "" ) ); - - doc.setContent ( (JSONObject) hit.get ( "payload" ) ); - suggestHit.setDocument ( doc ); - suggestHitArray.add ( suggestHit ); - } - suggestHits.setHits ( suggestHitArray.toArray ( new SuggestHit[suggestHitArray.size ()] ) ); - result.setSuggestResult ( suggestHits ); - - JSONObject aggregations = (JSONObject) root.get ( "aggregations" ); - if (aggregations != null) { - AggregationResult[] aggResults = - AggregationParsingUtil.parseAggregationResults ( aggregations ); - AggregationResults aggs = new AggregationResults (); - aggs.setAggregations ( aggResults ); - result.setAggregationResult ( aggs ); - } + } - // success - } else { - JSONObject error = (JSONObject) root.get ( "error" ); - if (error != null) { - result.setError ( - new ErrorResult ( error.get ( "type" ).toString (), error.get ( "reason" ).toString () ) ); + private void buildSuggestResult(SearchOperationResult result, String index) throws DocumentStoreOperationException { + JSONParser parser = new JSONParser(); + JSONObject root; + try { + root = (JSONObject) parser.parse(result.getResult()); + if (result.getResultCode() >= 200 && result.getResultCode() <= 299) { + JSONArray hitArray = (JSONArray) root.get("suggest-vnf"); + JSONObject hitdata = (JSONObject) hitArray.get(0); + JSONArray optionsArray = (JSONArray) hitdata.get("options"); + SuggestHits suggestHits = new SuggestHits(); + suggestHits.setTotalHits(String.valueOf(optionsArray.size())); + + ArrayList<SuggestHit> suggestHitArray = new ArrayList<>(); + + for (int i = 0; i < optionsArray.size(); i++) { + JSONObject hit = (JSONObject) optionsArray.get(i); + + SuggestHit suggestHit = new SuggestHit(); + suggestHit.setScore((hit.get("score") != null) ? hit.get("score").toString() : ""); + suggestHit.setText((hit.get("text") != null) ? hit.get("text").toString() : ""); + Document doc = new Document(); + if (hit.get("_version") != null) { + doc.setEtag((hit.get("_version") != null) ? hit.get("_version").toString() : ""); + } + doc.setUrl( + buildDocumentResponseUrl(index, (hit.get("_id") != null) ? hit.get("_id").toString() : "")); + + doc.setContent((JSONObject) hit.get("payload")); + suggestHit.setDocument(doc); + suggestHitArray.add(suggestHit); + } + suggestHits.setHits(suggestHitArray.toArray(new SuggestHit[suggestHitArray.size()])); + result.setSuggestResult(suggestHits); + + JSONObject aggregations = (JSONObject) root.get("aggregations"); + if (aggregations != null) { + AggregationResult[] aggResults = AggregationParsingUtil.parseAggregationResults(aggregations); + AggregationResults aggs = new AggregationResults(); + aggs.setAggregations(aggResults); + result.setAggregationResult(aggs); + } + + // success + } else { + JSONObject error = (JSONObject) root.get("error"); + if (error != null) { + result.setError(new ErrorResult(error.get("type").toString(), error.get("reason").toString())); + } + } + } catch (Exception e) { + throw new DocumentStoreOperationException("Failed to parse Elastic Search response." + result.getResult()); } - } - } catch (Exception e) { - throw new DocumentStoreOperationException ( - "Failed to parse Elastic Search response." + result.getResult () ); } - } - - - } +} |