diff options
author | da490c <dave.adams@amdocs.com> | 2018-06-12 23:11:15 -0400 |
---|---|---|
committer | da490c <dave.adams@amdocs.com> | 2018-06-12 23:54:25 -0400 |
commit | 72da66902a3efaff3068ed2b3f4f7eb3bb5fe43f (patch) | |
tree | 9cfacefcb3543589bff774f9f40cc7fd548e8430 /src/main/java/org/onap | |
parent | 7753d7e90f5d91b143043a642e90dd30806da2ba (diff) |
Fix Spike Event Processing with Common Format
Issue-ID: AAI-1215
Change-Id: Ic6b575e9194d00cc0da9229cdb532869357e37e5
Signed-off-by: da490c <dave.adams@amdocs.com>
Diffstat (limited to 'src/main/java/org/onap')
11 files changed, 679 insertions, 342 deletions
diff --git a/src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java b/src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java index 5aff3ce..bb52802 100644 --- a/src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java +++ b/src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java @@ -28,6 +28,7 @@ import java.util.Map.Entry; import org.onap.aai.datarouter.util.NodeUtils; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -36,6 +37,13 @@ import com.fasterxml.jackson.databind.node.ObjectNode; * The Class SpikeAggregationEntity. Mimics functionality of SPIKEUI's AggregationEntity */ public class SpikeAggregationEntity implements DocumentStoreDataEntity, Serializable { + + private Map<String, String> attributes = new HashMap<>(); + + @JsonIgnore + private ObjectMapper mapper = new ObjectMapper(); + + private String id; private String link; private String lastmodTimestamp; @@ -68,28 +76,23 @@ public class SpikeAggregationEntity implements DocumentStoreDataEntity, Serializ } - Map<String, String> attributes = new HashMap<>(); - ObjectMapper mapper = new ObjectMapper(); /** * Instantiates a new aggregation entity. */ public SpikeAggregationEntity() {} - public void deriveFields(JsonNode uebPayload) { + public void deriveFields(JsonNode entityProperties) { this.setId(NodeUtils.generateUniqueShaDigest(link)); this.setLastmodTimestamp(Long.toString(System.currentTimeMillis())); - JsonNode entityNode = uebPayload.get("vertex").get("properties"); - Iterator<Entry<String, JsonNode>> nodes = entityNode.fields(); + Iterator<Entry<String, JsonNode>> nodes = entityProperties.fields(); while (nodes.hasNext()) { Map.Entry<String, JsonNode> entry = (Map.Entry<String, JsonNode>) nodes.next(); attributes.put(entry.getKey(), entry.getValue().asText()); } } - - @Override public String getAsJson() { ObjectNode rootNode = mapper.createObjectNode(); @@ -105,7 +108,12 @@ public class SpikeAggregationEntity implements DocumentStoreDataEntity, Serializ @Override public String toString() { - return "AggregationEntity [id=" + id + ", link=" + link + ", attributes=" + attributes - + ", mapper=" + mapper + "]"; + return "SpikeAggregationEntity [" + + (attributes != null ? "attributes=" + attributes + ", " : "") + + (id != null ? "id=" + id + ", " : "") + (link != null ? "link=" + link + ", " : "") + + (lastmodTimestamp != null ? "lastmodTimestamp=" + lastmodTimestamp : "") + "]"; } + + + } diff --git a/src/main/java/org/onap/aai/datarouter/entity/SpikeEventHeader.java b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventHeader.java new file mode 100644 index 0000000..476b54e --- /dev/null +++ b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventHeader.java @@ -0,0 +1,80 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.datarouter.entity; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class SpikeEventHeader { + + @JsonProperty("request-id") + private String requestId; + + private String timestamp; + + @JsonProperty("source-name") + private String sourceName; + + @JsonProperty("event-type") + private String eventType; + + public String getRequestId() { + return requestId; + } + + public void setRequestId(String requestId) { + this.requestId = requestId; + } + + public String getTimestamp() { + return timestamp; + } + + public void setTimestamp(String timestamp) { + this.timestamp = timestamp; + } + + public String getSourceName() { + return sourceName; + } + + public void setSourceName(String sourceName) { + this.sourceName = sourceName; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + @Override + public String toString() { + return "SpikeEventHeader [" + (requestId != null ? "requestId=" + requestId + ", " : "") + + (timestamp != null ? "timestamp=" + timestamp + ", " : "") + + (sourceName != null ? "sourceName=" + sourceName + ", " : "") + + (eventType != null ? "eventType=" + eventType : "") + "]"; + } + +} diff --git a/src/main/java/org/onap/aai/datarouter/entity/SpikeEventMeta.java b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventMeta.java new file mode 100644 index 0000000..40c4acc --- /dev/null +++ b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventMeta.java @@ -0,0 +1,104 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.datarouter.entity; + +import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; +import org.json.JSONObject; + +public class SpikeEventMeta { + + private JSONObject eventEntity; + private JSONObject eventHeader; + private JSONObject eventBody; + private JSONObject spikeVertex; + private JSONObject vertexProperties; + private SpikeEventVertex spikeEventVertex; + private DynamicJAXBContext oxmJaxbContext; + private String bodyOperationType; + + public JSONObject getEventEntity() { + return eventEntity; + } + + public void setEventEntity(JSONObject eventEntity) { + this.eventEntity = eventEntity; + } + + public JSONObject getEventHeader() { + return eventHeader; + } + + public void setEventHeader(JSONObject eventHeader) { + this.eventHeader = eventHeader; + } + + public JSONObject getEventBody() { + return eventBody; + } + + public void setEventBody(JSONObject eventBody) { + this.eventBody = eventBody; + } + + public JSONObject getSpikeVertex() { + return spikeVertex; + } + + public void setSpikeVertex(JSONObject spikeVertex) { + this.spikeVertex = spikeVertex; + } + + public JSONObject getVertexProperties() { + return vertexProperties; + } + + public void setVertexProperties(JSONObject vertexProperties) { + this.vertexProperties = vertexProperties; + } + + public SpikeEventVertex getSpikeEventVertex() { + return spikeEventVertex; + } + + public void setSpikeEventVertex(SpikeEventVertex spikeEventVertex) { + this.spikeEventVertex = spikeEventVertex; + } + + public DynamicJAXBContext getOxmJaxbContext() { + return oxmJaxbContext; + } + + public void setOxmJaxbContext(DynamicJAXBContext oxmJaxbContext) { + this.oxmJaxbContext = oxmJaxbContext; + } + + public String getBodyOperationType() { + return bodyOperationType; + } + + public void setBodyOperationType(String bodyOperationType) { + this.bodyOperationType = bodyOperationType; + } + + + + +} diff --git a/src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java index 43b3816..2350f93 100644 --- a/src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java +++ b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java @@ -20,31 +20,29 @@ */ package org.onap.aai.datarouter.entity; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; - /** * A convenience POJO for mapping the Vertex from a Spike Event. * - * @author salmaA */ -@JsonIgnoreProperties(ignoreUnknown = true) public class SpikeEventVertex { - private String key; + private final String key; private String schemaVersion; - private String type; + private final String type; + private final String entityLink; - - public String getKey() { - return key; + public SpikeEventVertex(String type, String key) { + this.type = type; + this.key = key; + this.entityLink = type + "/" + key; } - public void setKey(String key) { - this.key = key; + public String getKey() { + return key; } public String getSchemaVersion() { @@ -59,12 +57,8 @@ public class SpikeEventVertex { return type; } - public void setType(String type) { - this.type = type; - } - public String getEntityLink() { - return this.type + "/" + this.key; + return entityLink; } - + } diff --git a/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java index 9627365..6d04ed6 100644 --- a/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java +++ b/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java @@ -33,9 +33,11 @@ import java.util.Map; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.http.conn.routing.RouteInfo.LayerType; import org.eclipse.persistence.dynamic.DynamicType; import org.eclipse.persistence.internal.helper.DatabaseField; import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; +import org.eclipse.persistence.oxm.MediaType; import org.json.JSONException; import org.json.JSONObject; import org.onap.aai.cl.api.Logger; @@ -44,6 +46,7 @@ import org.onap.aai.cl.mdc.MdcContext; import org.onap.aai.datarouter.entity.DocumentStoreDataEntity; import org.onap.aai.datarouter.entity.OxmEntityDescriptor; import org.onap.aai.datarouter.entity.SpikeEventEntity; +import org.onap.aai.datarouter.entity.SpikeEventMeta; import org.onap.aai.datarouter.entity.SpikeEventVertex; import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs; import org.onap.aai.datarouter.util.EntityOxmReferenceHelper; @@ -51,6 +54,8 @@ import org.onap.aai.datarouter.util.ExternalOxmModelProcessor; import org.onap.aai.datarouter.util.OxmModelLoader; import org.onap.aai.datarouter.util.RouterServiceUtil; import org.onap.aai.datarouter.util.SearchServiceAgent; +import org.onap.aai.datarouter.util.Version; +import org.onap.aai.datarouter.util.VersionedOxmEntities; import org.onap.aai.restclient.client.Headers; import org.onap.aai.restclient.client.OperationResult; import org.onap.aai.restclient.rest.HttpUtil; @@ -59,30 +64,41 @@ import org.slf4j.MDC; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.ObjectWriter; public abstract class AbstractSpikeEntityEventProcessor implements Processor { protected static final String additionalInfo = "Response of SpikeEntityEventPolicy"; private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors; - - protected final String ACTION_CREATE = "create"; - private final String EVENT_VERTEX = "vertex"; - public final static String ACTION_DELETE = "delete"; - protected final String ACTION_UPDATE = "update"; protected final String PROCESS_SPIKE_EVENT = "Process Spike Event"; - private final String OPERATION_KEY = "operation"; + protected static final String UPDATE_NOTIFICATION = "update-notification"; + protected static final String SPIKE = "SPIKE"; + + protected static final String HEADER_KEY = "header"; + protected static final String EVENT_TYPE_KEY = "event-type"; + protected static final String SOURCE_NAME_KEY = "source-name"; + protected static final String BODY_KEY = "body"; + protected static final String OPERATION_KEY = "operation"; + + protected static final String VERTEX_KEY = "vertex"; + protected static final String VERTEX_PROPERTIES_KEY = "properties"; + protected static final String VERTEX_KEY_KEY = "key"; + protected static final String VERTEX_TYPE_KEY = "type"; + protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version"; + + protected static final String CREATE = "create"; + protected static final String DELETE = "delete"; + protected static final String UPDATE = "update"; + protected enum ResponseType { SUCCESS, PARTIAL_SUCCESS, FAILURE; } - private final List<String> SUPPORTED_ACTIONS = - Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE); + protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE); - Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>(); - private String oxmVersion = null; + protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>(); + protected String oxmVersion = null; /** Agent for communicating with the Search Service. */ protected SearchServiceAgent searchAgent = null; @@ -268,9 +284,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { // Load the UEB payload data, any errors will result in a failure and discard - JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX); + JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY); if (spikeObjVertex == null) { - returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX); + returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY); return null; } @@ -336,7 +352,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) { String entityPrimaryKeyFieldName = - getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType); + getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType); if (entityPrimaryKeyFieldName == null) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing primary key attribute"); @@ -346,7 +362,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { return null; } String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName); - if (entityPrimaryKeyFieldValue.isEmpty()) { + if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing primary value attribute"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, @@ -425,8 +441,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { // Make sure that were were actually passed in a valid string. if (payload == null || payload.isEmpty()) { - logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX); - logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX); + logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY); + logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY); return eventVertex; } @@ -448,7 +464,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { } - private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext, String payload, + private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext, String oxmEntityType, String entityType) { DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType); @@ -478,9 +494,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { try { jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload)); } catch (IOException e) { - logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing", + logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing", payload); - logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing", + logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing", ""); } @@ -634,8 +650,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { String entityId = eventEntity.getId(); - if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null) - || action.equalsIgnoreCase(ACTION_UPDATE)) { + if ((action.equalsIgnoreCase(CREATE) && entityId != null) + || action.equalsIgnoreCase(UPDATE)) { // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity = searchAgent.getDocument(index, entityId); @@ -653,11 +669,11 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { // Write the entity to the search service. // PUT searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers); - } else if (action.equalsIgnoreCase(ACTION_CREATE)) { + } else if (action.equalsIgnoreCase(CREATE)) { // Write the entry to the search service. searchAgent.postDocument(index, eventEntity.getAsJson(), headers); - } else if (action.equalsIgnoreCase(ACTION_DELETE)) { + } else if (action.equalsIgnoreCase(DELETE)) { // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity = searchAgent.getDocument(index, entityId); @@ -670,6 +686,13 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); } + /* + * The Spring-Boot version of the search-data-service rejects the DELETE operation unless + * we specify a Content-Type. + */ + + headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType())); + searchAgent.deleteDocument(index, eventEntity.getId(), headers); } else { logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); @@ -682,4 +705,211 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor { action); } } + + protected SpikeEventMeta processSpikeEvent(Exchange exchange) { + + SpikeEventMeta meta = new SpikeEventMeta(); + Object eventPayloadObj = null; + String eventPayload = null; + try { + eventPayloadObj = exchange.getIn().getBody(); + + /* + * It is expected that mainJson will have multiple top level objects: - header - body - result + */ + if (eventPayloadObj == null) { + returnWithError(exchange, null, "Invalid Payload"); + return null; + } + + eventPayload = (String)eventPayloadObj; + + meta.setEventEntity(new JSONObject(eventPayload)); + } catch (JSONException exc) { + returnWithError(exchange, eventPayload, "Invalid Payload"); + return null; + } + + JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY); + + if (eventHeader == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY); + return null; + } + + meta.setEventHeader(eventHeader); + + /* + * Only process SPIKE update-notification events + */ + + final String sourceName = eventHeader.getString(SOURCE_NAME_KEY); + final String eventType = eventHeader.getString(EVENT_TYPE_KEY); + + if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) { + // drop event + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='" + + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'."); + + /* + * I don't think ignoring a non-applicable event constitutes a failure. + */ + + setResponse(exchange, ResponseType.SUCCESS, additionalInfo); + return null; + } + + JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY); + + if (eventBody == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY); + return null; + } + + meta.setEventBody(eventBody); + + String action = eventBody.getString(OPERATION_KEY); + if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Unrecognized action '" + action + "'", eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Unrecognized action '" + action + "'"); + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + meta.setBodyOperationType(action); + + // Load the event body data, any errors will result in a failure and discard + + JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY); + if (spikeVertex == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY); + return null; + } + + meta.setSpikeVertex(spikeVertex); + + SpikeEventVertex spikeEventVertex = null; + try { + spikeEventVertex = initializeSpikeEventVertex(spikeVertex); + } catch (JSONException exc) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Error initializating spike event. Error: " + exc.getMessage(), eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Error initializating spike event. Error: " + exc.getMessage()); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + meta.setSpikeEventVertex(spikeEventVertex); + + DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase()); + if (oxmJaxbContext == null) { + logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion); + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", + eventPayload); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + meta.setOxmJaxbContext(oxmJaxbContext); + + String entityType = spikeEventVertex.getType(); + if (entityType == null || entityType.isEmpty()) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Payload header missing entity type", eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Payload header missing entity type"); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + /* + * test if entityType is in the model + */ + + VersionedOxmEntities oxmEntities = + EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion)); + + if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'", + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "No matching OXM Descriptor for entity-type='" + entityType + "'"); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + + String entityKey = spikeEventVertex.getKey(); + if (entityKey == null || entityKey.isEmpty()) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key", + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Payload vertex missing entity key"); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + String entityLink = spikeEventVertex.getEntityLink(); + if (entityLink == null || entityLink.isEmpty()) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Payload header missing entity link", eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Payload header missing entity link"); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + JSONObject vertexProperties = null; + try { + + vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY); + + } catch (JSONException exc) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, + "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Payload header missing " + VERTEX_PROPERTIES_KEY); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return null; + } + + meta.setVertexProperties(vertexProperties); + + // log the fact that all data are in good shape + logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType); + logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType, + eventPayload); + + return meta; + + } + + protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException { + + /* + * These are all critical keys + */ + + final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY); + final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY); + final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY); + + SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey); + eventVertex.setSchemaVersion(vertexSchemaVersion); + logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString()); + + return eventVertex; + + } + } diff --git a/src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java b/src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java index 793588f..d54fbe7 100644 --- a/src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java +++ b/src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java @@ -37,6 +37,7 @@ import org.apache.camel.Processor; import org.eclipse.persistence.dynamic.DynamicType; import org.eclipse.persistence.internal.helper.DatabaseField; import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; +import org.eclipse.persistence.oxm.MediaType; import org.json.JSONException; import org.json.JSONObject; import org.onap.aai.datarouter.entity.AaiEventEntity; @@ -1000,6 +1001,13 @@ public class EntityEventPolicy implements Processor { logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); } + + /* + * The Spring-Boot version of the search-data-service rejects the DELETE operation unless + * we specify a Content-Type. + */ + + headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType())); searchAgent.deleteDocument(index, eventEntity.getId(), headers); } else { diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java index c2f7d25..b1a5a87 100644 --- a/src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java +++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java @@ -21,13 +21,11 @@ package org.onap.aai.datarouter.policy; import java.io.FileNotFoundException; -import java.io.IOException; import java.util.List; import org.apache.camel.Exchange; -import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; import org.onap.aai.datarouter.entity.SpikeAggregationEntity; -import org.onap.aai.datarouter.entity.SpikeEventVertex; +import org.onap.aai.datarouter.entity.SpikeEventMeta; import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs; import com.fasterxml.jackson.databind.JsonNode; @@ -37,13 +35,11 @@ public class SpikeAggregateGenericVnfProcessor extends AbstractSpikeEntityEventP public static final String additionalInfo = "Response of SpikeEntityEventPolicy"; - /** Agent for communicating with the Search Service. */ - public SpikeAggregateGenericVnfProcessor(SpikeEventPolicyConfig config) throws FileNotFoundException { super(config); } - + @Override protected void startup() { // Create the indexes in the search service if they do not already exist. @@ -55,55 +51,31 @@ public class SpikeAggregateGenericVnfProcessor extends AbstractSpikeEntityEventP public void process(Exchange exchange) throws Exception { long startTime = System.currentTimeMillis(); - String uebPayload = getExchangeBody(exchange); - if (uebPayload == null) { - return; - } - JsonNode uebAsJson = null; - try { - uebAsJson = mapper.readTree(uebPayload); - } catch (IOException e) { - returnWithError(exchange, uebPayload, "Invalid Payload"); - return; - } - - String action = getSpikeEventAction(exchange, uebPayload); - if (action == null) { - return; - } - SpikeEventVertex eventVertex = populateEventVertex(exchange, uebPayload); - if (eventVertex == null) { - return; - } - String entityType = getEntityType(exchange, eventVertex, uebPayload); - if (entityType == null) { - return; - } - String entityLink = getEntityLink(exchange, eventVertex, uebPayload); - if (entityLink == null) { - return; - } - DynamicJAXBContext oxmJaxbContext = readOxm(exchange, uebPayload); - if (oxmJaxbContext == null) { + + SpikeEventMeta meta = processSpikeEvent(exchange); + + if (meta == null) { return; } - String oxmEntityType = getOxmEntityType(entityType); - List<String> searchableAttr = getSearchableAttibutes(oxmJaxbContext, oxmEntityType, entityType, uebPayload, - exchange); + + String oxmEntityType = getOxmEntityType(meta.getSpikeEventVertex().getType()); + + List<String> searchableAttr = getSearchableAttibutes(meta.getOxmJaxbContext(), oxmEntityType, + meta.getSpikeEventVertex().getType(), meta.getEventEntity().toString(), exchange); + if (searchableAttr == null) { return; } - - // log the fact that all data are in good shape - logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType); - logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType, - uebPayload); + JsonNode propertiesNode = + mapper.readValue(meta.getVertexProperties().toString(), JsonNode.class); SpikeAggregationEntity spikeAgregationEntity = new SpikeAggregationEntity(); - spikeAgregationEntity.setLink(entityLink); - spikeAgregationEntity.deriveFields(uebAsJson); - handleSearchServiceOperation(spikeAgregationEntity, action, searchIndexName); + spikeAgregationEntity.setLink(meta.getSpikeEventVertex().getEntityLink()); + spikeAgregationEntity.deriveFields(propertiesNode); + + handleSearchServiceOperation(spikeAgregationEntity, meta.getBodyOperationType(), + searchIndexName); long stopTime = System.currentTimeMillis(); metricsLogger.info(EntityEventPolicyMsgs.OPERATION_RESULT_NO_ERRORS, PROCESS_SPIKE_EVENT, diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java index 4340eb8..c33e668 100644 --- a/src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java +++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java @@ -21,20 +21,15 @@ package org.onap.aai.datarouter.policy; import java.io.FileNotFoundException; -import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.camel.Exchange; -import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; -import org.json.JSONException; -import org.json.JSONObject; import org.onap.aai.datarouter.entity.OxmEntityDescriptor; -import org.onap.aai.datarouter.entity.SpikeEventVertex; +import org.onap.aai.datarouter.entity.SpikeEventMeta; import org.onap.aai.datarouter.entity.SuggestionSearchEntity; import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs; import org.onap.aai.datarouter.util.EntityOxmReferenceHelper; @@ -44,21 +39,18 @@ import org.onap.aai.datarouter.util.VersionedOxmEntities; import com.fasterxml.jackson.databind.JsonNode; - public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProcessor { public static final String additionalInfo = "Response of SpikeEntityEventPolicy"; - private final String EVENT_VERTEX = "vertex"; - - private String oxmVersion = null; + private static final String PROCESS_SPIKE_EVENT = "Process Spike Event"; + /** Agent for communicating with the Search Service. */ public SpikeAutosuggestIndexProcessor(SpikeEventPolicyConfig config) throws FileNotFoundException { super(config); - parseLatestOxmVersion(); } @Override @@ -71,76 +63,30 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc @Override public void process(Exchange exchange) throws Exception { - long startTime = System.currentTimeMillis(); - String uebPayload = getExchangeBody(exchange); - if (uebPayload == null) { - return; - } - JsonNode uebAsJson = null; - try { - uebAsJson = mapper.readTree(uebPayload); - } catch (IOException e) { - returnWithError(exchange, uebPayload, "Invalid Payload"); - return; - } + long startTime = System.currentTimeMillis(); - String action = getSpikeEventAction(exchange, uebPayload); - if (action == null) { - return; - } - JSONObject uebObjEntity = getUebContentAsJson(uebPayload, EVENT_VERTEX); - if (uebObjEntity == null) { - returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX); - return; - } + SpikeEventMeta meta = processSpikeEvent(exchange); - SpikeEventVertex eventVertex = populateEventVertex(exchange, uebPayload); - if (eventVertex == null) { - return; - } - String entityType = getEntityType(exchange, eventVertex, uebPayload); - if (entityType == null) { - return; - } - String entityLink = getEntityLink(exchange, eventVertex, uebPayload); - if (entityLink == null) { - return; - } - DynamicJAXBContext oxmJaxbContext = readOxm(exchange, uebPayload); - if (oxmJaxbContext == null) { + if ( meta == null ) { return; } - String oxmEntityType = getOxmEntityType(entityType); - List<String> searchableAttr = getSearchableAttibutes(oxmJaxbContext, oxmEntityType, entityType, uebPayload, - exchange); - if (searchableAttr == null) { - return; - } - - // log the fact that all data are in good shape - logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType); - logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType, - uebPayload); - - + /* * Use the versioned OXM Entity class to get access to cross-entity reference helper collections */ VersionedOxmEntities oxmEntities = EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion)); - /* - * Process for autosuggestable entities - */ if (oxmEntities != null) { Map<String, OxmEntityDescriptor> rootDescriptor = oxmEntities.getSuggestableEntityDescriptors(); if (!rootDescriptor.isEmpty()) { List<String> suggestibleAttrInPayload = new ArrayList<>(); - List<String> suggestibleAttrInOxm = extractSuggestableAttr(oxmEntities, entityType); + List<String> suggestibleAttrInOxm = + extractSuggestableAttr(oxmEntities, meta.getSpikeEventVertex().getType()); if (suggestibleAttrInOxm != null) { - for (String attr: suggestibleAttrInOxm){ - if ( uebAsJson.get("vertex").get("properties").has(attr) ){ + for (String attr : suggestibleAttrInOxm) { + if (meta.getVertexProperties().has(attr)) { suggestibleAttrInPayload.add(attr); } } @@ -149,24 +95,26 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc if (suggestibleAttrInPayload.isEmpty()) { return; } - List<String> suggestionAliases = extractAliasForSuggestableEntity(oxmEntities, entityType); + List<String> suggestionAliases = extractAliasForSuggestableEntity(oxmEntities, meta.getSpikeEventVertex().getType()); /* * It was decided to silently ignore DELETE requests for resources we don't allow to be * deleted. e.g. auto-suggestion deletion is not allowed while aggregation deletion is. */ - if (!ACTION_DELETE.equalsIgnoreCase(action)) { + if (!DELETE.equalsIgnoreCase(meta.getBodyOperationType())) { List<ArrayList<String>> listOfValidPowerSetElements = SearchSuggestionPermutation.getNonEmptyUniqueLists(suggestibleAttrInPayload); - + + JsonNode propertiesNode = mapper.readValue(meta.getVertexProperties().toString(), JsonNode.class); + // Now we have a list containing the power-set (minus empty element) for the status that are // available in the payload. Try inserting a document for every combination. for (ArrayList<String> list : listOfValidPowerSetElements) { SuggestionSearchEntity suggestionSearchEntity = new SuggestionSearchEntity(); - suggestionSearchEntity.setEntityType(entityType); + suggestionSearchEntity.setEntityType(meta.getSpikeEventVertex().getType()); suggestionSearchEntity.setSuggestableAttr(list); suggestionSearchEntity.setEntityTypeAliases(suggestionAliases); - suggestionSearchEntity.setFilterBasedPayloadFromResponse(uebAsJson.get("vertex").get("properties"), + suggestionSearchEntity.setFilterBasedPayloadFromResponse(propertiesNode, suggestibleAttrInOxm, list); suggestionSearchEntity.setSuggestionInputPermutations( suggestionSearchEntity.generateSuggestionInputPermutations()); @@ -180,7 +128,7 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc + e.getLocalizedMessage()); } - handleSearchServiceOperation(suggestionSearchEntity, action, searchIndexName); + handleSearchServiceOperation(suggestionSearchEntity, meta.getBodyOperationType(), searchIndexName); } } } @@ -192,7 +140,7 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc setResponse(exchange, ResponseType.SUCCESS, additionalInfo); return; } - + public List<String> extractSuggestableAttr(VersionedOxmEntities oxmEntities, String entityType) { // Extract suggestable attributeshandleTopographicalData Map<String, OxmEntityDescriptor> rootDescriptor = oxmEntities.getSuggestableEntityDescriptors(); @@ -225,28 +173,4 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc return desc.getAlias(); } - private void parseLatestOxmVersion() { - int latestVersion = -1; - if (oxmVersionContextMap != null) { - Iterator it = oxmVersionContextMap.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry pair = (Map.Entry) it.next(); - - String version = pair.getKey().toString(); - int versionNum = Integer.parseInt(version.substring(1, version.length())); - - if (versionNum > latestVersion) { - latestVersion = versionNum; - oxmVersion = pair.getKey().toString(); - } - - logger.info(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_FOUND, pair.getKey().toString()); - } - } else { - logger.error(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_MISSING, ""); - } - } - - - } diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java index 36bb142..caee8b4 100644 --- a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java +++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java @@ -30,21 +30,23 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.eclipse.persistence.dynamic.DynamicType; import org.eclipse.persistence.internal.helper.DatabaseField; import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; +import org.eclipse.persistence.oxm.MediaType; import org.json.JSONException; import org.json.JSONObject; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; -import org.onap.aai.datarouter.entity.SpikeEventEntity; import org.onap.aai.datarouter.entity.DocumentStoreDataEntity; -import org.onap.aai.datarouter.entity.SpikeEventVertex; import org.onap.aai.datarouter.entity.OxmEntityDescriptor; +import org.onap.aai.datarouter.entity.SpikeEventEntity; +import org.onap.aai.datarouter.entity.SpikeEventVertex; import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs; import org.onap.aai.datarouter.util.EntityOxmReferenceHelper; import org.onap.aai.datarouter.util.ExternalOxmModelProcessor; @@ -68,17 +70,38 @@ public class SpikeEntityEventPolicy implements Processor { private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors; - - private static final String ACTION_CREATE = "create"; - private static final String EVENT_VERTEX = "vertex"; - private static final String ACTION_DELETE = "delete"; - private static final String ACTION_UPDATE = "update"; + /** + * Note (8-June-2018): + * + * At present we don't need to support every event-type that could be present in the spike-events. + * The only one we want is a SPIKE "update-notification". In the future perhaps we need to add some + * configurability to the camel-route itself with a json camel filtering component so that routing + * logic can be modified as part of the camel route spring-xml instead of hard-coding the + * event filtering in here. + */ + private static final String PROCESS_SPIKE_EVENT = "Process Spike Event"; + + private static final String UPDATE_NOTIFICATION = "update-notification"; + private static final String SPIKE = "SPIKE"; + + private static final String HEADER_KEY = "header"; + private static final String EVENT_TYPE_KEY = "event-type"; + private static final String SOURCE_NAME_KEY = "source-name"; + private static final String BODY_KEY = "body"; private static final String OPERATION_KEY = "operation"; + private static final String VERTEX_KEY = "vertex"; + private static final String VERTEX_KEY_KEY = "key"; + private static final String VERTEX_TYPE_KEY = "type"; + private static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version"; + + private static final String CREATE = "create"; + private static final String DELETE = "delete"; + private static final String UPDATE = "update"; private static final List<String> SUPPORTED_ACTIONS = - Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE); + Arrays.asList(CREATE, UPDATE, DELETE); Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>(); private String oxmVersion = null; @@ -122,7 +145,7 @@ public class SpikeEntityEventPolicy implements Processor { private void parseLatestOxmVersion() { int latestVersion = -1; if (oxmVersionContextMap != null) { - Iterator it = oxmVersionContextMap.entrySet().iterator(); + Iterator<Entry<String, DynamicJAXBContext>> it = oxmVersionContextMap.entrySet().iterator(); while (it.hasNext()) { Map.Entry pair = (Map.Entry) it.next(); @@ -148,6 +171,7 @@ public class SpikeEntityEventPolicy implements Processor { logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED); } + /** * Convert object to json. @@ -177,65 +201,109 @@ public class SpikeEntityEventPolicy implements Processor { setResponse(exchange, ResponseType.FAILURE, additionalInfo); } - public boolean isJSONValid(String test) { + @Override + public void process(Exchange exchange) /*throws Exception*/ { + + long startTime = System.currentTimeMillis(); + + final String eventPayload = exchange.getIn().getBody().toString(); + JSONObject mainJson = null; + try { - new JSONObject(test); - } catch (JSONException ex) { - return false; + + /* + * It is expected that mainJson will have multiple top level objects: + * - header + * - body + * - result + */ + + mainJson = new JSONObject(eventPayload); + } catch (JSONException exc) { + returnWithError(exchange, eventPayload, "Invalid Payload"); + return; } - return true; - } + + JSONObject eventHeader = mainJson.getJSONObject(HEADER_KEY); - @Override - public void process(Exchange exchange) throws Exception { + if (eventHeader == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY); + return; + } + + /* + * Only process SPIKE update-notification events + */ + + final String sourceName = eventHeader.getString(SOURCE_NAME_KEY); + final String eventType = eventHeader.getString(EVENT_TYPE_KEY); - long startTime = System.currentTimeMillis(); - String uebPayload = exchange.getIn().getBody().toString(); - if (uebPayload == null || !isJSONValid(uebPayload)) { - uebPayload = exchange.getIn().getBody(String.class); - if (uebPayload == null || !isJSONValid(uebPayload)) { - returnWithError(exchange, uebPayload, "Invalid Payload"); - return; - } + if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) { + // drop event + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='" + + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'."); + + /* + * I don't think ignoring a non-applicable event constitutes a failure. + */ + + setResponse(exchange, ResponseType.SUCCESS, additionalInfo); + return; } + + JSONObject eventBody = mainJson.getJSONObject(BODY_KEY); + if (eventBody == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY); + return; + } - JSONObject mainJson = new JSONObject(uebPayload); - String action = mainJson.getString(OPERATION_KEY); + String action = eventBody.getString(OPERATION_KEY); if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) { logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Unrecognized action '" + action + "'", uebPayload); + "Unrecognized action '" + action + "'", eventPayload); logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Unrecognized action '" + action + "'"); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; } - // Load the UEB payload data, any errors will result in a failure and discard - - JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX); - if (spikeObjVertex == null) { - returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX); + // Load the event body data, any errors will result in a failure and discard + + JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY); + if (spikeVertex == null) { + returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY); return; } - SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString()); + SpikeEventVertex eventVertex = null; + try { + eventVertex = initializeSpikeEventVertex(spikeVertex); + } catch (JSONException exc) { + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Error initializating spike event. Error: " + exc.getMessage(), + eventPayload); + logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, + "Error initializating spike event. Error: " + exc.getMessage()); + + setResponse(exchange, ResponseType.FAILURE, additionalInfo); + return; + } DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase()); if (oxmJaxbContext == null) { logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion); - logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload); + logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; } - - String entityType = eventVertex.getType(); if (entityType == null || entityType.isEmpty()) { logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload header missing entity type", uebPayload); + "Payload header missing entity type", eventPayload); logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload header missing entity type"); @@ -246,7 +314,7 @@ public class SpikeEntityEventPolicy implements Processor { String entityKey = eventVertex.getKey(); if (entityKey == null || entityKey.isEmpty()) { logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key", - uebPayload); + eventPayload); logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload vertex missing entity key"); @@ -256,7 +324,7 @@ public class SpikeEntityEventPolicy implements Processor { String entityLink = eventVertex.getEntityLink(); if (entityLink == null || entityLink.isEmpty()) { logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload header missing entity link", uebPayload); + "Payload header missing entity link", eventPayload); logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload header missing entity link"); @@ -267,8 +335,7 @@ public class SpikeEntityEventPolicy implements Processor { // log the fact that all data are in good shape logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType); logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType, - uebPayload); - + eventPayload); // Process for building SpikeEventEntity object String[] entityTypeArr = entityType.split("-"); @@ -276,7 +343,7 @@ public class SpikeEntityEventPolicy implements Processor { for (String entityWord : entityTypeArr) { oxmEntityType += entityWord.substring(0, 1).toUpperCase() + entityWord.substring(1); } - + List<String> searchableAttr = getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable"); if (searchableAttr == null) { @@ -284,28 +351,28 @@ public class SpikeEntityEventPolicy implements Processor { "Searchable attribute not found for payload entity type '" + entityType + "'"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Searchable attribute not found for payload entity type '" + entityType + "'", - uebPayload); + eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; } String entityPrimaryKeyFieldName = - getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType); + getEntityPrimaryKeyFieldName(oxmJaxbContext, eventPayload, oxmEntityType, entityType); if (entityPrimaryKeyFieldName == null) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing primary key attribute"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload missing primary key attribute", uebPayload); + "Payload missing primary key attribute", eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; } - String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName); - if (entityPrimaryKeyFieldValue.isEmpty()) { + String entityPrimaryKeyFieldValue = lookupValueUsingKey(eventPayload, entityPrimaryKeyFieldName); + if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing primary value attribute"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload missing primary value attribute", uebPayload); + "Payload missing primary value attribute", eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; @@ -321,12 +388,14 @@ public class SpikeEntityEventPolicy implements Processor { spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldValue); spikeEventEntity.setEntityType(entityType); spikeEventEntity.setLink(entityLink); + + System.out.println(spikeEventEntity); - if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) { + if (!getSearchTags(spikeEventEntity, searchableAttr, eventPayload, action)) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE, "Payload missing searchable attribute for entity type '" + entityType + "'"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, - "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload); + "Payload missing searchable attribute for entity type '" + entityType + "'", eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; @@ -338,7 +407,7 @@ public class SpikeEntityEventPolicy implements Processor { } catch (NoSuchAlgorithmException e) { logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest"); logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest", - uebPayload); + eventPayload); setResponse(exchange, ResponseType.FAILURE, additionalInfo); return; @@ -363,60 +432,19 @@ public class SpikeEntityEventPolicy implements Processor { exchange.getOut().setBody(additionalInfo); } + private SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException { - /* - * Load the UEB JSON payload, any errors would result to a failure case response. - */ - private JSONObject getUebContentAsJson(String payload, String contentKey) { - - JSONObject uebJsonObj; - JSONObject uebObjContent; - - try { - uebJsonObj = new JSONObject(payload); - } catch (JSONException e) { - logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload); - logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload); - return null; - } - - if (uebJsonObj.has(contentKey)) { - uebObjContent = uebJsonObj.getJSONObject(contentKey); - } else { - logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey); - logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey); - return null; - } - - return uebObjContent; - } - - - private SpikeEventVertex initializeSpikeEventVertex(String payload) { - - SpikeEventVertex eventVertex = null; - ObjectMapper mapper = new ObjectMapper(); + /* + * These are all critical keys + */ - // Make sure that were were actually passed in a valid string. - if (payload == null || payload.isEmpty()) { - logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX); - logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX); + final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY); + final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY); + final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY); - return eventVertex; - } - - // Marshal the supplied string into a UebEventHeader object. - try { - eventVertex = mapper.readValue(payload, SpikeEventVertex.class); - } catch (JsonProcessingException e) { - logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString()); - } catch (Exception e) { - logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString()); - } - - if (eventVertex != null) { - logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString()); - } + SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey); + eventVertex.setSchemaVersion(vertexSchemaVersion); + logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString()); return eventVertex; @@ -453,9 +481,9 @@ public class SpikeEntityEventPolicy implements Processor { try { jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload)); } catch (IOException e) { - logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing", + logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing", payload); - logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing", + logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing", ""); } @@ -584,8 +612,8 @@ public class SpikeEntityEventPolicy implements Processor { String entityId = eventEntity.getId(); - if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null) - || action.equalsIgnoreCase(ACTION_UPDATE)) { + if ((action.equalsIgnoreCase(CREATE) && entityId != null) + || action.equalsIgnoreCase(UPDATE)) { // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity = searchAgent.getDocument(index, entityId); @@ -603,11 +631,11 @@ public class SpikeEntityEventPolicy implements Processor { // Write the entity to the search service. // PUT searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers); - } else if (action.equalsIgnoreCase(ACTION_CREATE)) { + } else if (action.equalsIgnoreCase(CREATE)) { // Write the entry to the search service. searchAgent.postDocument(index, eventEntity.getAsJson(), headers); - } else if (action.equalsIgnoreCase(ACTION_DELETE)) { + } else if (action.equalsIgnoreCase(DELETE)) { // Run the GET to retrieve the ETAG from the search service OperationResult storedEntity = searchAgent.getDocument(index, entityId); @@ -619,6 +647,13 @@ public class SpikeEntityEventPolicy implements Processor { } else { logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId); } + + /* + * The Spring-Boot version of the search-data-service rejects the DELETE operation unless + * we specify a Content-Type. + */ + + headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType())); searchAgent.deleteDocument(index, eventEntity.getId(), headers); } else { diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java index e9c6b06..8b43b1b 100644 --- a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java +++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java @@ -24,23 +24,20 @@ import java.io.FileNotFoundException; import java.util.List; import org.apache.camel.Exchange; -import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; import org.onap.aai.datarouter.entity.SpikeEventEntity; -import org.onap.aai.datarouter.entity.SpikeEventVertex; +import org.onap.aai.datarouter.entity.SpikeEventMeta; import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs; public class SpikeEntitySearchProcessor extends AbstractSpikeEntityEventProcessor { public static final String additionalInfo = "Response of SpikeEntityEventPolicy"; - private static final String searchIndexSchema = ""; - + private static final String PROCESS_SPIKE_EVENT = "Process Spike Event"; /** Agent for communicating with the Search Service. */ - public SpikeEntitySearchProcessor(SpikeEventPolicyConfig config) - throws FileNotFoundException { + public SpikeEntitySearchProcessor(SpikeEventPolicyConfig config) throws FileNotFoundException { super(config); } @@ -56,53 +53,33 @@ public class SpikeEntitySearchProcessor extends AbstractSpikeEntityEventProcesso public void process(Exchange exchange) throws Exception { long startTime = System.currentTimeMillis(); - String uebPayload = getExchangeBody(exchange); - if (uebPayload == null) { - return; - } - String action = getSpikeEventAction(exchange, uebPayload); - if (action == null) { - return; - } - SpikeEventVertex eventVertex = populateEventVertex(exchange, uebPayload); - if (eventVertex == null) { - return; - } - String entityType = getEntityType(exchange, eventVertex, uebPayload); - if (entityType == null) { - return; - } - String entityLink = getEntityLink(exchange, eventVertex, uebPayload); - if (entityLink == null) { - return; - } - DynamicJAXBContext oxmJaxbContext = readOxm(exchange, uebPayload); - if (oxmJaxbContext == null) { + SpikeEventMeta meta = processSpikeEvent(exchange); + + if (meta == null) { return; } - String oxmEntityType = getOxmEntityType(entityType); - List<String> searchableAttr = getSearchableAttibutes(oxmJaxbContext, oxmEntityType, entityType, uebPayload, - exchange); + + String oxmEntityType = getOxmEntityType(meta.getSpikeEventVertex().getType()); + List<String> searchableAttr = getSearchableAttibutes(meta.getOxmJaxbContext(), oxmEntityType, + meta.getSpikeEventVertex().getType(), meta.getEventEntity().toString(), exchange); if (searchableAttr == null) { return; } - // log the fact that all data are in good shape - logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType); - logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType, - uebPayload); SpikeEventEntity spikeEventEntity = new SpikeEventEntity(); - spikeEventEntity.setEntityType(entityType); - spikeEventEntity.setLink(entityLink); - spikeEventEntity = populateSpikeEventEntity(exchange, spikeEventEntity, oxmJaxbContext, - entityType, action, uebPayload, oxmEntityType,searchableAttr); + spikeEventEntity.setEntityType(meta.getSpikeEventVertex().getType()); + spikeEventEntity.setLink(meta.getSpikeEventVertex().getEntityLink()); + spikeEventEntity = populateSpikeEventEntity(exchange, spikeEventEntity, + meta.getOxmJaxbContext(), meta.getSpikeEventVertex().getType(), meta.getBodyOperationType(), + meta.getVertexProperties().toString(), oxmEntityType, searchableAttr); + if (spikeEventEntity == null) { return; } - handleSearchServiceOperation(spikeEventEntity, action, searchIndexName); + handleSearchServiceOperation(spikeEventEntity, meta.getBodyOperationType(), searchIndexName); long stopTime = System.currentTimeMillis(); metricsLogger.info(EntityEventPolicyMsgs.OPERATION_RESULT_NO_ERRORS, PROCESS_SPIKE_EVENT, String.valueOf(stopTime - startTime)); @@ -110,6 +87,7 @@ public class SpikeEntitySearchProcessor extends AbstractSpikeEntityEventProcesso return; } + /* * This is not for this Scope. We get back to it later. (updateCerInEntity) private void * updateSearchEntityWithCrossEntityReference(SpikeEventEntity spikeEventEntity) { try { diff --git a/src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java b/src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java index 06e1ab5..fc34437 100644 --- a/src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java +++ b/src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java @@ -341,5 +341,9 @@ public class VersionedOxmEntities { return sb.toString(); } + + public Map<String, DynamicType> getEntityTypeLookup() { + return entityTypeLookup; + } } |