aboutsummaryrefslogtreecommitdiffstats
path: root/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'src/main')
-rw-r--r--src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java26
-rw-r--r--src/main/java/org/onap/aai/datarouter/entity/SpikeEventHeader.java80
-rw-r--r--src/main/java/org/onap/aai/datarouter/entity/SpikeEventMeta.java104
-rw-r--r--src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java28
-rw-r--r--src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java278
-rw-r--r--src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java8
-rw-r--r--src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java66
-rw-r--r--src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java116
-rw-r--r--src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java253
-rw-r--r--src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java58
-rw-r--r--src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java4
11 files changed, 679 insertions, 342 deletions
diff --git a/src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java b/src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java
index 5aff3ce..bb52802 100644
--- a/src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java
+++ b/src/main/java/org/onap/aai/datarouter/entity/SpikeAggregationEntity.java
@@ -28,6 +28,7 @@ import java.util.Map.Entry;
import org.onap.aai.datarouter.util.NodeUtils;
+import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -36,6 +37,13 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
* The Class SpikeAggregationEntity. Mimics functionality of SPIKEUI's AggregationEntity
*/
public class SpikeAggregationEntity implements DocumentStoreDataEntity, Serializable {
+
+ private Map<String, String> attributes = new HashMap<>();
+
+ @JsonIgnore
+ private ObjectMapper mapper = new ObjectMapper();
+
+
private String id;
private String link;
private String lastmodTimestamp;
@@ -68,28 +76,23 @@ public class SpikeAggregationEntity implements DocumentStoreDataEntity, Serializ
}
- Map<String, String> attributes = new HashMap<>();
- ObjectMapper mapper = new ObjectMapper();
/**
* Instantiates a new aggregation entity.
*/
public SpikeAggregationEntity() {}
- public void deriveFields(JsonNode uebPayload) {
+ public void deriveFields(JsonNode entityProperties) {
this.setId(NodeUtils.generateUniqueShaDigest(link));
this.setLastmodTimestamp(Long.toString(System.currentTimeMillis()));
- JsonNode entityNode = uebPayload.get("vertex").get("properties");
- Iterator<Entry<String, JsonNode>> nodes = entityNode.fields();
+ Iterator<Entry<String, JsonNode>> nodes = entityProperties.fields();
while (nodes.hasNext()) {
Map.Entry<String, JsonNode> entry = (Map.Entry<String, JsonNode>) nodes.next();
attributes.put(entry.getKey(), entry.getValue().asText());
}
}
-
-
@Override
public String getAsJson() {
ObjectNode rootNode = mapper.createObjectNode();
@@ -105,7 +108,12 @@ public class SpikeAggregationEntity implements DocumentStoreDataEntity, Serializ
@Override
public String toString() {
- return "AggregationEntity [id=" + id + ", link=" + link + ", attributes=" + attributes
- + ", mapper=" + mapper + "]";
+ return "SpikeAggregationEntity ["
+ + (attributes != null ? "attributes=" + attributes + ", " : "")
+ + (id != null ? "id=" + id + ", " : "") + (link != null ? "link=" + link + ", " : "")
+ + (lastmodTimestamp != null ? "lastmodTimestamp=" + lastmodTimestamp : "") + "]";
}
+
+
+
}
diff --git a/src/main/java/org/onap/aai/datarouter/entity/SpikeEventHeader.java b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventHeader.java
new file mode 100644
index 0000000..476b54e
--- /dev/null
+++ b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventHeader.java
@@ -0,0 +1,80 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.datarouter.entity;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SpikeEventHeader {
+
+ @JsonProperty("request-id")
+ private String requestId;
+
+ private String timestamp;
+
+ @JsonProperty("source-name")
+ private String sourceName;
+
+ @JsonProperty("event-type")
+ private String eventType;
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public void setRequestId(String requestId) {
+ this.requestId = requestId;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(String timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public void setSourceName(String sourceName) {
+ this.sourceName = sourceName;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public void setEventType(String eventType) {
+ this.eventType = eventType;
+ }
+
+ @Override
+ public String toString() {
+ return "SpikeEventHeader [" + (requestId != null ? "requestId=" + requestId + ", " : "")
+ + (timestamp != null ? "timestamp=" + timestamp + ", " : "")
+ + (sourceName != null ? "sourceName=" + sourceName + ", " : "")
+ + (eventType != null ? "eventType=" + eventType : "") + "]";
+ }
+
+}
diff --git a/src/main/java/org/onap/aai/datarouter/entity/SpikeEventMeta.java b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventMeta.java
new file mode 100644
index 0000000..40c4acc
--- /dev/null
+++ b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventMeta.java
@@ -0,0 +1,104 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.datarouter.entity;
+
+import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.json.JSONObject;
+
+public class SpikeEventMeta {
+
+ private JSONObject eventEntity;
+ private JSONObject eventHeader;
+ private JSONObject eventBody;
+ private JSONObject spikeVertex;
+ private JSONObject vertexProperties;
+ private SpikeEventVertex spikeEventVertex;
+ private DynamicJAXBContext oxmJaxbContext;
+ private String bodyOperationType;
+
+ public JSONObject getEventEntity() {
+ return eventEntity;
+ }
+
+ public void setEventEntity(JSONObject eventEntity) {
+ this.eventEntity = eventEntity;
+ }
+
+ public JSONObject getEventHeader() {
+ return eventHeader;
+ }
+
+ public void setEventHeader(JSONObject eventHeader) {
+ this.eventHeader = eventHeader;
+ }
+
+ public JSONObject getEventBody() {
+ return eventBody;
+ }
+
+ public void setEventBody(JSONObject eventBody) {
+ this.eventBody = eventBody;
+ }
+
+ public JSONObject getSpikeVertex() {
+ return spikeVertex;
+ }
+
+ public void setSpikeVertex(JSONObject spikeVertex) {
+ this.spikeVertex = spikeVertex;
+ }
+
+ public JSONObject getVertexProperties() {
+ return vertexProperties;
+ }
+
+ public void setVertexProperties(JSONObject vertexProperties) {
+ this.vertexProperties = vertexProperties;
+ }
+
+ public SpikeEventVertex getSpikeEventVertex() {
+ return spikeEventVertex;
+ }
+
+ public void setSpikeEventVertex(SpikeEventVertex spikeEventVertex) {
+ this.spikeEventVertex = spikeEventVertex;
+ }
+
+ public DynamicJAXBContext getOxmJaxbContext() {
+ return oxmJaxbContext;
+ }
+
+ public void setOxmJaxbContext(DynamicJAXBContext oxmJaxbContext) {
+ this.oxmJaxbContext = oxmJaxbContext;
+ }
+
+ public String getBodyOperationType() {
+ return bodyOperationType;
+ }
+
+ public void setBodyOperationType(String bodyOperationType) {
+ this.bodyOperationType = bodyOperationType;
+ }
+
+
+
+
+}
diff --git a/src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java
index 43b3816..2350f93 100644
--- a/src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java
+++ b/src/main/java/org/onap/aai/datarouter/entity/SpikeEventVertex.java
@@ -20,31 +20,29 @@
*/
package org.onap.aai.datarouter.entity;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-
/**
* A convenience POJO for mapping the Vertex from a Spike Event.
*
- * @author salmaA
*/
-@JsonIgnoreProperties(ignoreUnknown = true)
public class SpikeEventVertex {
- private String key;
+ private final String key;
private String schemaVersion;
- private String type;
+ private final String type;
+ private final String entityLink;
-
- public String getKey() {
- return key;
+ public SpikeEventVertex(String type, String key) {
+ this.type = type;
+ this.key = key;
+ this.entityLink = type + "/" + key;
}
- public void setKey(String key) {
- this.key = key;
+ public String getKey() {
+ return key;
}
public String getSchemaVersion() {
@@ -59,12 +57,8 @@ public class SpikeEventVertex {
return type;
}
- public void setType(String type) {
- this.type = type;
- }
-
public String getEntityLink() {
- return this.type + "/" + this.key;
+ return entityLink;
}
-
+
}
diff --git a/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java
index 9627365..6d04ed6 100644
--- a/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java
+++ b/src/main/java/org/onap/aai/datarouter/policy/AbstractSpikeEntityEventProcessor.java
@@ -33,9 +33,11 @@ import java.util.Map;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.http.conn.routing.RouteInfo.LayerType;
import org.eclipse.persistence.dynamic.DynamicType;
import org.eclipse.persistence.internal.helper.DatabaseField;
import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.eclipse.persistence.oxm.MediaType;
import org.json.JSONException;
import org.json.JSONObject;
import org.onap.aai.cl.api.Logger;
@@ -44,6 +46,7 @@ import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
import org.onap.aai.datarouter.entity.OxmEntityDescriptor;
import org.onap.aai.datarouter.entity.SpikeEventEntity;
+import org.onap.aai.datarouter.entity.SpikeEventMeta;
import org.onap.aai.datarouter.entity.SpikeEventVertex;
import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
import org.onap.aai.datarouter.util.EntityOxmReferenceHelper;
@@ -51,6 +54,8 @@ import org.onap.aai.datarouter.util.ExternalOxmModelProcessor;
import org.onap.aai.datarouter.util.OxmModelLoader;
import org.onap.aai.datarouter.util.RouterServiceUtil;
import org.onap.aai.datarouter.util.SearchServiceAgent;
+import org.onap.aai.datarouter.util.Version;
+import org.onap.aai.datarouter.util.VersionedOxmEntities;
import org.onap.aai.restclient.client.Headers;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.restclient.rest.HttpUtil;
@@ -59,30 +64,41 @@ import org.slf4j.MDC;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.ObjectWriter;
public abstract class AbstractSpikeEntityEventProcessor implements Processor {
protected static final String additionalInfo = "Response of SpikeEntityEventPolicy";
private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
-
- protected final String ACTION_CREATE = "create";
- private final String EVENT_VERTEX = "vertex";
- public final static String ACTION_DELETE = "delete";
- protected final String ACTION_UPDATE = "update";
protected final String PROCESS_SPIKE_EVENT = "Process Spike Event";
- private final String OPERATION_KEY = "operation";
+ protected static final String UPDATE_NOTIFICATION = "update-notification";
+ protected static final String SPIKE = "SPIKE";
+
+ protected static final String HEADER_KEY = "header";
+ protected static final String EVENT_TYPE_KEY = "event-type";
+ protected static final String SOURCE_NAME_KEY = "source-name";
+ protected static final String BODY_KEY = "body";
+ protected static final String OPERATION_KEY = "operation";
+
+ protected static final String VERTEX_KEY = "vertex";
+ protected static final String VERTEX_PROPERTIES_KEY = "properties";
+ protected static final String VERTEX_KEY_KEY = "key";
+ protected static final String VERTEX_TYPE_KEY = "type";
+ protected static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
+
+ protected static final String CREATE = "create";
+ protected static final String DELETE = "delete";
+ protected static final String UPDATE = "update";
+
protected enum ResponseType {
SUCCESS, PARTIAL_SUCCESS, FAILURE;
}
- private final List<String> SUPPORTED_ACTIONS =
- Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE);
+ protected final List<String> SUPPORTED_ACTIONS = Arrays.asList(CREATE, UPDATE, DELETE);
- Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
- private String oxmVersion = null;
+ protected Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
+ protected String oxmVersion = null;
/** Agent for communicating with the Search Service. */
protected SearchServiceAgent searchAgent = null;
@@ -268,9 +284,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
// Load the UEB payload data, any errors will result in a failure and discard
- JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX);
+ JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, VERTEX_KEY);
if (spikeObjVertex == null) {
- returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX);
+ returnWithError(exchange, uebPayload, "Payload is missing " + VERTEX_KEY);
return null;
}
@@ -336,7 +352,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
String action, String uebPayload, String oxmEntityType, List<String> searchableAttr) {
String entityPrimaryKeyFieldName =
- getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType);
+ getEntityPrimaryKeyFieldName(oxmJaxbContext, oxmEntityType, entityType);
if (entityPrimaryKeyFieldName == null) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing primary key attribute");
@@ -346,7 +362,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
return null;
}
String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
- if (entityPrimaryKeyFieldValue.isEmpty()) {
+ if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing primary value attribute");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
@@ -425,8 +441,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
// Make sure that were were actually passed in a valid string.
if (payload == null || payload.isEmpty()) {
- logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
+ logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
+ logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, VERTEX_KEY);
return eventVertex;
}
@@ -448,7 +464,7 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
}
- private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext, String payload,
+ private String getEntityPrimaryKeyFieldName(DynamicJAXBContext oxmJaxbContext,
String oxmEntityType, String entityType) {
DynamicType entity = oxmJaxbContext.getDynamicType(oxmEntityType);
@@ -478,9 +494,9 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
try {
jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
} catch (IOException e) {
- logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+ logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
payload);
- logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+ logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
"");
}
@@ -634,8 +650,8 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
String entityId = eventEntity.getId();
- if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
- || action.equalsIgnoreCase(ACTION_UPDATE)) {
+ if ((action.equalsIgnoreCase(CREATE) && entityId != null)
+ || action.equalsIgnoreCase(UPDATE)) {
// Run the GET to retrieve the ETAG from the search service
OperationResult storedEntity = searchAgent.getDocument(index, entityId);
@@ -653,11 +669,11 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
// Write the entity to the search service.
// PUT
searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
- } else if (action.equalsIgnoreCase(ACTION_CREATE)) {
+ } else if (action.equalsIgnoreCase(CREATE)) {
// Write the entry to the search service.
searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
- } else if (action.equalsIgnoreCase(ACTION_DELETE)) {
+ } else if (action.equalsIgnoreCase(DELETE)) {
// Run the GET to retrieve the ETAG from the search service
OperationResult storedEntity = searchAgent.getDocument(index, entityId);
@@ -670,6 +686,13 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
}
+ /*
+ * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
+ * we specify a Content-Type.
+ */
+
+ headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
+
searchAgent.deleteDocument(index, eventEntity.getId(), headers);
} else {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
@@ -682,4 +705,211 @@ public abstract class AbstractSpikeEntityEventProcessor implements Processor {
action);
}
}
+
+ protected SpikeEventMeta processSpikeEvent(Exchange exchange) {
+
+ SpikeEventMeta meta = new SpikeEventMeta();
+ Object eventPayloadObj = null;
+ String eventPayload = null;
+ try {
+ eventPayloadObj = exchange.getIn().getBody();
+
+ /*
+ * It is expected that mainJson will have multiple top level objects: - header - body - result
+ */
+ if (eventPayloadObj == null) {
+ returnWithError(exchange, null, "Invalid Payload");
+ return null;
+ }
+
+ eventPayload = (String)eventPayloadObj;
+
+ meta.setEventEntity(new JSONObject(eventPayload));
+ } catch (JSONException exc) {
+ returnWithError(exchange, eventPayload, "Invalid Payload");
+ return null;
+ }
+
+ JSONObject eventHeader = meta.getEventEntity().getJSONObject(HEADER_KEY);
+
+ if (eventHeader == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
+ return null;
+ }
+
+ meta.setEventHeader(eventHeader);
+
+ /*
+ * Only process SPIKE update-notification events
+ */
+
+ final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
+ final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
+
+ if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
+ // drop event
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
+ + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
+
+ /*
+ * I don't think ignoring a non-applicable event constitutes a failure.
+ */
+
+ setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
+ return null;
+ }
+
+ JSONObject eventBody = meta.getEventEntity().getJSONObject(BODY_KEY);
+
+ if (eventBody == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
+ return null;
+ }
+
+ meta.setEventBody(eventBody);
+
+ String action = eventBody.getString(OPERATION_KEY);
+ if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Unrecognized action '" + action + "'", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Unrecognized action '" + action + "'");
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+ meta.setBodyOperationType(action);
+
+ // Load the event body data, any errors will result in a failure and discard
+
+ JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
+ if (spikeVertex == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
+ return null;
+ }
+
+ meta.setSpikeVertex(spikeVertex);
+
+ SpikeEventVertex spikeEventVertex = null;
+ try {
+ spikeEventVertex = initializeSpikeEventVertex(spikeVertex);
+ } catch (JSONException exc) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Error initializating spike event. Error: " + exc.getMessage(), eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Error initializating spike event. Error: " + exc.getMessage());
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setSpikeEventVertex(spikeEventVertex);
+
+ DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase());
+ if (oxmJaxbContext == null) {
+ logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch",
+ eventPayload);
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setOxmJaxbContext(oxmJaxbContext);
+
+ String entityType = spikeEventVertex.getType();
+ if (entityType == null || entityType.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing entity type", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing entity type");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ /*
+ * test if entityType is in the model
+ */
+
+ VersionedOxmEntities oxmEntities =
+ EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion));
+
+ if (oxmEntities != null && !oxmEntities.getEntityTypeLookup().containsKey(entityType)) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "No matching OXM Descriptor for entity-type='" + entityType + "'",
+ eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "No matching OXM Descriptor for entity-type='" + entityType + "'");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+
+ String entityKey = spikeEventVertex.getKey();
+ if (entityKey == null || entityKey.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
+ eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload vertex missing entity key");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+ String entityLink = spikeEventVertex.getEntityLink();
+ if (entityLink == null || entityLink.isEmpty()) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing entity link", eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing entity link");
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ JSONObject vertexProperties = null;
+ try {
+
+ vertexProperties = spikeVertex.getJSONObject(VERTEX_PROPERTIES_KEY);
+
+ } catch (JSONException exc) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
+ "Payload header missing " + VERTEX_PROPERTIES_KEY, eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Payload header missing " + VERTEX_PROPERTIES_KEY);
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return null;
+ }
+
+ meta.setVertexProperties(vertexProperties);
+
+ // log the fact that all data are in good shape
+ logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
+ logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
+ eventPayload);
+
+ return meta;
+
+ }
+
+ protected SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
+
+ /*
+ * These are all critical keys
+ */
+
+ final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
+ final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
+ final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
+
+ SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
+ eventVertex.setSchemaVersion(vertexSchemaVersion);
+ logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
+
+ return eventVertex;
+
+ }
+
}
diff --git a/src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java b/src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java
index 793588f..d54fbe7 100644
--- a/src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java
+++ b/src/main/java/org/onap/aai/datarouter/policy/EntityEventPolicy.java
@@ -37,6 +37,7 @@ import org.apache.camel.Processor;
import org.eclipse.persistence.dynamic.DynamicType;
import org.eclipse.persistence.internal.helper.DatabaseField;
import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.eclipse.persistence.oxm.MediaType;
import org.json.JSONException;
import org.json.JSONObject;
import org.onap.aai.datarouter.entity.AaiEventEntity;
@@ -1000,6 +1001,13 @@ public class EntityEventPolicy implements Processor {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index,
entityId);
}
+
+ /*
+ * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
+ * we specify a Content-Type.
+ */
+
+ headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
searchAgent.deleteDocument(index, eventEntity.getId(), headers);
} else {
diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java
index c2f7d25..b1a5a87 100644
--- a/src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java
+++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeAggregateGenericVnfProcessor.java
@@ -21,13 +21,11 @@
package org.onap.aai.datarouter.policy;
import java.io.FileNotFoundException;
-import java.io.IOException;
import java.util.List;
import org.apache.camel.Exchange;
-import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
import org.onap.aai.datarouter.entity.SpikeAggregationEntity;
-import org.onap.aai.datarouter.entity.SpikeEventVertex;
+import org.onap.aai.datarouter.entity.SpikeEventMeta;
import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
import com.fasterxml.jackson.databind.JsonNode;
@@ -37,13 +35,11 @@ public class SpikeAggregateGenericVnfProcessor extends AbstractSpikeEntityEventP
public static final String additionalInfo = "Response of SpikeEntityEventPolicy";
- /** Agent for communicating with the Search Service. */
-
public SpikeAggregateGenericVnfProcessor(SpikeEventPolicyConfig config)
throws FileNotFoundException {
super(config);
}
-
+
@Override
protected void startup() {
// Create the indexes in the search service if they do not already exist.
@@ -55,55 +51,31 @@ public class SpikeAggregateGenericVnfProcessor extends AbstractSpikeEntityEventP
public void process(Exchange exchange) throws Exception {
long startTime = System.currentTimeMillis();
- String uebPayload = getExchangeBody(exchange);
- if (uebPayload == null) {
- return;
- }
- JsonNode uebAsJson = null;
- try {
- uebAsJson = mapper.readTree(uebPayload);
- } catch (IOException e) {
- returnWithError(exchange, uebPayload, "Invalid Payload");
- return;
- }
-
- String action = getSpikeEventAction(exchange, uebPayload);
- if (action == null) {
- return;
- }
- SpikeEventVertex eventVertex = populateEventVertex(exchange, uebPayload);
- if (eventVertex == null) {
- return;
- }
- String entityType = getEntityType(exchange, eventVertex, uebPayload);
- if (entityType == null) {
- return;
- }
- String entityLink = getEntityLink(exchange, eventVertex, uebPayload);
- if (entityLink == null) {
- return;
- }
- DynamicJAXBContext oxmJaxbContext = readOxm(exchange, uebPayload);
- if (oxmJaxbContext == null) {
+
+ SpikeEventMeta meta = processSpikeEvent(exchange);
+
+ if (meta == null) {
return;
}
- String oxmEntityType = getOxmEntityType(entityType);
- List<String> searchableAttr = getSearchableAttibutes(oxmJaxbContext, oxmEntityType, entityType, uebPayload,
- exchange);
+
+ String oxmEntityType = getOxmEntityType(meta.getSpikeEventVertex().getType());
+
+ List<String> searchableAttr = getSearchableAttibutes(meta.getOxmJaxbContext(), oxmEntityType,
+ meta.getSpikeEventVertex().getType(), meta.getEventEntity().toString(), exchange);
+
if (searchableAttr == null) {
return;
}
-
- // log the fact that all data are in good shape
- logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
- logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
- uebPayload);
+ JsonNode propertiesNode =
+ mapper.readValue(meta.getVertexProperties().toString(), JsonNode.class);
SpikeAggregationEntity spikeAgregationEntity = new SpikeAggregationEntity();
- spikeAgregationEntity.setLink(entityLink);
- spikeAgregationEntity.deriveFields(uebAsJson);
- handleSearchServiceOperation(spikeAgregationEntity, action, searchIndexName);
+ spikeAgregationEntity.setLink(meta.getSpikeEventVertex().getEntityLink());
+ spikeAgregationEntity.deriveFields(propertiesNode);
+
+ handleSearchServiceOperation(spikeAgregationEntity, meta.getBodyOperationType(),
+ searchIndexName);
long stopTime = System.currentTimeMillis();
metricsLogger.info(EntityEventPolicyMsgs.OPERATION_RESULT_NO_ERRORS, PROCESS_SPIKE_EVENT,
diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java
index 4340eb8..c33e668 100644
--- a/src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java
+++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeAutosuggestIndexProcessor.java
@@ -21,20 +21,15 @@
package org.onap.aai.datarouter.policy;
import java.io.FileNotFoundException;
-import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.camel.Exchange;
-import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
-import org.json.JSONException;
-import org.json.JSONObject;
import org.onap.aai.datarouter.entity.OxmEntityDescriptor;
-import org.onap.aai.datarouter.entity.SpikeEventVertex;
+import org.onap.aai.datarouter.entity.SpikeEventMeta;
import org.onap.aai.datarouter.entity.SuggestionSearchEntity;
import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
import org.onap.aai.datarouter.util.EntityOxmReferenceHelper;
@@ -44,21 +39,18 @@ import org.onap.aai.datarouter.util.VersionedOxmEntities;
import com.fasterxml.jackson.databind.JsonNode;
-
public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProcessor {
public static final String additionalInfo = "Response of SpikeEntityEventPolicy";
- private final String EVENT_VERTEX = "vertex";
-
- private String oxmVersion = null;
+ private static final String PROCESS_SPIKE_EVENT = "Process Spike Event";
+
/** Agent for communicating with the Search Service. */
public SpikeAutosuggestIndexProcessor(SpikeEventPolicyConfig config)
throws FileNotFoundException {
super(config);
- parseLatestOxmVersion();
}
@Override
@@ -71,76 +63,30 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc
@Override
public void process(Exchange exchange) throws Exception {
- long startTime = System.currentTimeMillis();
- String uebPayload = getExchangeBody(exchange);
- if (uebPayload == null) {
- return;
- }
- JsonNode uebAsJson = null;
- try {
- uebAsJson = mapper.readTree(uebPayload);
- } catch (IOException e) {
- returnWithError(exchange, uebPayload, "Invalid Payload");
- return;
- }
+ long startTime = System.currentTimeMillis();
- String action = getSpikeEventAction(exchange, uebPayload);
- if (action == null) {
- return;
- }
- JSONObject uebObjEntity = getUebContentAsJson(uebPayload, EVENT_VERTEX);
- if (uebObjEntity == null) {
- returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX);
- return;
- }
+ SpikeEventMeta meta = processSpikeEvent(exchange);
- SpikeEventVertex eventVertex = populateEventVertex(exchange, uebPayload);
- if (eventVertex == null) {
- return;
- }
- String entityType = getEntityType(exchange, eventVertex, uebPayload);
- if (entityType == null) {
- return;
- }
- String entityLink = getEntityLink(exchange, eventVertex, uebPayload);
- if (entityLink == null) {
- return;
- }
- DynamicJAXBContext oxmJaxbContext = readOxm(exchange, uebPayload);
- if (oxmJaxbContext == null) {
+ if ( meta == null ) {
return;
}
- String oxmEntityType = getOxmEntityType(entityType);
- List<String> searchableAttr = getSearchableAttibutes(oxmJaxbContext, oxmEntityType, entityType, uebPayload,
- exchange);
- if (searchableAttr == null) {
- return;
- }
-
- // log the fact that all data are in good shape
- logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
- logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
- uebPayload);
-
-
+
/*
* Use the versioned OXM Entity class to get access to cross-entity reference helper collections
*/
VersionedOxmEntities oxmEntities =
EntityOxmReferenceHelper.getInstance().getVersionedOxmEntities(Version.valueOf(oxmVersion));
- /*
- * Process for autosuggestable entities
- */
if (oxmEntities != null) {
Map<String, OxmEntityDescriptor> rootDescriptor =
oxmEntities.getSuggestableEntityDescriptors();
if (!rootDescriptor.isEmpty()) {
List<String> suggestibleAttrInPayload = new ArrayList<>();
- List<String> suggestibleAttrInOxm = extractSuggestableAttr(oxmEntities, entityType);
+ List<String> suggestibleAttrInOxm =
+ extractSuggestableAttr(oxmEntities, meta.getSpikeEventVertex().getType());
if (suggestibleAttrInOxm != null) {
- for (String attr: suggestibleAttrInOxm){
- if ( uebAsJson.get("vertex").get("properties").has(attr) ){
+ for (String attr : suggestibleAttrInOxm) {
+ if (meta.getVertexProperties().has(attr)) {
suggestibleAttrInPayload.add(attr);
}
}
@@ -149,24 +95,26 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc
if (suggestibleAttrInPayload.isEmpty()) {
return;
}
- List<String> suggestionAliases = extractAliasForSuggestableEntity(oxmEntities, entityType);
+ List<String> suggestionAliases = extractAliasForSuggestableEntity(oxmEntities, meta.getSpikeEventVertex().getType());
/*
* It was decided to silently ignore DELETE requests for resources we don't allow to be
* deleted. e.g. auto-suggestion deletion is not allowed while aggregation deletion is.
*/
- if (!ACTION_DELETE.equalsIgnoreCase(action)) {
+ if (!DELETE.equalsIgnoreCase(meta.getBodyOperationType())) {
List<ArrayList<String>> listOfValidPowerSetElements =
SearchSuggestionPermutation.getNonEmptyUniqueLists(suggestibleAttrInPayload);
-
+
+ JsonNode propertiesNode = mapper.readValue(meta.getVertexProperties().toString(), JsonNode.class);
+
// Now we have a list containing the power-set (minus empty element) for the status that are
// available in the payload. Try inserting a document for every combination.
for (ArrayList<String> list : listOfValidPowerSetElements) {
SuggestionSearchEntity suggestionSearchEntity = new SuggestionSearchEntity();
- suggestionSearchEntity.setEntityType(entityType);
+ suggestionSearchEntity.setEntityType(meta.getSpikeEventVertex().getType());
suggestionSearchEntity.setSuggestableAttr(list);
suggestionSearchEntity.setEntityTypeAliases(suggestionAliases);
- suggestionSearchEntity.setFilterBasedPayloadFromResponse(uebAsJson.get("vertex").get("properties"),
+ suggestionSearchEntity.setFilterBasedPayloadFromResponse(propertiesNode,
suggestibleAttrInOxm, list);
suggestionSearchEntity.setSuggestionInputPermutations(
suggestionSearchEntity.generateSuggestionInputPermutations());
@@ -180,7 +128,7 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc
+ e.getLocalizedMessage());
}
- handleSearchServiceOperation(suggestionSearchEntity, action, searchIndexName);
+ handleSearchServiceOperation(suggestionSearchEntity, meta.getBodyOperationType(), searchIndexName);
}
}
}
@@ -192,7 +140,7 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc
setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
return;
}
-
+
public List<String> extractSuggestableAttr(VersionedOxmEntities oxmEntities, String entityType) {
// Extract suggestable attributeshandleTopographicalData
Map<String, OxmEntityDescriptor> rootDescriptor = oxmEntities.getSuggestableEntityDescriptors();
@@ -225,28 +173,4 @@ public class SpikeAutosuggestIndexProcessor extends AbstractSpikeEntityEventProc
return desc.getAlias();
}
- private void parseLatestOxmVersion() {
- int latestVersion = -1;
- if (oxmVersionContextMap != null) {
- Iterator it = oxmVersionContextMap.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry pair = (Map.Entry) it.next();
-
- String version = pair.getKey().toString();
- int versionNum = Integer.parseInt(version.substring(1, version.length()));
-
- if (versionNum > latestVersion) {
- latestVersion = versionNum;
- oxmVersion = pair.getKey().toString();
- }
-
- logger.info(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_FOUND, pair.getKey().toString());
- }
- } else {
- logger.error(EntityEventPolicyMsgs.PROCESS_OXM_MODEL_MISSING, "");
- }
- }
-
-
-
}
diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java
index 36bb142..caee8b4 100644
--- a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java
+++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntityEventPolicy.java
@@ -30,21 +30,23 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.eclipse.persistence.dynamic.DynamicType;
import org.eclipse.persistence.internal.helper.DatabaseField;
import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.eclipse.persistence.oxm.MediaType;
import org.json.JSONException;
import org.json.JSONObject;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
-import org.onap.aai.datarouter.entity.SpikeEventEntity;
import org.onap.aai.datarouter.entity.DocumentStoreDataEntity;
-import org.onap.aai.datarouter.entity.SpikeEventVertex;
import org.onap.aai.datarouter.entity.OxmEntityDescriptor;
+import org.onap.aai.datarouter.entity.SpikeEventEntity;
+import org.onap.aai.datarouter.entity.SpikeEventVertex;
import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
import org.onap.aai.datarouter.util.EntityOxmReferenceHelper;
import org.onap.aai.datarouter.util.ExternalOxmModelProcessor;
@@ -68,17 +70,38 @@ public class SpikeEntityEventPolicy implements Processor {
private Collection<ExternalOxmModelProcessor> externalOxmModelProcessors;
-
- private static final String ACTION_CREATE = "create";
- private static final String EVENT_VERTEX = "vertex";
- private static final String ACTION_DELETE = "delete";
- private static final String ACTION_UPDATE = "update";
+ /**
+ * Note (8-June-2018):
+ *
+ * At present we don't need to support every event-type that could be present in the spike-events.
+ * The only one we want is a SPIKE "update-notification". In the future perhaps we need to add some
+ * configurability to the camel-route itself with a json camel filtering component so that routing
+ * logic can be modified as part of the camel route spring-xml instead of hard-coding the
+ * event filtering in here.
+ */
+
private static final String PROCESS_SPIKE_EVENT = "Process Spike Event";
+
+ private static final String UPDATE_NOTIFICATION = "update-notification";
+ private static final String SPIKE = "SPIKE";
+
+ private static final String HEADER_KEY = "header";
+ private static final String EVENT_TYPE_KEY = "event-type";
+ private static final String SOURCE_NAME_KEY = "source-name";
+ private static final String BODY_KEY = "body";
private static final String OPERATION_KEY = "operation";
+ private static final String VERTEX_KEY = "vertex";
+ private static final String VERTEX_KEY_KEY = "key";
+ private static final String VERTEX_TYPE_KEY = "type";
+ private static final String VERTEX_SCHEMA_VERSION_KEY = "schema-version";
+
+ private static final String CREATE = "create";
+ private static final String DELETE = "delete";
+ private static final String UPDATE = "update";
private static final List<String> SUPPORTED_ACTIONS =
- Arrays.asList(ACTION_CREATE, ACTION_UPDATE, ACTION_DELETE);
+ Arrays.asList(CREATE, UPDATE, DELETE);
Map<String, DynamicJAXBContext> oxmVersionContextMap = new HashMap<>();
private String oxmVersion = null;
@@ -122,7 +145,7 @@ public class SpikeEntityEventPolicy implements Processor {
private void parseLatestOxmVersion() {
int latestVersion = -1;
if (oxmVersionContextMap != null) {
- Iterator it = oxmVersionContextMap.entrySet().iterator();
+ Iterator<Entry<String, DynamicJAXBContext>> it = oxmVersionContextMap.entrySet().iterator();
while (it.hasNext()) {
Map.Entry pair = (Map.Entry) it.next();
@@ -148,6 +171,7 @@ public class SpikeEntityEventPolicy implements Processor {
logger.info(EntityEventPolicyMsgs.ENTITY_EVENT_POLICY_REGISTERED);
}
+
/**
* Convert object to json.
@@ -177,65 +201,109 @@ public class SpikeEntityEventPolicy implements Processor {
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
}
- public boolean isJSONValid(String test) {
+ @Override
+ public void process(Exchange exchange) /*throws Exception*/ {
+
+ long startTime = System.currentTimeMillis();
+
+ final String eventPayload = exchange.getIn().getBody().toString();
+ JSONObject mainJson = null;
+
try {
- new JSONObject(test);
- } catch (JSONException ex) {
- return false;
+
+ /*
+ * It is expected that mainJson will have multiple top level objects:
+ * - header
+ * - body
+ * - result
+ */
+
+ mainJson = new JSONObject(eventPayload);
+ } catch (JSONException exc) {
+ returnWithError(exchange, eventPayload, "Invalid Payload");
+ return;
}
- return true;
- }
+
+ JSONObject eventHeader = mainJson.getJSONObject(HEADER_KEY);
- @Override
- public void process(Exchange exchange) throws Exception {
+ if (eventHeader == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + HEADER_KEY);
+ return;
+ }
+
+ /*
+ * Only process SPIKE update-notification events
+ */
+
+ final String sourceName = eventHeader.getString(SOURCE_NAME_KEY);
+ final String eventType = eventHeader.getString(EVENT_TYPE_KEY);
- long startTime = System.currentTimeMillis();
- String uebPayload = exchange.getIn().getBody().toString();
- if (uebPayload == null || !isJSONValid(uebPayload)) {
- uebPayload = exchange.getIn().getBody(String.class);
- if (uebPayload == null || !isJSONValid(uebPayload)) {
- returnWithError(exchange, uebPayload, "Invalid Payload");
- return;
- }
+ if (!(SPIKE.equals(sourceName) && UPDATE_NOTIFICATION.equals(eventType))) {
+ // drop event
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Ignoring event with sourceName='"
+ + sourceName + "' and eventType='" + eventType + "'. Payload=" + eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Ignoring event with sourceName='" + sourceName + "' and eventType='" + eventType + "'.");
+
+ /*
+ * I don't think ignoring a non-applicable event constitutes a failure.
+ */
+
+ setResponse(exchange, ResponseType.SUCCESS, additionalInfo);
+ return;
}
+
+ JSONObject eventBody = mainJson.getJSONObject(BODY_KEY);
+ if (eventBody == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + BODY_KEY);
+ return;
+ }
- JSONObject mainJson = new JSONObject(uebPayload);
- String action = mainJson.getString(OPERATION_KEY);
+ String action = eventBody.getString(OPERATION_KEY);
if (action == null || !SUPPORTED_ACTIONS.contains(action.toLowerCase())) {
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Unrecognized action '" + action + "'", uebPayload);
+ "Unrecognized action '" + action + "'", eventPayload);
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Unrecognized action '" + action + "'");
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
}
- // Load the UEB payload data, any errors will result in a failure and discard
-
- JSONObject spikeObjVertex = getUebContentAsJson(uebPayload, EVENT_VERTEX);
- if (spikeObjVertex == null) {
- returnWithError(exchange, uebPayload, "Payload is missing " + EVENT_VERTEX);
+ // Load the event body data, any errors will result in a failure and discard
+
+ JSONObject spikeVertex = eventBody.getJSONObject(VERTEX_KEY);
+ if (spikeVertex == null) {
+ returnWithError(exchange, eventPayload, "Payload is missing " + VERTEX_KEY);
return;
}
- SpikeEventVertex eventVertex = initializeSpikeEventVertex(spikeObjVertex.toString());
+ SpikeEventVertex eventVertex = null;
+ try {
+ eventVertex = initializeSpikeEventVertex(spikeVertex);
+ } catch (JSONException exc) {
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Error initializating spike event. Error: " + exc.getMessage(),
+ eventPayload);
+ logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
+ "Error initializating spike event. Error: " + exc.getMessage());
+
+ setResponse(exchange, ResponseType.FAILURE, additionalInfo);
+ return;
+ }
DynamicJAXBContext oxmJaxbContext = loadOxmContext(oxmVersion.toLowerCase());
if (oxmJaxbContext == null) {
logger.error(EntityEventPolicyMsgs.OXM_VERSION_NOT_SUPPORTED, oxmVersion);
- logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", uebPayload);
+ logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "OXM version mismatch", eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
}
-
-
String entityType = eventVertex.getType();
if (entityType == null || entityType.isEmpty()) {
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload header missing entity type", uebPayload);
+ "Payload header missing entity type", eventPayload);
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload header missing entity type");
@@ -246,7 +314,7 @@ public class SpikeEntityEventPolicy implements Processor {
String entityKey = eventVertex.getKey();
if (entityKey == null || entityKey.isEmpty()) {
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Payload vertex missing entity key",
- uebPayload);
+ eventPayload);
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload vertex missing entity key");
@@ -256,7 +324,7 @@ public class SpikeEntityEventPolicy implements Processor {
String entityLink = eventVertex.getEntityLink();
if (entityLink == null || entityLink.isEmpty()) {
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload header missing entity link", uebPayload);
+ "Payload header missing entity link", eventPayload);
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload header missing entity link");
@@ -267,8 +335,7 @@ public class SpikeEntityEventPolicy implements Processor {
// log the fact that all data are in good shape
logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
- uebPayload);
-
+ eventPayload);
// Process for building SpikeEventEntity object
String[] entityTypeArr = entityType.split("-");
@@ -276,7 +343,7 @@ public class SpikeEntityEventPolicy implements Processor {
for (String entityWord : entityTypeArr) {
oxmEntityType += entityWord.substring(0, 1).toUpperCase() + entityWord.substring(1);
}
-
+
List<String> searchableAttr =
getOxmAttributes(oxmJaxbContext, oxmEntityType, entityType, "searchable");
if (searchableAttr == null) {
@@ -284,28 +351,28 @@ public class SpikeEntityEventPolicy implements Processor {
"Searchable attribute not found for payload entity type '" + entityType + "'");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
"Searchable attribute not found for payload entity type '" + entityType + "'",
- uebPayload);
+ eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
}
String entityPrimaryKeyFieldName =
- getEntityPrimaryKeyFieldName(oxmJaxbContext, uebPayload, oxmEntityType, entityType);
+ getEntityPrimaryKeyFieldName(oxmJaxbContext, eventPayload, oxmEntityType, entityType);
if (entityPrimaryKeyFieldName == null) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing primary key attribute");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload missing primary key attribute", uebPayload);
+ "Payload missing primary key attribute", eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
}
- String entityPrimaryKeyFieldValue = lookupValueUsingKey(uebPayload, entityPrimaryKeyFieldName);
- if (entityPrimaryKeyFieldValue.isEmpty()) {
+ String entityPrimaryKeyFieldValue = lookupValueUsingKey(eventPayload, entityPrimaryKeyFieldName);
+ if (entityPrimaryKeyFieldValue == null || entityPrimaryKeyFieldValue.isEmpty()) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing primary value attribute");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload missing primary value attribute", uebPayload);
+ "Payload missing primary value attribute", eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
@@ -321,12 +388,14 @@ public class SpikeEntityEventPolicy implements Processor {
spikeEventEntity.setEntityPrimaryKeyValue(entityPrimaryKeyFieldValue);
spikeEventEntity.setEntityType(entityType);
spikeEventEntity.setLink(entityLink);
+
+ System.out.println(spikeEventEntity);
- if (!getSearchTags(spikeEventEntity, searchableAttr, uebPayload, action)) {
+ if (!getSearchTags(spikeEventEntity, searchableAttr, eventPayload, action)) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_NONVERBOSE,
"Payload missing searchable attribute for entity type '" + entityType + "'");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE,
- "Payload missing searchable attribute for entity type '" + entityType + "'", uebPayload);
+ "Payload missing searchable attribute for entity type '" + entityType + "'", eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
@@ -338,7 +407,7 @@ public class SpikeEntityEventPolicy implements Processor {
} catch (NoSuchAlgorithmException e) {
logger.error(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest");
logger.debug(EntityEventPolicyMsgs.DISCARD_EVENT_VERBOSE, "Cannot create unique SHA digest",
- uebPayload);
+ eventPayload);
setResponse(exchange, ResponseType.FAILURE, additionalInfo);
return;
@@ -363,60 +432,19 @@ public class SpikeEntityEventPolicy implements Processor {
exchange.getOut().setBody(additionalInfo);
}
+ private SpikeEventVertex initializeSpikeEventVertex(JSONObject vertexObject) throws JSONException {
- /*
- * Load the UEB JSON payload, any errors would result to a failure case response.
- */
- private JSONObject getUebContentAsJson(String payload, String contentKey) {
-
- JSONObject uebJsonObj;
- JSONObject uebObjContent;
-
- try {
- uebJsonObj = new JSONObject(payload);
- } catch (JSONException e) {
- logger.debug(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
- logger.error(EntityEventPolicyMsgs.UEB_INVALID_PAYLOAD_JSON_FORMAT, payload);
- return null;
- }
-
- if (uebJsonObj.has(contentKey)) {
- uebObjContent = uebJsonObj.getJSONObject(contentKey);
- } else {
- logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, contentKey);
- return null;
- }
-
- return uebObjContent;
- }
-
-
- private SpikeEventVertex initializeSpikeEventVertex(String payload) {
-
- SpikeEventVertex eventVertex = null;
- ObjectMapper mapper = new ObjectMapper();
+ /*
+ * These are all critical keys
+ */
- // Make sure that were were actually passed in a valid string.
- if (payload == null || payload.isEmpty()) {
- logger.debug(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_TO_PARSE_PAYLOAD, EVENT_VERTEX);
+ final String vertexType = vertexObject.getString(VERTEX_TYPE_KEY);
+ final String vertexKey = vertexObject.getString(VERTEX_KEY_KEY);
+ final String vertexSchemaVersion = vertexObject.getString(VERTEX_SCHEMA_VERSION_KEY);
- return eventVertex;
- }
-
- // Marshal the supplied string into a UebEventHeader object.
- try {
- eventVertex = mapper.readValue(payload, SpikeEventVertex.class);
- } catch (JsonProcessingException e) {
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
- } catch (Exception e) {
- logger.error(EntityEventPolicyMsgs.UEB_FAILED_UEBEVENTHEADER_CONVERSION, e.toString());
- }
-
- if (eventVertex != null) {
- logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
- }
+ SpikeEventVertex eventVertex = new SpikeEventVertex(vertexType, vertexKey);
+ eventVertex.setSchemaVersion(vertexSchemaVersion);
+ logger.debug(EntityEventPolicyMsgs.UEB_EVENT_HEADER_PARSED, eventVertex.toString());
return eventVertex;
@@ -453,9 +481,9 @@ public class SpikeEntityEventPolicy implements Processor {
try {
jsonNode = mapper.readTree(mapper.getJsonFactory().createJsonParser(payload));
} catch (IOException e) {
- logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+ logger.debug(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
payload);
- logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, EVENT_VERTEX + " missing",
+ logger.error(EntityEventPolicyMsgs.FAILED_TO_PARSE_UEB_PAYLOAD, VERTEX_KEY + " missing",
"");
}
@@ -584,8 +612,8 @@ public class SpikeEntityEventPolicy implements Processor {
String entityId = eventEntity.getId();
- if ((action.equalsIgnoreCase(ACTION_CREATE) && entityId != null)
- || action.equalsIgnoreCase(ACTION_UPDATE)) {
+ if ((action.equalsIgnoreCase(CREATE) && entityId != null)
+ || action.equalsIgnoreCase(UPDATE)) {
// Run the GET to retrieve the ETAG from the search service
OperationResult storedEntity = searchAgent.getDocument(index, entityId);
@@ -603,11 +631,11 @@ public class SpikeEntityEventPolicy implements Processor {
// Write the entity to the search service.
// PUT
searchAgent.putDocument(index, entityId, eventEntity.getAsJson(), headers);
- } else if (action.equalsIgnoreCase(ACTION_CREATE)) {
+ } else if (action.equalsIgnoreCase(CREATE)) {
// Write the entry to the search service.
searchAgent.postDocument(index, eventEntity.getAsJson(), headers);
- } else if (action.equalsIgnoreCase(ACTION_DELETE)) {
+ } else if (action.equalsIgnoreCase(DELETE)) {
// Run the GET to retrieve the ETAG from the search service
OperationResult storedEntity = searchAgent.getDocument(index, entityId);
@@ -619,6 +647,13 @@ public class SpikeEntityEventPolicy implements Processor {
} else {
logger.error(EntityEventPolicyMsgs.NO_ETAG_AVAILABLE_FAILURE, index, entityId);
}
+
+ /*
+ * The Spring-Boot version of the search-data-service rejects the DELETE operation unless
+ * we specify a Content-Type.
+ */
+
+ headers.put("Content-Type", Arrays.asList(MediaType.APPLICATION_JSON.getMediaType()));
searchAgent.deleteDocument(index, eventEntity.getId(), headers);
} else {
diff --git a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java
index e9c6b06..8b43b1b 100644
--- a/src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java
+++ b/src/main/java/org/onap/aai/datarouter/policy/SpikeEntitySearchProcessor.java
@@ -24,23 +24,20 @@ import java.io.FileNotFoundException;
import java.util.List;
import org.apache.camel.Exchange;
-import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
import org.onap.aai.datarouter.entity.SpikeEventEntity;
-import org.onap.aai.datarouter.entity.SpikeEventVertex;
+import org.onap.aai.datarouter.entity.SpikeEventMeta;
import org.onap.aai.datarouter.logging.EntityEventPolicyMsgs;
public class SpikeEntitySearchProcessor extends AbstractSpikeEntityEventProcessor {
public static final String additionalInfo = "Response of SpikeEntityEventPolicy";
- private static final String searchIndexSchema = "";
-
+ private static final String PROCESS_SPIKE_EVENT = "Process Spike Event";
/** Agent for communicating with the Search Service. */
- public SpikeEntitySearchProcessor(SpikeEventPolicyConfig config)
- throws FileNotFoundException {
+ public SpikeEntitySearchProcessor(SpikeEventPolicyConfig config) throws FileNotFoundException {
super(config);
}
@@ -56,53 +53,33 @@ public class SpikeEntitySearchProcessor extends AbstractSpikeEntityEventProcesso
public void process(Exchange exchange) throws Exception {
long startTime = System.currentTimeMillis();
- String uebPayload = getExchangeBody(exchange);
- if (uebPayload == null) {
- return;
- }
- String action = getSpikeEventAction(exchange, uebPayload);
- if (action == null) {
- return;
- }
- SpikeEventVertex eventVertex = populateEventVertex(exchange, uebPayload);
- if (eventVertex == null) {
- return;
- }
- String entityType = getEntityType(exchange, eventVertex, uebPayload);
- if (entityType == null) {
- return;
- }
- String entityLink = getEntityLink(exchange, eventVertex, uebPayload);
- if (entityLink == null) {
- return;
- }
- DynamicJAXBContext oxmJaxbContext = readOxm(exchange, uebPayload);
- if (oxmJaxbContext == null) {
+ SpikeEventMeta meta = processSpikeEvent(exchange);
+
+ if (meta == null) {
return;
}
- String oxmEntityType = getOxmEntityType(entityType);
- List<String> searchableAttr = getSearchableAttibutes(oxmJaxbContext, oxmEntityType, entityType, uebPayload,
- exchange);
+
+ String oxmEntityType = getOxmEntityType(meta.getSpikeEventVertex().getType());
+ List<String> searchableAttr = getSearchableAttibutes(meta.getOxmJaxbContext(), oxmEntityType,
+ meta.getSpikeEventVertex().getType(), meta.getEventEntity().toString(), exchange);
if (searchableAttr == null) {
return;
}
- // log the fact that all data are in good shape
- logger.info(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_NONVERBOSE, action, entityType);
- logger.debug(EntityEventPolicyMsgs.PROCESS_ENTITY_EVENT_POLICY_VERBOSE, action, entityType,
- uebPayload);
SpikeEventEntity spikeEventEntity = new SpikeEventEntity();
- spikeEventEntity.setEntityType(entityType);
- spikeEventEntity.setLink(entityLink);
- spikeEventEntity = populateSpikeEventEntity(exchange, spikeEventEntity, oxmJaxbContext,
- entityType, action, uebPayload, oxmEntityType,searchableAttr);
+ spikeEventEntity.setEntityType(meta.getSpikeEventVertex().getType());
+ spikeEventEntity.setLink(meta.getSpikeEventVertex().getEntityLink());
+ spikeEventEntity = populateSpikeEventEntity(exchange, spikeEventEntity,
+ meta.getOxmJaxbContext(), meta.getSpikeEventVertex().getType(), meta.getBodyOperationType(),
+ meta.getVertexProperties().toString(), oxmEntityType, searchableAttr);
+
if (spikeEventEntity == null) {
return;
}
- handleSearchServiceOperation(spikeEventEntity, action, searchIndexName);
+ handleSearchServiceOperation(spikeEventEntity, meta.getBodyOperationType(), searchIndexName);
long stopTime = System.currentTimeMillis();
metricsLogger.info(EntityEventPolicyMsgs.OPERATION_RESULT_NO_ERRORS, PROCESS_SPIKE_EVENT,
String.valueOf(stopTime - startTime));
@@ -110,6 +87,7 @@ public class SpikeEntitySearchProcessor extends AbstractSpikeEntityEventProcesso
return;
}
+
/*
* This is not for this Scope. We get back to it later. (updateCerInEntity) private void
* updateSearchEntityWithCrossEntityReference(SpikeEventEntity spikeEventEntity) { try {
diff --git a/src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java b/src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java
index 06e1ab5..fc34437 100644
--- a/src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java
+++ b/src/main/java/org/onap/aai/datarouter/util/VersionedOxmEntities.java
@@ -341,5 +341,9 @@ public class VersionedOxmEntities {
return sb.toString();
}
+
+ public Map<String, DynamicType> getEntityTypeLookup() {
+ return entityTypeLookup;
+ }
}