From 4609a8c564afa12bfd3781567eee84a88cba98c0 Mon Sep 17 00:00:00 2001 From: gfraboni Date: Thu, 20 Jul 2017 14:17:12 -0400 Subject: Handle missing search indexes [AAI-63] Data Router must handle Search Service document create failures if index does not exit. Change-Id: Ic4412a6295ec9f84b223c80c0326c5ef2face99d Signed-off-by: gfraboni --- .../datarouter/logging/DataRouterMsgs.java | 16 +- .../datarouter/policy/EntityEventPolicy.java | 168 ++++------ .../datarouter/util/SearchServiceAgent.java | 368 +++++++++++++++++++++ 3 files changed, 442 insertions(+), 110 deletions(-) create mode 100644 src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java (limited to 'src/main/java/org/openecomp/datarouter') diff --git a/src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java b/src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java index 8304c96..71a5d5d 100644 --- a/src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java +++ b/src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java @@ -126,6 +126,11 @@ public enum DataRouterMsgs implements LogMessageEnum { */ PROCESS_REST_REQUEST, + /** + * Index {0} may not exist in the search data store. Attempting to create it now. + */ + CREATE_MISSING_INDEX, + /** * Processed event {0}. Result: {1} * Arguments: {0} = event topic {1} = result @@ -150,7 +155,16 @@ public enum DataRouterMsgs implements LogMessageEnum { INVALID_OXM_FILE, - INVALID_OXM_DIR; + INVALID_OXM_DIR, + + /** + * Failed to create or update document in index {0}. Cause: {1} + * + * Arguments: + * {0} = Index name + * {1} = Failure cause + */ + FAIL_TO_CREATE_UPDATE_DOC; /** * Static initializer to ensure the resource bundles for this class are loaded... diff --git a/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java b/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java index afddad2..3c3990e 100644 --- a/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java +++ b/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java @@ -73,6 +73,7 @@ import org.openecomp.datarouter.util.EntityOxmReferenceHelper; import org.openecomp.datarouter.util.ExternalOxmModelProcessor; import org.openecomp.datarouter.util.OxmModelLoader; import org.openecomp.datarouter.util.RouterServiceUtil; +import org.openecomp.datarouter.util.SearchServiceAgent; import org.openecomp.datarouter.util.SearchSuggestionPermutation; import org.openecomp.datarouter.util.Version; import org.openecomp.datarouter.util.VersionedOxmEntities; @@ -89,7 +90,6 @@ public class EntityEventPolicy implements Processor { private static final String entitySearchSchema = "entitysearch_schema.json"; private static final String topographicalSearchSchema = "topographysearch_schema.json"; private Collection externalOxmModelProcessors; - RestClient searchClient = null; private final String EVENT_HEADER = "event-header"; private final String ENTITY_HEADER = "entity"; @@ -106,9 +106,19 @@ public class EntityEventPolicy implements Processor { Map oxmVersionContextMap = new HashMap<>(); private String oxmVersion = null; - private String entityIndexTarget = null; + /** Agent for communicating with the Search Service. */ + private SearchServiceAgent searchAgent = null; + + /** Search index name for storing AAI event entities. */ + private String entitySearchIndex; + + /** Search index name for storing topographical search data. */ + private String topographicalSearchIndex; + + /** Search index name for suggestive search data. */ + private String aggregateGenericVnfIndex; + private String entitySearchTarget = null; - private String topographicalIndexTarget = null; private String topographicalSearchTarget = null; private String autoSuggestSearchTarget = null; private String aggregationSearchVnfTarget = null; @@ -131,27 +141,27 @@ public class EntityEventPolicy implements Processor { srcDomain = config.getSourceDomain(); - entityIndexTarget = - EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(), - config.getSearchEntitySearchIndex()); + // Populate the index names. + entitySearchIndex = config.getSearchEntitySearchIndex(); + topographicalSearchIndex = config.getSearchTopographySearchIndex(); + aggregateGenericVnfIndex = config.getSearchAggregationVnfIndex(); + + // Instantiate the agent that we will use for interacting with the Search Service. + searchAgent = new SearchServiceAgent(config.getSearchCertName(), + config.getSearchKeystore(), + config.getSearchKeystorePwd(), + EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), + config.getSearchEndpoint()), + config.getSearchEndpointDocuments(), + logger); entitySearchTarget = EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(), config.getSearchEntitySearchIndex(), config.getSearchEndpointDocuments()); - topographicalIndexTarget = - EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(), - config.getSearchTopographySearchIndex()); - topographicalSearchTarget = EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(), config.getSearchTopographySearchIndex()); - // Create REST client for search service - searchClient = new RestClient().validateServerHostname(false).validateServerCertChain(true) - .clientCertFile(DataRouterConstants.DR_HOME_AUTH + config.getSearchCertName()) - .clientCertPassword(Password.deobfuscate(config.getSearchKeystorePwd())) - .trustStore(DataRouterConstants.DR_HOME_AUTH + config.getSearchKeystore()); - autoSuggestSearchTarget = EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(), config.getSearchEntityAutoSuggestIndex(), config.getSearchEndpointDocuments()); @@ -193,73 +203,12 @@ public class EntityEventPolicy implements Processor { public void startup() { // Create the indexes in the search service if they do not already exist. - createSearchIndex(entityIndexTarget, entitySearchSchema); - createSearchIndex(topographicalIndexTarget, topographicalSearchSchema); + searchAgent.createSearchIndex(entitySearchIndex, entitySearchSchema); + searchAgent.createSearchIndex(topographicalSearchIndex, topographicalSearchSchema); logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED); } - /** - * Creates an index through the search db abstraction - * - * @param searchRESTClient - * the REST client configured to contact the search db - * abstraction - * @param searchTarget - * the URL to attempt to create the search index - * @param schemaLocation - * the location of the mappings file for the index - */ - private void createSearchIndex(String searchTarget, String schemaLocation) { - - logger.debug("Creating search index, searchTarget = " + searchTarget + ", schemaLocation = " + schemaLocation); - - MultivaluedMap headers = new MultivaluedMapImpl(); - headers.put("Accept", Arrays.asList("application/json")); - headers.put(Headers.FROM_APP_ID, Arrays.asList("DL")); - headers.put(Headers.TRANSACTION_ID, Arrays.asList(UUID.randomUUID().toString())); - - try { - - OperationResult result = searchClient.put(searchTarget, loadFileData(schemaLocation), headers, - MediaType.APPLICATION_JSON_TYPE, null); - - if (!HttpUtil.isHttpResponseClassSuccess(result.getResultCode())) { - logger.error(EntityEventPolicyMsgs.FAIL_TO_CREATE_SEARCH_INDEX, searchTarget, result.getFailureCause()); - } else { - logger.info(EntityEventPolicyMsgs.SEARCH_INDEX_CREATE_SUCCESS, searchTarget); - } - - } catch (Exception e) { - logger.error(EntityEventPolicyMsgs.FAIL_TO_CREATE_SEARCH_INDEX, searchTarget, e.getLocalizedMessage()); - } - } - - /** - * Convenience method to load up all the data from a file into a string - * - * @param filename the filename to read from disk - * @return the data contained within the file - * @throws Exception - */ - protected String loadFileData(String filename) throws Exception { - StringBuilder data = new StringBuilder(); - try { - BufferedReader in = new BufferedReader(new InputStreamReader( - EntityEventPolicy.class.getClassLoader().getResourceAsStream("/" + filename), - StandardCharsets.UTF_8)); - String line; - - while ((line = in.readLine()) != null) { - data.append(line); - } - } catch (Exception e) { - throw new Exception("Failed to read from file = " + filename + ".", e); - } - - return data.toString(); - } - /** * Convert object to json. @@ -456,7 +405,7 @@ public class EntityEventPolicy implements Processor { return; } - handleSearchServiceOperation(aaiEventEntity, action, this.entitySearchTarget); + handleSearchServiceOperation(aaiEventEntity, action, entitySearchIndex); handleTopographicalData(uebPayload, action, entityType, oxmEntityType, oxmJaxbContext, entityPrimaryKeyFieldName, entityPrimaryKeyFieldValue); @@ -955,8 +904,7 @@ public class EntityEventPolicy implements Processor { String jsonPayload = aaiEventEntity.getAsJson(); // Run the GET to retrieve the ETAG from the search service - OperationResult storedEntity = - searchClient.get(entitySearchTarget+entityId, headers, MediaType.APPLICATION_JSON_TYPE); + OperationResult storedEntity = searchAgent.getDocument(entitySearchIndex, entityId); if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) { /* @@ -997,16 +945,14 @@ public class EntityEventPolicy implements Processor { // Do the PUT with new CER ((ObjectNode)node).put("crossEntityReferenceValues", newCer); jsonPayload = NodeUtils.convertObjectToJson(node, false); - searchClient.put(entitySearchTarget + entityId, jsonPayload, headers, - MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE); + searchAgent.putDocument(entitySearchIndex, entityId, jsonPayload, headers); } } } else { if (storedEntity.getResultCode() == 404) { // entity not found, so attempt to do a PUT - searchClient.put(entitySearchTarget + entityId, aaiEventEntity.getAsJson(), headers, - MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE); + searchAgent.putDocument(entitySearchIndex, entityId, aaiEventEntity.getAsJson(), headers); } else { logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE, aaiEventEntity.getId(), "SYNC_ENTITY"); @@ -1039,8 +985,7 @@ public class EntityEventPolicy implements Processor { if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null) || action.equalsIgnoreCase(ACTION_UPDATE)) { // Run the GET to retrieve the ETAG from the search service - OperationResult storedEntity = - searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE); + OperationResult storedEntity = searchAgent.getDocument(aggregateGenericVnfIndex, entityId); if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) { List etag = storedEntity.getHeaders().get(Headers.ETAG); @@ -1056,20 +1001,23 @@ public class EntityEventPolicy implements Processor { String eventEntityStr = eventEntity.getAsJson(); if (eventEntityStr != null) { - searchClient.put(target + entityId, eventEntity.getAsJson(), headers, - MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE); + List createIndex = new ArrayList(); + createIndex.add("true"); + headers.put("X-CreateIndex", createIndex); + searchAgent.putDocument(aggregateGenericVnfIndex, entityId, eventEntity.getAsJson(), headers); } } else if (action.equalsIgnoreCase(ACTION_CREATE)) { String eventEntityStr = eventEntity.getAsJson(); if (eventEntityStr != null) { - searchClient.post(target, eventEntityStr, headers, MediaType.APPLICATION_JSON_TYPE, - MediaType.APPLICATION_JSON_TYPE); + List createIndex = new ArrayList(); + createIndex.add("true"); + headers.put("X-CreateIndex", createIndex); + searchAgent.postDocument(aggregateGenericVnfIndex, eventEntityStr, headers); } } else if (action.equalsIgnoreCase(ACTION_DELETE)) { // Run the GET to retrieve the ETAG from the search service - OperationResult storedEntity = - searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE); + OperationResult storedEntity = searchAgent.getDocument(aggregateGenericVnfIndex, entityId); if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) { List etag = storedEntity.getHeaders().get(Headers.ETAG); @@ -1081,7 +1029,7 @@ public class EntityEventPolicy implements Processor { entityId); } - searchClient.delete(target + eventEntity.getId(), headers, null); + searchAgent.deleteDocument(aggregateGenericVnfIndex, eventEntity.getId(), headers); } else { logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId, entityId); @@ -1095,8 +1043,9 @@ public class EntityEventPolicy implements Processor { } } - private void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action, - String target) { + private void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, + String action, + String index) { try { Map> headers = new HashMap<>(); @@ -1111,8 +1060,7 @@ public class EntityEventPolicy implements Processor { || action.equalsIgnoreCase(ACTION_UPDATE)) { // Run the GET to retrieve the ETAG from the search service - OperationResult storedEntity = - searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE); + OperationResult storedEntity = searchAgent.getDocument(index, entityId); if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) { List etag = storedEntity.getHeaders().get(Headers.ETAG); @@ -1120,20 +1068,22 @@ public class EntityEventPolicy implements Processor { if (etag != null && etag.size() > 0) { headers.put(Headers.IF_MATCH, etag); } else { - logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId, + logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); } } - searchClient.put(target + entityId, eventEntity.getAsJson(), headers, - MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE); + // Write the entity to the search service. + // PUT + searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers); + } else if (action.equalsIgnoreCase(ACTION_CREATE)) { - searchClient.post(target, eventEntity.getAsJson(), headers, MediaType.APPLICATION_JSON_TYPE, - MediaType.APPLICATION_JSON_TYPE); + // Write the entry to the search service. + searchAgent.postDocument(index, eventEntity.getAsJson(), headers); + } else if (action.equalsIgnoreCase(ACTION_DELETE)) { // Run the GET to retrieve the ETAG from the search service - OperationResult storedEntity = - searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE); + OperationResult storedEntity = searchAgent.getDocument(index, entityId); if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) { List etag = storedEntity.getHeaders().get(Headers.ETAG); @@ -1141,13 +1091,13 @@ public class EntityEventPolicy implements Processor { if (etag != null && etag.size() > 0) { headers.put(Headers.IF_MATCH, etag); } else { - logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId, + logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); } - searchClient.delete(target + eventEntity.getId(), headers, null); + searchAgent.deleteDocument(index, eventEntity.getId(), headers); } else { - logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId, + logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); } } else { diff --git a/src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java b/src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java new file mode 100644 index 0000000..3d27425 --- /dev/null +++ b/src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java @@ -0,0 +1,368 @@ +/** + * ============LICENSE_START======================================================= + * DataRouter + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ +package org.openecomp.datarouter.util; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; +import javax.ws.rs.core.Response.Status; + +import org.eclipse.jetty.util.security.Password; +import org.openecomp.cl.api.Logger; +import org.openecomp.cl.mdc.MdcContext; +import org.openecomp.datarouter.logging.DataRouterMsgs; +import org.openecomp.datarouter.policy.EntityEventPolicy; +import org.openecomp.restclient.client.Headers; +import org.openecomp.restclient.client.OperationResult; +import org.openecomp.restclient.client.RestClient; +import org.openecomp.restclient.enums.RestAuthenticationMode; +import org.openecomp.restclient.rest.HttpUtil; +import org.slf4j.MDC; + +import com.sun.jersey.core.util.MultivaluedMapImpl; + +public class SearchServiceAgent { + + private Logger logger; + + private RestClient searchClient = null; + private Map indexSchemaMapping = new HashMap(); + + private String searchUrl = null; + private String documentEndpoint = null; + + + /** + * Creates a new instance of the search service agent. + * + * @param certName - Certificate to use for talking to the Search Service. + * @param keystore - Keystore to use for talking to the Search Service. + * @param keystorePwd - Keystore password for talking to the Search Service. + * @param searchUrl - URL at which the Search Service can be reached. + * @param documentEndpoint - Endpoint for accessing document resources on the Search Service. + * @param logger - Logger to use for system logs. + */ + public SearchServiceAgent(String certName, + String keystore, + String keystorePwd, + String searchUrl, + String documentEndpoint, + Logger logger) { + + initialize(certName, keystore, keystorePwd, searchUrl, documentEndpoint, logger); + } + + + /** + * Performs all one-time initialization required for the search agent. + * + * @param certName - Certificate to use for talking to the Search Service. + * @param keystore - Keystore to use for talking to the Search Service. + * @param keystorePwd - Keystore password for talking to the Search Service. + * @param searchUrl - URL at which the Search Service can be reached. + * @param documentEndpoint - Endpoint for accessing document resources on the Search Service. + * @param logger - Logger to use for system logs. + */ + private void initialize(String certName, + String keystore, + String keystorePwd, + String searchUrl, + String documentEndpoint, + Logger logger) { + + // Create REST client for search service + searchClient = new RestClient() + .authenticationMode(RestAuthenticationMode.SSL_CERT) + .validateServerHostname(false) + .validateServerCertChain(true) + .clientCertFile(DataRouterConstants.DR_HOME_AUTH + certName) + .clientCertPassword(Password.deobfuscate(keystorePwd)) + .trustStore(DataRouterConstants.DR_HOME_AUTH + keystore); + + this.searchUrl = searchUrl; + this.documentEndpoint = documentEndpoint; + + this.logger = logger; + } + + + /** + * Creates an index through the search db abstraction + * + * @param index - The name of the index to be created. + * @param schemaLocation - The name of the schema file for the index. + */ + public void createSearchIndex(String index, String schemaLocation) { + + // Create a mapping of the index name to schema location + indexSchemaMapping.put(index, schemaLocation); + + // Now, create the index. + createIndex(index, schemaLocation); + } + + + /** + * This method performs the actual work of creating a search index. + * + * @param index - The name of the index to be created. + * @param schemaLocation - The name of the schema file for the index. + */ + private void createIndex(String index, String schemaLocation) { + + logger.debug("Creating search index, index name: = " + index + ", schemaLocation = " + schemaLocation); + + MultivaluedMap headers = new MultivaluedMapImpl(); + headers.put("Accept", Arrays.asList("application/json")); + headers.put(Headers.FROM_APP_ID, Arrays.asList("DL")); + headers.put(Headers.TRANSACTION_ID, Arrays.asList(UUID.randomUUID().toString())); + + String url = concatSubUri(searchUrl, index); + try { + + OperationResult result = searchClient.put(url, loadFileData(schemaLocation), headers, + MediaType.APPLICATION_JSON_TYPE, null); + + if (!HttpUtil.isHttpResponseClassSuccess(result.getResultCode())) { + logger.error(DataRouterMsgs.FAIL_TO_CREATE_SEARCH_INDEX, index, result.getFailureCause()); + } else { + logger.info(DataRouterMsgs.SEARCH_INDEX_CREATE_SUCCESS, index); + } + + } catch (Exception e) { + logger.error(DataRouterMsgs.FAIL_TO_CREATE_SEARCH_INDEX, index, e.getLocalizedMessage()); + } + } + + + /** + * Retrieves a document from the search service. + * + * @param index - The index to retrieve the document from. + * @param id - The unique identifier for the document. + * + * @return - The REST response returned from the Search Service. + */ + public OperationResult getDocument(String index, String id) { + + Map> headers = new HashMap<>(); + headers.put(Headers.FROM_APP_ID, Arrays.asList("Data Router")); + headers.put(Headers.TRANSACTION_ID, Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID))); + + String url = concatSubUri(searchUrl, index, documentEndpoint, id); + return searchClient.get(url, headers, MediaType.APPLICATION_JSON_TYPE); + } + + + /** + * Creates or updates a document in the Search Service. + * + * @param index - The index to create or update the document in. + * @param id - The identifier for the document. + * @param payload - The document contents. + * @param headers - HTTP headers. + */ + public void putDocument(String index, String id, String payload, Map> headers) { + + // Try to post the document to the search service. + OperationResult result = doDocumentPut(index, id, payload, headers); + + // A 404 response from the Search Service may indicate that the index we are writing + // to does not actually exist. We will try creating it now. + if(result.getResultCode() == Status.NOT_FOUND.getStatusCode()) { + + // Lookup the location of the schema that we want to create. + String indexSchemaLocation = indexSchemaMapping.get(index); + if(indexSchemaLocation != null) { + + // Try creating the index now... + logger.info(DataRouterMsgs.CREATE_MISSING_INDEX, index); + createIndex(index, indexSchemaLocation); + + // ...and retry the document post. + result = doDocumentPut(index, id, payload, headers); + } + } + + if(!resultSuccessful(result)) { + logger.error(DataRouterMsgs.FAIL_TO_CREATE_UPDATE_DOC, index, result.getFailureCause()); + } + } + + + /** + * This method does the actual work of submitting a document PUT request to the Search Service. + * + * @param index - The index to create or update the document in. + * @param id - The identifier for the document. + * @param payload - The document contents. + * @param headers - HTTP headers. + * + * @return - The HTTP response returned by the Search Service. + */ + private OperationResult doDocumentPut(String index, String id, String payload, Map> headers) { + + String url = concatSubUri(searchUrl, index, documentEndpoint, id); + return searchClient.put(url, payload, headers, MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE); + } + + + /** + * Creates a document in the Search Service. + * + * @param index - The index to create the document in. + * @param payload - The document contents. + * @param headers - HTTP headers. + */ + public void postDocument(String index, String payload, Map> headers) { + + // Try to post the document to the search service. + OperationResult result = doDocumentPost(index, payload, headers); + + // A 404 response from the Search Service may indicate that the index we are writing + // to does not actually exist. We will try creating it now. + if(result.getResultCode() == Status.NOT_FOUND.getStatusCode()) { + + // Lookup the location of the schema that we want to create. + String indexSchemaLocation = indexSchemaMapping.get(index); + if(indexSchemaLocation != null) { + + // Try creating the index now... + logger.info(DataRouterMsgs.CREATE_MISSING_INDEX, index); + createIndex(index, indexSchemaLocation); + + // ...and retry the document post. + result = doDocumentPost(index, payload, headers); + } + } + + if(!resultSuccessful(result)) { + logger.error(DataRouterMsgs.FAIL_TO_CREATE_UPDATE_DOC, index, result.getFailureCause()); + } + } + + + /** + * This method does the actual work of submitting a document PUT request to the Search Service. + * + * @param index - The index to create or update the document in. + * @param payload - The document contents. + * @param headers - HTTP headers. + * + * @return - The HTTP response returned by the Search Service. + */ + private OperationResult doDocumentPost(String index, String payload, Map> headers) { + + String url = concatSubUri(searchUrl, index, documentEndpoint); + return searchClient.post(url, payload, headers, MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE); + } + + + /** + * Removes a document from the Search Service. + * + * @param index - The index to create the document in. + * @param id - The identifier for the document. + * @param payload - The document contents. + * @param headers - HTTP headers. + */ + public void deleteDocument(String index, String documentId, Map> headers) { + + String url = concatSubUri(searchUrl, index, documentEndpoint, documentId); + searchClient.delete(url, headers, null); + } + + + /** + * Convenience method to load up all the data from a file into a string + * + * @param filename the filename to read from disk + * @return the data contained within the file + * @throws Exception + */ + protected String loadFileData(String filename) throws Exception { + StringBuilder data = new StringBuilder(); + try { + BufferedReader in = new BufferedReader(new InputStreamReader( + EntityEventPolicy.class.getClassLoader().getResourceAsStream("/" + filename), + StandardCharsets.UTF_8)); + String line; + + while ((line = in.readLine()) != null) { + data.append(line); + } + } catch (Exception e) { + throw new Exception("Failed to read from file = " + filename + ".", e); + } + + return data.toString(); + } + + + /** + * Helper utility to concatenate substrings of a URI together to form a proper URI. + * + * @param suburis the list of substrings to concatenate together + * @return the concatenated list of substrings + */ + public static String concatSubUri(String... suburis) { + String finalUri = ""; + + for (String suburi : suburis) { + + if (suburi != null) { + // Remove any leading / since we only want to append / + suburi = suburi.replaceFirst("^/*", ""); + + // Add a trailing / if one isn't already there + finalUri += suburi.endsWith("/") ? suburi : suburi + "/"; + } + } + + return finalUri; + } + + + /** + * Helper utility to check the response code of an HTTP response. + * + * @param aResult - The response that we want to check. + * + * @return - true if the response contains a success code, + * false otherwise. + */ + private boolean resultSuccessful(OperationResult aResult) { + + return (aResult.getResultCode() >= 200) && (aResult.getResultCode() < 300); + } +} -- cgit 1.2.3-korg