aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openecomp/datarouter
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openecomp/datarouter')
-rw-r--r--src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java16
-rw-r--r--src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java168
-rw-r--r--src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java368
3 files changed, 442 insertions, 110 deletions
diff --git a/src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java b/src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java
index 8304c96..71a5d5d 100644
--- a/src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java
+++ b/src/main/java/org/openecomp/datarouter/logging/DataRouterMsgs.java
@@ -127,6 +127,11 @@ public enum DataRouterMsgs implements LogMessageEnum {
PROCESS_REST_REQUEST,
/**
+ * Index {0} may not exist in the search data store. Attempting to create it now.
+ */
+ CREATE_MISSING_INDEX,
+
+ /**
* Processed event {0}. Result: {1}
* Arguments: {0} = event topic {1} = result
*/
@@ -150,7 +155,16 @@ public enum DataRouterMsgs implements LogMessageEnum {
INVALID_OXM_FILE,
- INVALID_OXM_DIR;
+ INVALID_OXM_DIR,
+
+ /**
+ * Failed to create or update document in index {0}. Cause: {1}
+ *
+ * Arguments:
+ * {0} = Index name
+ * {1} = Failure cause
+ */
+ FAIL_TO_CREATE_UPDATE_DOC;
/**
* Static initializer to ensure the resource bundles for this class are loaded...
diff --git a/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java b/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java
index afddad2..3c3990e 100644
--- a/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java
+++ b/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java
@@ -73,6 +73,7 @@ import org.openecomp.datarouter.util.EntityOxmReferenceHelper;
import org.openecomp.datarouter.util.ExternalOxmModelProcessor;
import org.openecomp.datarouter.util.OxmModelLoader;
import org.openecomp.datarouter.util.RouterServiceUtil;
+import org.openecomp.datarouter.util.SearchServiceAgent;
import org.openecomp.datarouter.util.SearchSuggestionPermutation;
import org.openecomp.datarouter.util.Version;
import org.openecomp.datarouter.util.VersionedOxmEntities;
@@ -89,7 +90,6 @@ public class EntityEventPolicy implements Processor {
private static final String entitySearchSchema = "entitysearch_schema.json";
private static final String topographicalSearchSchema = "topographysearch_schema.json";
private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
- RestClient searchClient = null;
private final String EVENT_HEADER = "event-header";
private final String ENTITY_HEADER = "entity";
@@ -106,9 +106,19 @@ public class EntityEventPolicy implements Processor {
Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
private String oxmVersion = null;
- private String entityIndexTarget = null;
+ /** Agent for communicating with the Search Service. */
+ private SearchServiceAgent searchAgent = null;
+
+ /** Search index name for storing AAI event entities. */
+ private String entitySearchIndex;
+
+ /** Search index name for storing topographical search data. */
+ private String topographicalSearchIndex;
+
+ /** Search index name for suggestive search data. */
+ private String aggregateGenericVnfIndex;
+
private String entitySearchTarget = null;
- private String topographicalIndexTarget = null;
private String topographicalSearchTarget = null;
private String autoSuggestSearchTarget = null;
private String aggregationSearchVnfTarget = null;
@@ -131,27 +141,27 @@ public class EntityEventPolicy implements Processor {
srcDomain = config.getSourceDomain();
- entityIndexTarget =
- EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
- config.getSearchEntitySearchIndex());
+ // Populate the index names.
+ entitySearchIndex = config.getSearchEntitySearchIndex();
+ topographicalSearchIndex = config.getSearchTopographySearchIndex();
+ aggregateGenericVnfIndex = config.getSearchAggregationVnfIndex();
+
+ // Instantiate the agent that we will use for interacting with the Search Service.
+ searchAgent = new SearchServiceAgent(config.getSearchCertName(),
+ config.getSearchKeystore(),
+ config.getSearchKeystorePwd(),
+ EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(),
+ config.getSearchEndpoint()),
+ config.getSearchEndpointDocuments(),
+ logger);
entitySearchTarget =
EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
config.getSearchEntitySearchIndex(), config.getSearchEndpointDocuments());
- topographicalIndexTarget =
- EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
- config.getSearchTopographySearchIndex());
-
topographicalSearchTarget = EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(),
config.getSearchEndpoint(), config.getSearchTopographySearchIndex());
- // Create REST client for search service
- searchClient = new RestClient().validateServerHostname(false).validateServerCertChain(true)
- .clientCertFile(DataRouterConstants.DR_HOME_AUTH + config.getSearchCertName())
- .clientCertPassword(Password.deobfuscate(config.getSearchKeystorePwd()))
- .trustStore(DataRouterConstants.DR_HOME_AUTH + config.getSearchKeystore());
-
autoSuggestSearchTarget =
EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
config.getSearchEntityAutoSuggestIndex(), config.getSearchEndpointDocuments());
@@ -193,73 +203,12 @@ public class EntityEventPolicy implements Processor {
public void startup() {
// Create the indexes in the search service if they do not already exist.
- createSearchIndex(entityIndexTarget, entitySearchSchema);
- createSearchIndex(topographicalIndexTarget, topographicalSearchSchema);
+ searchAgent.createSearchIndex(entitySearchIndex, entitySearchSchema);
+ searchAgent.createSearchIndex(topographicalSearchIndex, topographicalSearchSchema);
logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED);
}
- /**
- * Creates an index through the search db abstraction
- *
- * @param searchRESTClient
- * the REST client configured to contact the search db
- * abstraction
- * @param searchTarget
- * the URL to attempt to create the search index
- * @param schemaLocation
- * the location of the mappings file for the index
- */
- private void createSearchIndex(String searchTarget, String schemaLocation) {
-
- logger.debug("Creating search index, searchTarget = " + searchTarget + ", schemaLocation = " + schemaLocation);
-
- MultivaluedMap<String, String> headers = new MultivaluedMapImpl();
- headers.put("Accept", Arrays.asList("application/json"));
- headers.put(Headers.FROM_APP_ID, Arrays.asList("DL"));
- headers.put(Headers.TRANSACTION_ID, Arrays.asList(UUID.randomUUID().toString()));
-
- try {
-
- OperationResult result = searchClient.put(searchTarget, loadFileData(schemaLocation), headers,
- MediaType.APPLICATION_JSON_TYPE, null);
-
- if (!HttpUtil.isHttpResponseClassSuccess(result.getResultCode())) {
- logger.error(EntityEventPolicyMsgs.FAIL_TO_CREATE_SEARCH_INDEX, searchTarget, result.getFailureCause());
- } else {
- logger.info(EntityEventPolicyMsgs.SEARCH_INDEX_CREATE_SUCCESS, searchTarget);
- }
-
- } catch (Exception e) {
- logger.error(EntityEventPolicyMsgs.FAIL_TO_CREATE_SEARCH_INDEX, searchTarget, e.getLocalizedMessage());
- }
- }
-
- /**
- * Convenience method to load up all the data from a file into a string
- *
- * @param filename the filename to read from disk
- * @return the data contained within the file
- * @throws Exception
- */
- protected String loadFileData(String filename) throws Exception {
- StringBuilder data = new StringBuilder();
- try {
- BufferedReader in = new BufferedReader(new InputStreamReader(
- EntityEventPolicy.class.getClassLoader().getResourceAsStream("/" + filename),
- StandardCharsets.UTF_8));
- String line;
-
- while ((line = in.readLine()) != null) {
- data.append(line);
- }
- } catch (Exception e) {
- throw new Exception("Failed to read from file = " + filename + ".", e);
- }
-
- return data.toString();
- }
-
/**
* Convert object to json.
@@ -456,7 +405,7 @@ public class EntityEventPolicy implements Processor {
return;
}
- handleSearchServiceOperation(aaiEventEntity, action, this.entitySearchTarget);
+ handleSearchServiceOperation(aaiEventEntity, action, entitySearchIndex);
handleTopographicalData(uebPayload, action, entityType, oxmEntityType, oxmJaxbContext,
entityPrimaryKeyFieldName, entityPrimaryKeyFieldValue);
@@ -955,8 +904,7 @@ public class EntityEventPolicy implements Processor {
String jsonPayload = aaiEventEntity.getAsJson();
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(entitySearchTarget+entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(entitySearchIndex, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
/*
@@ -997,16 +945,14 @@ public class EntityEventPolicy implements Processor {
// Do the PUT with new CER
((ObjectNode)node).put("crossEntityReferenceValues", newCer);
jsonPayload = NodeUtils.convertObjectToJson(node, false);
- searchClient.put(entitySearchTarget + entityId, jsonPayload, headers,
- MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ searchAgent.putDocument(entitySearchIndex, entityId, jsonPayload, headers);
}
}
} else {
if (storedEntity.getResultCode() == 404) {
// entity not found, so attempt to do a PUT
- searchClient.put(entitySearchTarget + entityId, aaiEventEntity.getAsJson(), headers,
- MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ searchAgent.putDocument(entitySearchIndex, entityId, aaiEventEntity.getAsJson(), headers);
} else {
logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE,
aaiEventEntity.getId(), "SYNC_ENTITY");
@@ -1039,8 +985,7 @@ public class EntityEventPolicy implements Processor {
if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
|| action.equalsIgnoreCase(ACTION_UPDATE)) {
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(aggregateGenericVnfIndex, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1056,20 +1001,23 @@ public class EntityEventPolicy implements Processor {
String eventEntityStr = eventEntity.getAsJson();
if (eventEntityStr != null) {
- searchClient.put(target + entityId, eventEntity.getAsJson(), headers,
- MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ List<String> createIndex = new ArrayList<String>();
+ createIndex.add("true");
+ headers.put("X-CreateIndex", createIndex);
+ searchAgent.putDocument(aggregateGenericVnfIndex, entityId, eventEntity.getAsJson(), headers);
}
} else if (action.equalsIgnoreCase(ACTION_CREATE)) {
String eventEntityStr = eventEntity.getAsJson();
if (eventEntityStr != null) {
- searchClient.post(target, eventEntityStr, headers, MediaType.APPLICATION_JSON_TYPE,
- MediaType.APPLICATION_JSON_TYPE);
+ List<String> createIndex = new ArrayList<String>();
+ createIndex.add("true");
+ headers.put("X-CreateIndex", createIndex);
+ searchAgent.postDocument(aggregateGenericVnfIndex, eventEntityStr, headers);
}
} else if (action.equalsIgnoreCase(ACTION_DELETE)) {
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(aggregateGenericVnfIndex, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1081,7 +1029,7 @@ public class EntityEventPolicy implements Processor {
entityId);
}
- searchClient.delete(target + eventEntity.getId(), headers, null);
+ searchAgent.deleteDocument(aggregateGenericVnfIndex, eventEntity.getId(), headers);
} else {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
entityId);
@@ -1095,8 +1043,9 @@ public class EntityEventPolicy implements Processor {
}
}
- private void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action,
- String target) {
+ private void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity,
+ String action,
+ String index) {
try {
Map<String, List<String>> headers = new HashMap<>();
@@ -1111,8 +1060,7 @@ public class EntityEventPolicy implements Processor {
|| action.equalsIgnoreCase(ACTION_UPDATE)) {
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(index, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1120,20 +1068,22 @@ public class EntityEventPolicy implements Processor {
if (etag != null && etag.size() > 0) {
headers.put(Headers.IF_MATCH, etag);
} else {
- logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+ logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
entityId);
}
}
- searchClient.put(target + entityId, eventEntity.getAsJson(), headers,
- MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ // Write the entity to the search service.
+ // PUT
+ searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
+
} else if (action.equalsIgnoreCase(ACTION_CREATE)) {
- searchClient.post(target, eventEntity.getAsJson(), headers, MediaType.APPLICATION_JSON_TYPE,
- MediaType.APPLICATION_JSON_TYPE);
+ // Write the entry to the search service.
+ searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
+
} else if (action.equalsIgnoreCase(ACTION_DELETE)) {
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(index, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1141,13 +1091,13 @@ public class EntityEventPolicy implements Processor {
if (etag != null && etag.size() > 0) {
headers.put(Headers.IF_MATCH, etag);
} else {
- logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+ logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
entityId);
}
- searchClient.delete(target + eventEntity.getId(), headers, null);
+ searchAgent.deleteDocument(index, eventEntity.getId(), headers);
} else {
- logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+ logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
entityId);
}
} else {
diff --git a/src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java b/src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java
new file mode 100644
index 0000000..3d27425
--- /dev/null
+++ b/src/main/java/org/openecomp/datarouter/util/SearchServiceAgent.java
@@ -0,0 +1,368 @@
+/**
+ * ============LICENSE_START=======================================================
+ * DataRouter
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP and OpenECOMP are trademarks
+ * and service marks of AT&T Intellectual Property.
+ */
+package org.openecomp.datarouter.util;
+
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+import javax.ws.rs.core.Response.Status;
+
+import org.eclipse.jetty.util.security.Password;
+import org.openecomp.cl.api.Logger;
+import org.openecomp.cl.mdc.MdcContext;
+import org.openecomp.datarouter.logging.DataRouterMsgs;
+import org.openecomp.datarouter.policy.EntityEventPolicy;
+import org.openecomp.restclient.client.Headers;
+import org.openecomp.restclient.client.OperationResult;
+import org.openecomp.restclient.client.RestClient;
+import org.openecomp.restclient.enums.RestAuthenticationMode;
+import org.openecomp.restclient.rest.HttpUtil;
+import org.slf4j.MDC;
+
+import com.sun.jersey.core.util.MultivaluedMapImpl;
+
+public class SearchServiceAgent {
+
+ private Logger logger;
+
+ private RestClient searchClient = null;
+ private Map<String, String> indexSchemaMapping = new HashMap<String, String>();
+
+ private String searchUrl = null;
+ private String documentEndpoint = null;
+
+
+ /**
+ * Creates a new instance of the search service agent.
+ *
+ * @param certName - Certificate to use for talking to the Search Service.
+ * @param keystore - Keystore to use for talking to the Search Service.
+ * @param keystorePwd - Keystore password for talking to the Search Service.
+ * @param searchUrl - URL at which the Search Service can be reached.
+ * @param documentEndpoint - Endpoint for accessing document resources on the Search Service.
+ * @param logger - Logger to use for system logs.
+ */
+ public SearchServiceAgent(String certName,
+ String keystore,
+ String keystorePwd,
+ String searchUrl,
+ String documentEndpoint,
+ Logger logger) {
+
+ initialize(certName, keystore, keystorePwd, searchUrl, documentEndpoint, logger);
+ }
+
+
+ /**
+ * Performs all one-time initialization required for the search agent.
+ *
+ * @param certName - Certificate to use for talking to the Search Service.
+ * @param keystore - Keystore to use for talking to the Search Service.
+ * @param keystorePwd - Keystore password for talking to the Search Service.
+ * @param searchUrl - URL at which the Search Service can be reached.
+ * @param documentEndpoint - Endpoint for accessing document resources on the Search Service.
+ * @param logger - Logger to use for system logs.
+ */
+ private void initialize(String certName,
+ String keystore,
+ String keystorePwd,
+ String searchUrl,
+ String documentEndpoint,
+ Logger logger) {
+
+ // Create REST client for search service
+ searchClient = new RestClient()
+ .authenticationMode(RestAuthenticationMode.SSL_CERT)
+ .validateServerHostname(false)
+ .validateServerCertChain(true)
+ .clientCertFile(DataRouterConstants.DR_HOME_AUTH + certName)
+ .clientCertPassword(Password.deobfuscate(keystorePwd))
+ .trustStore(DataRouterConstants.DR_HOME_AUTH + keystore);
+
+ this.searchUrl = searchUrl;
+ this.documentEndpoint = documentEndpoint;
+
+ this.logger = logger;
+ }
+
+
+ /**
+ * Creates an index through the search db abstraction
+ *
+ * @param index - The name of the index to be created.
+ * @param schemaLocation - The name of the schema file for the index.
+ */
+ public void createSearchIndex(String index, String schemaLocation) {
+
+ // Create a mapping of the index name to schema location
+ indexSchemaMapping.put(index, schemaLocation);
+
+ // Now, create the index.
+ createIndex(index, schemaLocation);
+ }
+
+
+ /**
+ * This method performs the actual work of creating a search index.
+ *
+ * @param index - The name of the index to be created.
+ * @param schemaLocation - The name of the schema file for the index.
+ */
+ private void createIndex(String index, String schemaLocation) {
+
+ logger.debug("Creating search index, index name: = " + index + ", schemaLocation = " + schemaLocation);
+
+ MultivaluedMap<String, String> headers = new MultivaluedMapImpl();
+ headers.put("Accept", Arrays.asList("application/json"));
+ headers.put(Headers.FROM_APP_ID, Arrays.asList("DL"));
+ headers.put(Headers.TRANSACTION_ID, Arrays.asList(UUID.randomUUID().toString()));
+
+ String url = concatSubUri(searchUrl, index);
+ try {
+
+ OperationResult result = searchClient.put(url, loadFileData(schemaLocation), headers,
+ MediaType.APPLICATION_JSON_TYPE, null);
+
+ if (!HttpUtil.isHttpResponseClassSuccess(result.getResultCode())) {
+ logger.error(DataRouterMsgs.FAIL_TO_CREATE_SEARCH_INDEX, index, result.getFailureCause());
+ } else {
+ logger.info(DataRouterMsgs.SEARCH_INDEX_CREATE_SUCCESS, index);
+ }
+
+ } catch (Exception e) {
+ logger.error(DataRouterMsgs.FAIL_TO_CREATE_SEARCH_INDEX, index, e.getLocalizedMessage());
+ }
+ }
+
+
+ /**
+ * Retrieves a document from the search service.
+ *
+ * @param index - The index to retrieve the document from.
+ * @param id - The unique identifier for the document.
+ *
+ * @return - The REST response returned from the Search Service.
+ */
+ public OperationResult getDocument(String index, String id) {
+
+ Map<String, List<String>> headers = new HashMap<>();
+ headers.put(Headers.FROM_APP_ID, Arrays.asList("Data Router"));
+ headers.put(Headers.TRANSACTION_ID, Arrays.asList(MDC.get(MdcContext.MDC_REQUEST_ID)));
+
+ String url = concatSubUri(searchUrl, index, documentEndpoint, id);
+ return searchClient.get(url, headers, MediaType.APPLICATION_JSON_TYPE);
+ }
+
+
+ /**
+ * Creates or updates a document in the Search Service.
+ *
+ * @param index - The index to create or update the document in.
+ * @param id - The identifier for the document.
+ * @param payload - The document contents.
+ * @param headers - HTTP headers.
+ */
+ public void putDocument(String index, String id, String payload, Map<String, List<String>> headers) {
+
+ // Try to post the document to the search service.
+ OperationResult result = doDocumentPut(index, id, payload, headers);
+
+ // A 404 response from the Search Service may indicate that the index we are writing
+ // to does not actually exist. We will try creating it now.
+ if(result.getResultCode() == Status.NOT_FOUND.getStatusCode()) {
+
+ // Lookup the location of the schema that we want to create.
+ String indexSchemaLocation = indexSchemaMapping.get(index);
+ if(indexSchemaLocation != null) {
+
+ // Try creating the index now...
+ logger.info(DataRouterMsgs.CREATE_MISSING_INDEX, index);
+ createIndex(index, indexSchemaLocation);
+
+ // ...and retry the document post.
+ result = doDocumentPut(index, id, payload, headers);
+ }
+ }
+
+ if(!resultSuccessful(result)) {
+ logger.error(DataRouterMsgs.FAIL_TO_CREATE_UPDATE_DOC, index, result.getFailureCause());
+ }
+ }
+
+
+ /**
+ * This method does the actual work of submitting a document PUT request to the Search Service.
+ *
+ * @param index - The index to create or update the document in.
+ * @param id - The identifier for the document.
+ * @param payload - The document contents.
+ * @param headers - HTTP headers.
+ *
+ * @return - The HTTP response returned by the Search Service.
+ */
+ private OperationResult doDocumentPut(String index, String id, String payload, Map<String, List<String>> headers) {
+
+ String url = concatSubUri(searchUrl, index, documentEndpoint, id);
+ return searchClient.put(url, payload, headers, MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ }
+
+
+ /**
+ * Creates a document in the Search Service.
+ *
+ * @param index - The index to create the document in.
+ * @param payload - The document contents.
+ * @param headers - HTTP headers.
+ */
+ public void postDocument(String index, String payload, Map<String, List<String>> headers) {
+
+ // Try to post the document to the search service.
+ OperationResult result = doDocumentPost(index, payload, headers);
+
+ // A 404 response from the Search Service may indicate that the index we are writing
+ // to does not actually exist. We will try creating it now.
+ if(result.getResultCode() == Status.NOT_FOUND.getStatusCode()) {
+
+ // Lookup the location of the schema that we want to create.
+ String indexSchemaLocation = indexSchemaMapping.get(index);
+ if(indexSchemaLocation != null) {
+
+ // Try creating the index now...
+ logger.info(DataRouterMsgs.CREATE_MISSING_INDEX, index);
+ createIndex(index, indexSchemaLocation);
+
+ // ...and retry the document post.
+ result = doDocumentPost(index, payload, headers);
+ }
+ }
+
+ if(!resultSuccessful(result)) {
+ logger.error(DataRouterMsgs.FAIL_TO_CREATE_UPDATE_DOC, index, result.getFailureCause());
+ }
+ }
+
+
+ /**
+ * This method does the actual work of submitting a document PUT request to the Search Service.
+ *
+ * @param index - The index to create or update the document in.
+ * @param payload - The document contents.
+ * @param headers - HTTP headers.
+ *
+ * @return - The HTTP response returned by the Search Service.
+ */
+ private OperationResult doDocumentPost(String index, String payload, Map<String, List<String>> headers) {
+
+ String url = concatSubUri(searchUrl, index, documentEndpoint);
+ return searchClient.post(url, payload, headers, MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ }
+
+
+ /**
+ * Removes a document from the Search Service.
+ *
+ * @param index - The index to create the document in.
+ * @param id - The identifier for the document.
+ * @param payload - The document contents.
+ * @param headers - HTTP headers.
+ */
+ public void deleteDocument(String index, String documentId, Map<String, List<String>> headers) {
+
+ String url = concatSubUri(searchUrl, index, documentEndpoint, documentId);
+ searchClient.delete(url, headers, null);
+ }
+
+
+ /**
+ * Convenience method to load up all the data from a file into a string
+ *
+ * @param filename the filename to read from disk
+ * @return the data contained within the file
+ * @throws Exception
+ */
+ protected String loadFileData(String filename) throws Exception {
+ StringBuilder data = new StringBuilder();
+ try {
+ BufferedReader in = new BufferedReader(new InputStreamReader(
+ EntityEventPolicy.class.getClassLoader().getResourceAsStream("/" + filename),
+ StandardCharsets.UTF_8));
+ String line;
+
+ while ((line = in.readLine()) != null) {
+ data.append(line);
+ }
+ } catch (Exception e) {
+ throw new Exception("Failed to read from file = " + filename + ".", e);
+ }
+
+ return data.toString();
+ }
+
+
+ /**
+ * Helper utility to concatenate substrings of a URI together to form a proper URI.
+ *
+ * @param suburis the list of substrings to concatenate together
+ * @return the concatenated list of substrings
+ */
+ public static String concatSubUri(String... suburis) {
+ String finalUri = "";
+
+ for (String suburi : suburis) {
+
+ if (suburi != null) {
+ // Remove any leading / since we only want to append /
+ suburi = suburi.replaceFirst("^/*", "");
+
+ // Add a trailing / if one isn't already there
+ finalUri += suburi.endsWith("/") ? suburi : suburi + "/";
+ }
+ }
+
+ return finalUri;
+ }
+
+
+ /**
+ * Helper utility to check the response code of an HTTP response.
+ *
+ * @param aResult - The response that we want to check.
+ *
+ * @return - true if the response contains a success code,
+ * false otherwise.
+ */
+ private boolean resultSuccessful(OperationResult aResult) {
+
+ return (aResult.getResultCode() >= 200) && (aResult.getResultCode() < 300);
+ }
+}