aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java')
-rw-r--r--src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java278
1 files changed, 254 insertions, 24 deletions
diff --git a/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java
index 9627365..6d04ed6 100644
--- a/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java
+++ b/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java
@@ -33,9 +33,11 @@ import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.http.conn.routing.RouteInfo.LayerType;
import org.eclipse.persistence.dynamic.DynamicType;
import org.eclipse.persistence.internal.helper.DatabaseField;
import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.eclipse.persistence.oxm.MediaType;
import org.json.JSONException;
import org.json.JSONObject;
import org.onap.aai.cl.api.Logger;
@@ -44,6 +46,7 @@ import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
import org.onap.aai.datarouter.entity.OxmEntityDescriptor;
import org.onap.aai.datarouter.entity.SpikeEventEntity;
+import org.onap.aai.datarouter.entity.SpikeEventMeta;
import org.onap.aai.datarouter.entity.SpikeEventVertex;
import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
import org.onap.aai.datarouter.util.EntityOxmReferenceHelper;
@@ -51,6 +54,8 @@ import org.onap.aai.datarouter.util.ExternalOxmModelProcessor;
import org.onap.aai.datarouter.util.OxmModelLoader;
import org.onap.aai.datarouter.util.RouterServiceUtil;
import org.onap.aai.datarouter.util.SearchServiceAgent;
+import org.onap.aai.datarouter.util.Version;
+import org.onap.aai.datarouter.util.VersionedOxmEntities;
import org.onap.aai.restclient.client.Headers;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.restclient.rest.HttpUtil;
@@ -59,30 +64,41 @@ import org.slf4j.MDC;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
public abstract class AbstractSpikeEntityEventProcessor implements Processor {
protected static final String additionalInfo = "Response of SpikeEntityEventPolicy";
private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
-
- protected final String ACTION_CREATE = "create";
- private final String EVENT_VERTEX = "vertex";
- public final static String ACTION_DELETE = "delete";
- protected final String ACTION_UPDATE = "update";
protected final String PROCESS_SPIKE_EVENT = "Process Spike Event";
- private final String OPERATION_KEY = "operation";
+ protected static final String UPDATE_NOTIFICATION = "update-notification";
+ protected static final String SPIKE = "SPIKE";
+
+ protected static final String HEADER_KEY = "header";
+ protected static final String EVENT_TYPE_KEY = "event-type";
+ protected static final String SOURCE_NAME_KEY = "source-name";
+ protected static final String BODY_KEY = "body";
+ protected static final String OPERATION_KEY = "operation";
+
+ protected static final String VERTEX_KEY = "vertex";
+ protected static final String VERTEX_PROPERTIES_KEY = "properties";
+ protected static final String VERTEX_KEY_KEY = "key";
+ protected static final String VERTEX_TYPE_KEY = "type";
+ protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
+
+ protected static final String CREATE = "create";
+ protected static final String DELETE = "delete";
+ protected static final String UPDATE = "update";
+
protected enum ResponseType {
SUCCESS, PARTIAL_SUCCESS, FAILURE;
}
- private final List<String> SUPPORTED_ACTIONS =
- Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE);
+ protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE);
- Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
- private String oxmVersion = null;
+ protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
+ protected String oxmVersion = null;
/** Agent for communicating with the Search Service. */
protected SearchServiceAgent searchAgent = null;
@@ -268,9 +284,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
// Load the UEB payload data, any errors will result in a failure and discard
- JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX);
+ JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY);
if (spikeObjVertex == null) {
- returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX);
+ returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY);
return null;
}
@@ -336,7 +352,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) {
String entityPrimaryKeyFieldName =
- getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType);
+ getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType);
if (entityPrimaryKeyFieldName == null) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing primary key attribute");
@@ -346,7 +362,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
return null;
}
String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
- if (entityPrimaryKeyFieldValue.isEmpty()) {
+ if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing primary value attribute");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
@@ -425,8 +441,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
// Make sure that were were actually passed in a valid string.
if (payload == null || payload.isEmpty()) {
- logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
+ logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
+ logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
return eventVertex;
}
@@ -448,7 +464,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
}
- private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext, String payload,
+ private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext,
String oxmEntityType, String entityType) {
DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
@@ -478,9 +494,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
try {
jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
} catch (IOException e) {
- logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+ logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
payload);
- logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+ logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
"");
}
@@ -634,8 +650,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
String entityId = eventEntity.getId();
- if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
- || action.equalsIgnoreCase(ACTION_UPDATE)) {
+ if ((action.equalsIgnoreCase(CREATE) && entityId != null)
+ || action.equalsIgnoreCase(UPDATE)) {
// Run the GET to retrieve the ETAG from the search service
OperationResult storedEntity = searchAgent.getDocument(index, entityId);
@@ -653,11 +669,11 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
// Write the entity to the search service.
// PUT
searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
- } else if (action.equalsIgnoreCase(ACTION_CREATE)) {
+ } else if (action.equalsIgnoreCase(CREATE)) {
// Write the entry to the search service.
searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
- } else if (action.equalsIgnoreCase(ACTION_DELETE)) {
+ } else if (action.equalsIgnoreCase(DELETE)) {
// Run the GET to retrieve the ETAG from the search service
OperationResult storedEntity = searchAgent.getDocument(index, entityId);
@@ -670,6 +686,13 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
}
+ /*
+ * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
+ * we specify a Content-Type.
+ */
+
+ headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
+
searchAgent.deleteDocument(index, eventEntity.getId(), headers);
} else {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
@@ -682,4 +705,211 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
action);
}
}
+
+ protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
+
+ SpikeEventMeta meta = new SpikeEventMeta();
+ Object eventPayloadObj = null;
+ String eventPayload = null;
+ try {
+ eventPayloadObj = exchange.getIn().getBody();
+
+ /*
+ * It is expected that mainJson will have multiple top level objects: - header - body - result
+ */
+ if (eventPayloadObj == null) {
+ returnWithError(exchange, null, "Invalid Payload");
+ return null;
+ }
+
+ eventPayload = (String)eventPayloadObj;
+
+ meta.setEventEntity(new JSONObject(eventPayload));
+ } catch (JSONException exc) {
+ returnWithError(exchange, eventPayload, "Invalid Payload");
+ return null;
+ }
+
+ JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
+
+ if (eventHeader == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
+ return null;
+ }
+
+ meta.setEventHeader(eventHeader);
+
+ /*
+ * Only process SPIKE update-notification events
+ */
+
+ final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
+ final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
+
+ if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
+ // drop event
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
+ + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
+
+ /*
+ * I don't think ignoring a non-applicable event constitutes a failure.
+ */
+
+ setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
+ return null;
+ }
+
+ JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
+
+ if (eventBody == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
+ return null;
+ }
+
+ meta.setEventBody(eventBody);
+
+ String action = eventBody.getString(OPERATION_KEY);
+ if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Unrecognized action '" + action + "'", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Unrecognized action '" + action + "'");
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+ meta.setBodyOperationType(action);
+
+ // Load the event body data, any errors will result in a failure and discard
+
+ JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
+ if (spikeVertex == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
+ return null;
+ }
+
+ meta.setSpikeVertex(spikeVertex);
+
+ SpikeEventVertex spikeEventVertex = null;
+ try {
+ spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
+ } catch (JSONException exc) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Error initializating spike event. Error: " + exc.getMessage(), eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Error initializating spike event. Error: " + exc.getMessage());
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setSpikeEventVertex(spikeEventVertex);
+
+ DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase());
+ if (oxmJaxbContext == null) {
+ logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
+ eventPayload);
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setOxmJaxbContext(oxmJaxbContext);
+
+ String entityType = spikeEventVertex.getType();
+ if (entityType == null || entityType.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing entity type", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing entity type");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ /*
+ * test if entityType is in the model
+ */
+
+ VersionedOxmEntities oxmEntities =
+ EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion));
+
+ if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'",
+ eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "No matching OXM Descriptor for entity-type='" + entityType + "'");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+
+ String entityKey = spikeEventVertex.getKey();
+ if (entityKey == null || entityKey.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
+ eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload vertex missing entity key");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+ String entityLink = spikeEventVertex.getEntityLink();
+ if (entityLink == null || entityLink.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing entity link", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing entity link");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ JSONObject vertexProperties = null;
+ try {
+
+ vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
+
+ } catch (JSONException exc) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing " + VERTEX_PROPERTIES_KEY);
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setVertexProperties(vertexProperties);
+
+ // log the fact that all data are in good shape
+ logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
+ logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
+ eventPayload);
+
+ return meta;
+
+ }
+
+ protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
+
+ /*
+ * These are all critical keys
+ */
+
+ final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
+ final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
+ final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
+
+ SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
+ eventVertex.setSchemaVersion(vertexSchemaVersion);
+ logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
+
+ return eventVertex;
+
+ }
+
}