aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java')
-rw-r--r--src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java168
1 files changed, 59 insertions, 109 deletions
diff --git a/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java b/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java
index afddad2..3c3990e 100644
--- a/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java
+++ b/src/main/java/org/openecomp/datarouter/policy/EntityEventPolicy.java
@@ -73,6 +73,7 @@ import org.openecomp.datarouter.util.EntityOxmReferenceHelper;
import org.openecomp.datarouter.util.ExternalOxmModelProcessor;
import org.openecomp.datarouter.util.OxmModelLoader;
import org.openecomp.datarouter.util.RouterServiceUtil;
+import org.openecomp.datarouter.util.SearchServiceAgent;
import org.openecomp.datarouter.util.SearchSuggestionPermutation;
import org.openecomp.datarouter.util.Version;
import org.openecomp.datarouter.util.VersionedOxmEntities;
@@ -89,7 +90,6 @@ public class EntityEventPolicy implements Processor {
private static final String entitySearchSchema = "entitysearch_schema.json";
private static final String topographicalSearchSchema = "topographysearch_schema.json";
private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
- RestClient searchClient = null;
private final String EVENT_HEADER = "event-header";
private final String ENTITY_HEADER = "entity";
@@ -106,9 +106,19 @@ public class EntityEventPolicy implements Processor {
Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
private String oxmVersion = null;
- private String entityIndexTarget = null;
+ /** Agent for communicating with the Search Service. */
+ private SearchServiceAgent searchAgent = null;
+
+ /** Search index name for storing AAI event entities. */
+ private String entitySearchIndex;
+
+ /** Search index name for storing topographical search data. */
+ private String topographicalSearchIndex;
+
+ /** Search index name for suggestive search data. */
+ private String aggregateGenericVnfIndex;
+
private String entitySearchTarget = null;
- private String topographicalIndexTarget = null;
private String topographicalSearchTarget = null;
private String autoSuggestSearchTarget = null;
private String aggregationSearchVnfTarget = null;
@@ -131,27 +141,27 @@ public class EntityEventPolicy implements Processor {
srcDomain = config.getSourceDomain();
- entityIndexTarget =
- EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
- config.getSearchEntitySearchIndex());
+ // Populate the index names.
+ entitySearchIndex = config.getSearchEntitySearchIndex();
+ topographicalSearchIndex = config.getSearchTopographySearchIndex();
+ aggregateGenericVnfIndex = config.getSearchAggregationVnfIndex();
+
+ // Instantiate the agent that we will use for interacting with the Search Service.
+ searchAgent = new SearchServiceAgent(config.getSearchCertName(),
+ config.getSearchKeystore(),
+ config.getSearchKeystorePwd(),
+ EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(),
+ config.getSearchEndpoint()),
+ config.getSearchEndpointDocuments(),
+ logger);
entitySearchTarget =
EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
config.getSearchEntitySearchIndex(), config.getSearchEndpointDocuments());
- topographicalIndexTarget =
- EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
- config.getSearchTopographySearchIndex());
-
topographicalSearchTarget = EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(),
config.getSearchEndpoint(), config.getSearchTopographySearchIndex());
- // Create REST client for search service
- searchClient = new RestClient().validateServerHostname(false).validateServerCertChain(true)
- .clientCertFile(DataRouterConstants.DR_HOME_AUTH + config.getSearchCertName())
- .clientCertPassword(Password.deobfuscate(config.getSearchKeystorePwd()))
- .trustStore(DataRouterConstants.DR_HOME_AUTH + config.getSearchKeystore());
-
autoSuggestSearchTarget =
EntityEventPolicy.concatSubUri(config.getSearchBaseUrl(), config.getSearchEndpoint(),
config.getSearchEntityAutoSuggestIndex(), config.getSearchEndpointDocuments());
@@ -193,73 +203,12 @@ public class EntityEventPolicy implements Processor {
public void startup() {
// Create the indexes in the search service if they do not already exist.
- createSearchIndex(entityIndexTarget, entitySearchSchema);
- createSearchIndex(topographicalIndexTarget, topographicalSearchSchema);
+ searchAgent.createSearchIndex(entitySearchIndex, entitySearchSchema);
+ searchAgent.createSearchIndex(topographicalSearchIndex, topographicalSearchSchema);
logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED);
}
- /**
- * Creates an index through the search db abstraction
- *
- * @param searchRESTClient
- * the REST client configured to contact the search db
- * abstraction
- * @param searchTarget
- * the URL to attempt to create the search index
- * @param schemaLocation
- * the location of the mappings file for the index
- */
- private void createSearchIndex(String searchTarget, String schemaLocation) {
-
- logger.debug("Creating search index, searchTarget = " + searchTarget + ", schemaLocation = " + schemaLocation);
-
- MultivaluedMap<String, String> headers = new MultivaluedMapImpl();
- headers.put("Accept", Arrays.asList("application/json"));
- headers.put(Headers.FROM_APP_ID, Arrays.asList("DL"));
- headers.put(Headers.TRANSACTION_ID, Arrays.asList(UUID.randomUUID().toString()));
-
- try {
-
- OperationResult result = searchClient.put(searchTarget, loadFileData(schemaLocation), headers,
- MediaType.APPLICATION_JSON_TYPE, null);
-
- if (!HttpUtil.isHttpResponseClassSuccess(result.getResultCode())) {
- logger.error(EntityEventPolicyMsgs.FAIL_TO_CREATE_SEARCH_INDEX, searchTarget, result.getFailureCause());
- } else {
- logger.info(EntityEventPolicyMsgs.SEARCH_INDEX_CREATE_SUCCESS, searchTarget);
- }
-
- } catch (Exception e) {
- logger.error(EntityEventPolicyMsgs.FAIL_TO_CREATE_SEARCH_INDEX, searchTarget, e.getLocalizedMessage());
- }
- }
-
- /**
- * Convenience method to load up all the data from a file into a string
- *
- * @param filename the filename to read from disk
- * @return the data contained within the file
- * @throws Exception
- */
- protected String loadFileData(String filename) throws Exception {
- StringBuilder data = new StringBuilder();
- try {
- BufferedReader in = new BufferedReader(new InputStreamReader(
- EntityEventPolicy.class.getClassLoader().getResourceAsStream("/" + filename),
- StandardCharsets.UTF_8));
- String line;
-
- while ((line = in.readLine()) != null) {
- data.append(line);
- }
- } catch (Exception e) {
- throw new Exception("Failed to read from file = " + filename + ".", e);
- }
-
- return data.toString();
- }
-
/**
* Convert object to json.
@@ -456,7 +405,7 @@ public class EntityEventPolicy implements Processor {
return;
}
- handleSearchServiceOperation(aaiEventEntity, action, this.entitySearchTarget);
+ handleSearchServiceOperation(aaiEventEntity, action, entitySearchIndex);
handleTopographicalData(uebPayload, action, entityType, oxmEntityType, oxmJaxbContext,
entityPrimaryKeyFieldName, entityPrimaryKeyFieldValue);
@@ -955,8 +904,7 @@ public class EntityEventPolicy implements Processor {
String jsonPayload = aaiEventEntity.getAsJson();
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(entitySearchTarget+entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(entitySearchIndex, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
/*
@@ -997,16 +945,14 @@ public class EntityEventPolicy implements Processor {
// Do the PUT with new CER
((ObjectNode)node).put("crossEntityReferenceValues", newCer);
jsonPayload = NodeUtils.convertObjectToJson(node, false);
- searchClient.put(entitySearchTarget + entityId, jsonPayload, headers,
- MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ searchAgent.putDocument(entitySearchIndex, entityId, jsonPayload, headers);
}
}
} else {
if (storedEntity.getResultCode() == 404) {
// entity not found, so attempt to do a PUT
- searchClient.put(entitySearchTarget + entityId, aaiEventEntity.getAsJson(), headers,
- MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ searchAgent.putDocument(entitySearchIndex, entityId, aaiEventEntity.getAsJson(), headers);
} else {
logger.error(EntityEventPolicyMsgs.FAILED_TO_UPDATE_ENTITY_IN_DOCSTORE,
aaiEventEntity.getId(), "SYNC_ENTITY");
@@ -1039,8 +985,7 @@ public class EntityEventPolicy implements Processor {
if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
|| action.equalsIgnoreCase(ACTION_UPDATE)) {
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(aggregateGenericVnfIndex, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1056,20 +1001,23 @@ public class EntityEventPolicy implements Processor {
String eventEntityStr = eventEntity.getAsJson();
if (eventEntityStr != null) {
- searchClient.put(target + entityId, eventEntity.getAsJson(), headers,
- MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ List<String> createIndex = new ArrayList<String>();
+ createIndex.add("true");
+ headers.put("X-CreateIndex", createIndex);
+ searchAgent.putDocument(aggregateGenericVnfIndex, entityId, eventEntity.getAsJson(), headers);
}
} else if (action.equalsIgnoreCase(ACTION_CREATE)) {
String eventEntityStr = eventEntity.getAsJson();
if (eventEntityStr != null) {
- searchClient.post(target, eventEntityStr, headers, MediaType.APPLICATION_JSON_TYPE,
- MediaType.APPLICATION_JSON_TYPE);
+ List<String> createIndex = new ArrayList<String>();
+ createIndex.add("true");
+ headers.put("X-CreateIndex", createIndex);
+ searchAgent.postDocument(aggregateGenericVnfIndex, eventEntityStr, headers);
}
} else if (action.equalsIgnoreCase(ACTION_DELETE)) {
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(aggregateGenericVnfIndex, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1081,7 +1029,7 @@ public class EntityEventPolicy implements Processor {
entityId);
}
- searchClient.delete(target + eventEntity.getId(), headers, null);
+ searchAgent.deleteDocument(aggregateGenericVnfIndex, eventEntity.getId(), headers);
} else {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
entityId);
@@ -1095,8 +1043,9 @@ public class EntityEventPolicy implements Processor {
}
}
- private void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity, String action,
- String target) {
+ private void handleSearchServiceOperation(DocumentStoreDataEntity eventEntity,
+ String action,
+ String index) {
try {
Map<String, List<String>> headers = new HashMap<>();
@@ -1111,8 +1060,7 @@ public class EntityEventPolicy implements Processor {
|| action.equalsIgnoreCase(ACTION_UPDATE)) {
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(index, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1120,20 +1068,22 @@ public class EntityEventPolicy implements Processor {
if (etag != null && etag.size() > 0) {
headers.put(Headers.IF_MATCH, etag);
} else {
- logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+ logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
entityId);
}
}
- searchClient.put(target + entityId, eventEntity.getAsJson(), headers,
- MediaType.APPLICATION_JSON_TYPE, MediaType.APPLICATION_JSON_TYPE);
+ // Write the entity to the search service.
+ // PUT
+ searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
+
} else if (action.equalsIgnoreCase(ACTION_CREATE)) {
- searchClient.post(target, eventEntity.getAsJson(), headers, MediaType.APPLICATION_JSON_TYPE,
- MediaType.APPLICATION_JSON_TYPE);
+ // Write the entry to the search service.
+ searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
+
} else if (action.equalsIgnoreCase(ACTION_DELETE)) {
// Run the GET to retrieve the ETAG from the search service
- OperationResult storedEntity =
- searchClient.get(target + entityId, headers, MediaType.APPLICATION_JSON_TYPE);
+ OperationResult storedEntity = searchAgent.getDocument(index, entityId);
if (HttpUtil.isHttpResponseClassSuccess(storedEntity.getResultCode())) {
List<String> etag = storedEntity.getHeaders().get(Headers.ETAG);
@@ -1141,13 +1091,13 @@ public class EntityEventPolicy implements Processor {
if (etag != null && etag.size() > 0) {
headers.put(Headers.IF_MATCH, etag);
} else {
- logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+ logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
entityId);
}
- searchClient.delete(target + eventEntity.getId(), headers, null);
+ searchAgent.deleteDocument(index, eventEntity.getId(), headers);
} else {
- logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, target + entityId,
+ logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
entityId);
}
} else {