aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/aai/spike/event
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/aai/spike/event')
-rw-r--r--src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java75
-rw-r--r--src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java64
-rw-r--r--src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java194
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java132
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java206
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java116
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java356
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java32
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java143
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java33
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java37
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java168
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java127
13 files changed, 1683 insertions, 0 deletions
diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java
new file mode 100644
index 0000000..1b92653
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java
@@ -0,0 +1,75 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.envelope;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
+
+public class EventEnvelope {
+
+ private EventHeader header;
+ private SpikeGraphEvent body;
+
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public EventEnvelope(EventHeader eventHeader, SpikeGraphEvent body) {
+ this.header = eventHeader;
+ this.body = body;
+ }
+
+ /**
+ * Construct the envelope header from the provided event.
+ *
+ * @param event the Spike graph event for a vertex or edge operation
+ */
+ public EventEnvelope(SpikeGraphEvent event) {
+ this.header = new EventHeader.Builder().requestId(event.getTransactionId()).build();
+ this.body = event;
+ }
+
+ public EventHeader getEventHeader() {
+ return header;
+ }
+
+ public SpikeGraphEvent getBody() {
+ return body;
+ }
+
+ /**
+ * Serializes this object into a JSON string representation.
+ *
+ * @return a JSON format string representation of this object.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return this.toJson();
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java
new file mode 100644
index 0000000..8b62ffa
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java
@@ -0,0 +1,64 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.envelope;
+
+import java.util.UUID;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import org.onap.aai.spike.event.incoming.GizmoGraphEvent;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.schema.GraphEventTransformer;
+
+public class EventEnvelopeParser {
+
+ /**
+ * Parses the event to extract the content of the body and generate a {@link GizmoGraphEvent}
+ * object.
+ *
+ * @param event event envelope with both header and body properties
+ * @return the body of the event represented by a {@link GizmoGraphEvent} object
+ * @throws SpikeException if the event cannot be parsed
+ */
+ public GizmoGraphEvent parseEvent(String event) throws SpikeException {
+
+ GizmoGraphEvent graphEvent = GizmoGraphEvent.fromJson(extractEventBody(event));
+
+ GraphEventTransformer.populateUUID(graphEvent);
+ if (graphEvent.getTransactionId() == null || graphEvent.getTransactionId().isEmpty()) {
+ graphEvent.setTransactionId(UUID.randomUUID().toString());
+ }
+ if (graphEvent.getRelationship() != null) {
+ GraphEventTransformer.validateEdgeModel(graphEvent.getRelationship());
+ } else if (graphEvent.getVertex() != null) {
+ GraphEventTransformer.validateVertexModel(graphEvent.getVertex());
+ } else {
+ throw new SpikeException("Unable to parse event: " + event);
+ }
+
+ return graphEvent;
+ }
+
+ private String extractEventBody(String event) {
+ JsonParser jsonParser = new JsonParser();
+ JsonElement jsonElement = jsonParser.parse(event);
+ return jsonElement.getAsJsonObject().get("body").toString();
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java b/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java
new file mode 100644
index 0000000..f8aa72b
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java
@@ -0,0 +1,194 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.envelope;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Objects;
+import java.util.UUID;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+
+public class EventHeader {
+
+ @SerializedName("request-id")
+ private String requestId;
+
+ private String timestamp;
+
+ @SerializedName("source-name")
+ private String sourceName;
+
+ @SerializedName("event-type")
+ private String eventType;
+
+ @SerializedName("validation-entity-type")
+ private String validationEntityType;
+
+ @SerializedName("validation-top-entity-type")
+ private String validationTopEntityType;
+
+ @SerializedName("entity-link")
+ private String entityLink;
+
+ private static final String MICROSERVICE_NAME = "SPIKE";
+ private static final String EVENT_TYPE_PENDING_UPDATE = "update-notification";
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public static class Builder {
+
+ private String requestId;
+ private String validationEntityType;
+ private String validationTopEntityType;
+ private String entityLink;
+
+ public Builder requestId(String val) {
+ requestId = val;
+ return this;
+ }
+
+ public Builder validationEntityType(String val) {
+ validationEntityType = val;
+ return this;
+ }
+
+ public Builder validationTopEntityType(String val) {
+ validationTopEntityType = val;
+ return this;
+ }
+
+ public Builder entityLink(String val) {
+ entityLink = val;
+ return this;
+ }
+
+ public EventHeader build() {
+ return new EventHeader(this);
+ }
+ }
+
+ private EventHeader(Builder builder) {
+ requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString();
+ timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now());
+ sourceName = MICROSERVICE_NAME;
+ eventType = EVENT_TYPE_PENDING_UPDATE;
+
+ validationEntityType = builder.validationEntityType;
+ validationTopEntityType = builder.validationTopEntityType;
+ entityLink = builder.entityLink;
+ }
+
+ /**
+ * Serializes this object into a JSON string representation.
+ *
+ * @return a JSON format string representation of this object.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // GETTERS
+ ///////////////////////////////////////////////////////////////////////////
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getValidationEntityType() {
+ return validationEntityType;
+ }
+
+ public String getValidationTopEntityType() {
+ return validationTopEntityType;
+ }
+
+ public String getEntityLink() {
+ return entityLink;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // OVERRIDES
+ ///////////////////////////////////////////////////////////////////////////
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.requestId, this.timestamp, this.sourceName, this.eventType, this.validationEntityType,
+ this.validationTopEntityType, this.entityLink);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof EventHeader)) {
+ return false;
+ } else if (obj == this) {
+ return true;
+ }
+ EventHeader rhs = (EventHeader) obj;
+ // @formatter:off
+ return new EqualsBuilder()
+ .append(requestId, rhs.requestId)
+ .append(timestamp, rhs.timestamp)
+ .append(sourceName, rhs.sourceName)
+ .append(eventType, rhs.eventType)
+ .append(validationEntityType, rhs.validationEntityType)
+ .append(validationTopEntityType, rhs.validationTopEntityType)
+ .append(entityLink, rhs.entityLink)
+ .isEquals();
+ // @formatter:on
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return this.toJson();
+ }
+
+}
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java
new file mode 100644
index 0000000..af10a6e
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java
@@ -0,0 +1,132 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.incoming;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class provides a generic representation of an Edge as provided by the graph data store.
+ *
+ */
+public class GizmoEdge {
+
+ /**
+ * The unique identifier used to identify this edge in the graph data store.
+ */
+ @SerializedName("key")
+ private String id;
+
+ /** Type label assigned to this vertex. */
+ private String type;
+
+ /** Source vertex for our edge. */
+ private GizmoVertex source;
+
+ /** Target vertex for our edge. */
+ private GizmoVertex target;
+
+ /** Map of all of the properties assigned to this vertex. */
+ private JsonElement properties;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public GizmoVertex getSource() {
+ return source;
+ }
+
+ public void setSource(GizmoVertex source) {
+ this.source = source;
+ }
+
+ public GizmoVertex getTarget() {
+ return target;
+ }
+
+ public void setTarget(GizmoVertex target) {
+ this.target = target;
+ }
+
+ public JsonElement getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonElement properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Unmarshalls this Edge object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Edge.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Edge object.
+ *
+ * @param json - The JSON string to produce the Edge from.
+ *
+ * @return - A Edge object.
+ *
+ * @throws SpikeException
+ */
+ public static GizmoEdge fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into an Edge object.
+ return gson.fromJson(json, GizmoEdge.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java
new file mode 100644
index 0000000..419b1a5
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java
@@ -0,0 +1,206 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.incoming;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.event.outgoing.SpikeEdge;
+import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
+import org.onap.aai.spike.event.outgoing.SpikeGraphEvent.SpikeOperation;
+import org.onap.aai.spike.event.outgoing.SpikeVertex;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.schema.EdgeRulesLoader;
+import org.onap.aai.spike.schema.OXMModelLoader;
+
+public class GizmoGraphEvent {
+ private String operation;
+
+ @SerializedName("transaction-id")
+ private String transactionId;
+
+ @SerializedName("database-transaction-id")
+ private String dbTransactionId;
+
+ private long timestamp;
+
+ private GizmoVertex vertex;
+
+ private GizmoEdge relationship;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getOperation() {
+ return operation;
+ }
+
+ public void setOperation(String operation) {
+ this.operation = operation;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public GizmoVertex getVertex() {
+ return vertex;
+ }
+
+ public void setVertex(GizmoVertex vertex) {
+ this.vertex = vertex;
+ }
+
+ public GizmoEdge getRelationship() {
+ return relationship;
+ }
+
+ public void setRelationship(GizmoEdge relationship) {
+ this.relationship = relationship;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public String getDbTransactionId() {
+ return dbTransactionId;
+ }
+
+ /**
+ * Unmarshalls this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ public SpikeGraphEvent toSpikeGraphEvent() throws SpikeException {
+ SpikeGraphEvent spikeGraphEvent = new SpikeGraphEvent();
+ spikeGraphEvent.setTransactionId(this.getTransactionId());
+ spikeGraphEvent.setDbTransactionId(this.getDbTransactionId());
+ spikeGraphEvent.setOperationTimestamp(this.getTimestamp());
+ if (this.getOperation().equalsIgnoreCase("STORE")) {
+ spikeGraphEvent.setOperation(SpikeOperation.CREATE);
+ } else if (this.getOperation().equalsIgnoreCase("REPLACE")) {
+ spikeGraphEvent.setOperation(SpikeOperation.UPDATE);
+ } else if (this.getOperation().equalsIgnoreCase("DELETE")) {
+ spikeGraphEvent.setOperation(SpikeOperation.DELETE);
+ } else {
+ throw new SpikeException("Invalid operation in GizmoGraphEvent: " + this.getOperation());
+ }
+ if (this.getVertex() != null) {
+ SpikeVertex spikeVertex = new SpikeVertex();
+ spikeVertex.setId(this.getVertex().getId());
+ spikeVertex.setType(this.getVertex().getType());
+ spikeVertex.setModelVersion(OXMModelLoader.getLatestVersion());
+ spikeVertex.setProperties(this.getVertex().getProperties());
+ spikeGraphEvent.setVertex(spikeVertex);
+
+ } else if (this.getRelationship() != null) {
+ SpikeEdge spikeEdge = new SpikeEdge();
+ spikeEdge.setId(this.getRelationship().getId());
+ spikeEdge.setModelVersion(EdgeRulesLoader.getLatestSchemaVersion());
+ spikeEdge.setType(this.getRelationship().getType());
+
+ SpikeVertex spikeSourceVertex = new SpikeVertex();
+ spikeSourceVertex.setId(this.getRelationship().getSource().getId());
+ spikeSourceVertex.setType(this.getRelationship().getSource().getType());
+ spikeEdge.setSource(spikeSourceVertex);
+
+ SpikeVertex spikeTargetVertex = new SpikeVertex();
+ spikeTargetVertex.setId(this.getRelationship().getTarget().getId());
+ spikeTargetVertex.setType(this.getRelationship().getTarget().getType());
+ spikeEdge.setTarget(spikeTargetVertex);
+
+ spikeEdge.setProperties(this.getRelationship().getProperties());
+ spikeGraphEvent.setRelationship(spikeEdge);
+ }
+
+ return spikeGraphEvent;
+
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Vertex object.
+ *
+ * @param json - The JSON string to produce the Vertex from.
+ *
+ * @return - A Vertex object.
+ *
+ * @throws SpikeException
+ */
+ public static GizmoGraphEvent fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into a Vertex object.
+ return gson.fromJson(json, GizmoGraphEvent.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+
+ return toJson();
+ }
+
+ public String getObjectKey() {
+ if (this.getVertex() != null) {
+ return this.getVertex().getId();
+ } else if (this.getRelationship() != null) {
+ return this.getRelationship().getId();
+ }
+
+ return null;
+
+ }
+
+ public String getObjectType() {
+ if (this.getVertex() != null) {
+ return "Vertex->" + this.getVertex().getType();
+ } else if (this.getRelationship() != null) {
+ return "Relationship->" + this.getRelationship().getType();
+ }
+
+ return null;
+
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java
new file mode 100644
index 0000000..2f55c57
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java
@@ -0,0 +1,116 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.incoming;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class provides a generic representation of a Vertex as provided by the graph data store.
+ *
+ */
+public class GizmoVertex {
+
+ /**
+ * The unique identifier used to identify this vertex in the graph data store.
+ */
+ @SerializedName("key")
+ private String id;
+
+ /** Type label assigned to this vertex. */
+ private String type;
+
+ /** Map of all of the properties assigned to this vertex. */
+ private JsonElement properties;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public JsonElement getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonElement properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Unmarshalls this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Vertex object.
+ *
+ * @param json - The JSON string to produce the Vertex from.
+ *
+ * @return - A Vertex object.
+ *
+ * @throws SpikeException
+ */
+ public static GizmoVertex fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into a Vertex object.
+ return gson.fromJson(json, GizmoVertex.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+
+ return toJson();
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java
new file mode 100644
index 0000000..58795b3
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java
@@ -0,0 +1,356 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.incoming;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.logging.SpikeMsgs;
+
+
+/**
+ * Instances of this class maintain a buffer of events which have been received and are queued up to
+ * be processed.
+ * <p>
+ * A background thread advances a pointer into the buffer which always points to the head of the
+ * most recent consecutive block of processed events. This allows us, at any time, to know what
+ * offset value can be safely committed to the event store (meaning any events before that offset
+ * into the event topic will not be reprocessed on a restart).
+ */
+public class OffsetManager {
+
+ /** Buffer that we use for caching 'in flight' events. */
+ private RingEntry[] ringBuffer;
+
+ /** Number of elements that can be stored in the buffer. */
+ private int bufferSize;
+
+ /** Pointer to the next free slot in the buffer. */
+ private AtomicLong writePointer = new AtomicLong(0L);
+
+ /**
+ * Pointer to the next slot in the buffer to wait to be published so that we can commit its offset.
+ */
+ private long commitPointer = 0;
+
+ /**
+ * Executor for scheduling the background task which commits offsets to the event bus.
+ */
+ private ScheduledExecutorService offsetCommitService = Executors.newScheduledThreadPool(1);
+
+ /**
+ * The next offset value which represents the head of a consecutive block of events which have been
+ * processed.
+ */
+ private Long nextOffsetToCommit = null;
+
+ private static Logger logger = LoggerFactory.getInstance().getLogger(OffsetManager.class.getName());
+
+
+ /**
+ * Creates a new instance of the offset manager.
+ *
+ * @param bufferCapacity - The requested size of the buffer that we will use to cache offsets for
+ * events that are waiting to be processed.
+ * @param offsetCheckPeriodMs - The period at which we will try to update what we consider to be the
+ * next offset that can be safely committed to the event bus.
+ */
+ public OffsetManager(int bufferCapacity, long offsetCheckPeriodMs) {
+
+ // In order to make the math work nicely for our write and commit pointers, we
+ // need our buffer size to be a power of 2, so round the supplied buffer size
+ // up to ensure that it is a power of two.
+ //
+ // This way we can just keep incrementing our pointers forever without worrying
+ // about wrapping (we'll eventually roll over from LongMax to LongMin, but if the
+ // buffer size is a power of 2 then our modded physical indexes will still magically
+ // map to the next consecutive index. (Math!)
+ bufferSize = nextPowerOf2(bufferCapacity);
+
+ // Now, allocate and initialize our ring buffer.
+ ringBuffer = new RingEntry[bufferSize];
+ for (int i = 0; i < bufferSize; i++) {
+ ringBuffer[i] = new RingEntry();
+ }
+
+ // Schedule a task to commit the most recent offset value to the event library.
+ offsetCommitService.scheduleAtFixedRate(new OffsetCommitter(), offsetCheckPeriodMs, offsetCheckPeriodMs,
+ TimeUnit.MILLISECONDS);
+
+ logger.info(SpikeMsgs.OFFSET_MANAGER_STARTED, Integer.toString(bufferSize), Long.toString(offsetCheckPeriodMs));
+ }
+
+
+ /**
+ * Logs an event with the offset manager.
+ *
+ * @param transactionId - The transaction id associated with this event.
+ * @param commitOffset - The event bus offset associated with this event.
+ *
+ * @return - The index into the offset manager's buffer for this event.
+ */
+ public int cacheEvent(String transactionId, long commitOffset) {
+
+ // Get the index to the next free slot in the ring...
+ int index = nextFreeSlot();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Caching event with transaction-id: " + transactionId + " offset: " + commitOffset
+ + " to offset manager at index: " + index);
+ }
+
+ // ...and update it with the event meta data we want to cache.
+ ringBuffer[index].setTransactionId(transactionId);
+ ringBuffer[index].setCommitOffset(commitOffset);
+
+ return index;
+ }
+
+
+ /**
+ * Marks a cached event as 'published'.
+ *
+ * @param anIndex - The index into the event cache that we want to update.
+ * @throws SpikeException
+ */
+ public void markAsPublished(int anIndex) throws SpikeException {
+
+ // Make sure that we were supplied a valid index.
+ if ((anIndex < 0) || (anIndex > bufferSize - 1)) {
+ throw new SpikeException("Invalid index " + anIndex + " for offset manager buffer.");
+ }
+
+ // It is only valid to mark a cell as 'Published' if it is already
+ // in the 'Processing' state.
+ if (!ringBuffer[anIndex].state.compareAndSet(RingEntry.PROCESSING, RingEntry.PUBLISHED)) {
+ throw new SpikeException("Unexpected event state: " + state2String(ringBuffer[anIndex].state.get()));
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Event in offset manger buffer at index: " + anIndex + " marked as 'published'");
+ }
+ }
+
+
+ /**
+ * Marks a cached event as 'published'.
+ *
+ * @param transactionId - The transaction id of the event we want to update.
+ *
+ * @throws SpikeException
+ */
+ public void markAsPublished(String transactionId) throws SpikeException {
+
+ // Iterate over the ring buffer and try to find the specified transaction
+ // id.
+ for (int i = 0; i < bufferSize; i++) {
+
+ // Is this the one?
+ if (ringBuffer[i].getTransactionId() == transactionId) {
+
+ // Found the one we are looking for!
+ markAsPublished(i);
+ return;
+ }
+ }
+
+ // If we made it here then we didn't find an event with the supplied transaction id.
+ throw new SpikeException("No event with transaction id: " + transactionId + " exists in offset manager buffer");
+ }
+
+
+ /**
+ * Retrieves our current view of what is the most recent offset value that can be safely committed
+ * to the event bus (meaning that all events on the topic before that offset value have been
+ * processed and shouldn't be re-consumed after a restart).
+ *
+ * @return - The next 'safe' offset.
+ */
+ public long getNextOffsetToCommit() {
+ return nextOffsetToCommit;
+ }
+
+
+ /**
+ * Finds the next slot in the ring which is marked as 'free'.
+ *
+ * @return - An index into the ring buffer.
+ */
+ private int nextFreeSlot() {
+
+ int currentIndex = (int) (writePointer.getAndIncrement() % bufferSize);
+ while (!ringBuffer[currentIndex].state.compareAndSet(RingEntry.FREE, RingEntry.PROCESSING)) {
+ currentIndex = (int) (writePointer.getAndIncrement() % bufferSize);
+ }
+
+ return currentIndex;
+ }
+
+
+ /**
+ * Given a number, this helper method finds the next largest number that is a power of 2.
+ *
+ * @param aNumber - The number to compute the next power of two for.
+ *
+ * @return - The next largest power of 2 for the supplied number.
+ */
+ private int nextPowerOf2(int aNumber) {
+
+ int powerOfTwo = 1;
+ while (powerOfTwo < aNumber) {
+ powerOfTwo *= 2;
+ }
+ return powerOfTwo;
+ }
+
+ private String state2String(int aState) {
+
+ switch (aState) {
+ case RingEntry.FREE:
+ return "FREE";
+
+ case RingEntry.PROCESSING:
+ return "PROCESSING";
+
+ case RingEntry.PUBLISHED:
+ return "PUBLISHED";
+
+ default:
+ return "UNDEFINED(" + aState + ")";
+ }
+ }
+
+
+ /**
+ * Defines the structure of the entries in the ring buffer which represent events which are 'in
+ * flight'.
+ */
+ public class RingEntry {
+
+ private final static int FREE = 1; // Slot in buffer is available to be written to.
+ private final static int PROCESSING = 2; // Slot in buffer represents an event which is waiting to be processed.
+ private final static int PUBLISHED = 3; // Slot in buffer represents an event which has been published.
+
+ /**
+ * Describes the state of this entry in the ring:
+ * <p>
+ * FREE = This slot is currently unused and may be written to.
+ * <p>
+ * PROCESSING = This slot describes an event which has not yet been published.
+ * <p>
+ * PUBLISHED = This lot describes an event which has been published and therefore may be released.
+ */
+ public AtomicInteger state = new AtomicInteger(FREE);
+
+ /** The unique identifier of the event which this entry represents. */
+ private String transactionId;
+
+ /** The event bus offset associated with the event which this entry represents. */
+ private long commitOffset;
+
+
+ /**
+ * Retrieve the transaction id for the event represented by this entry.
+ *
+ * @return - Transaction id.
+ */
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+
+ /**
+ * Assigns a transaction id to this entry.
+ *
+ * @param transactionId - The unique id for this entry.
+ */
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+
+ /**
+ * Retrieves the offset of the event represented by this entry.
+ *
+ * @return - An event bus offset value.
+ */
+ public long getCommitOffset() {
+ return commitOffset;
+ }
+
+
+ /**
+ * Assigns an offset value to this entry.
+ *
+ * @param commitOffset - Offset value for this entry.
+ */
+ public void setCommitOffset(long commitOffset) {
+ this.commitOffset = commitOffset;
+ }
+ }
+
+
+ /**
+ * This class implements a simple background task which wakes up periodically and determines the
+ * next available offset from the ring buffer which is safe to commit to the event bus.
+ */
+ private class OffsetCommitter implements Runnable {
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+
+ // Get the index into the ring buffer of the next slot to be checked.
+ int currentCommitIndex = (int) (commitPointer % bufferSize);
+
+ // If this entry is in the 'published' state then its offset is good to be
+ // committed.
+ while (ringBuffer[currentCommitIndex].state.get() == RingEntry.PUBLISHED) {
+
+ // Grab the offset of the current entry.
+ nextOffsetToCommit = ringBuffer[currentCommitIndex].getCommitOffset();
+
+ // We don't need to keep the current entry alive any longer, so free it and advance
+ // to the next entry in the ring.
+ ringBuffer[currentCommitIndex].state.set(RingEntry.FREE);
+ commitPointer++;
+
+ // Update our index and loop back to check the next one. We will keep advancing
+ // as long as we have consecutive entries that are flagged as 'published'.
+ currentCommitIndex = (int) (commitPointer % bufferSize);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Offset to commit to event bus: "
+ + ((nextOffsetToCommit != null) ? nextOffsetToCommit : "none"));
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java b/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java
new file mode 100644
index 0000000..2ba949a
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java
@@ -0,0 +1,32 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface GsonExclude {
+ // Field tag only annotation
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java
new file mode 100644
index 0000000..7fb030d
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java
@@ -0,0 +1,143 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class provides a generic representation of an Edge as provided by the graph data store.
+ *
+ */
+public class SpikeEdge {
+
+ /**
+ * The unique identifier used to identify this edge in the graph data store.
+ */
+ @SerializedName("key")
+ private String id;
+
+ @SerializedName("schema-version")
+ private String modelVersion;
+
+ /** Type label assigned to this vertex. */
+ private String type;
+
+ /** Source vertex for our edge. */
+ private SpikeVertex source;
+
+ /** Target vertex for our edge. */
+ private SpikeVertex target;
+
+ /** Map of all of the properties assigned to this vertex. */
+ private JsonElement properties;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public SpikeVertex getSource() {
+ return source;
+ }
+
+ public void setSource(SpikeVertex source) {
+ this.source = source;
+ }
+
+ public SpikeVertex getTarget() {
+ return target;
+ }
+
+ public void setTarget(SpikeVertex target) {
+ this.target = target;
+ }
+
+ public JsonElement getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonElement properties) {
+ this.properties = properties;
+ }
+
+ public String getModelVersion() {
+ return modelVersion;
+ }
+
+ public void setModelVersion(String modelVersion) {
+ this.modelVersion = modelVersion;
+ }
+
+ /**
+ * Unmarshalls this Edge object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Edge.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Edge object.
+ *
+ * @param json - The JSON string to produce the Edge from.
+ *
+ * @return - A Edge object.
+ *
+ * @throws SpikeException
+ */
+ public static SpikeEdge fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into an Edge object.
+ return gson.fromJson(json, SpikeEdge.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java
new file mode 100644
index 0000000..87ce2ce
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java
@@ -0,0 +1,33 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import java.util.Comparator;
+
+public class SpikeEventComparator implements Comparator<SpikeGraphEvent> {
+
+ @Override
+ public int compare(SpikeGraphEvent e1, SpikeGraphEvent e2) {
+
+ // Note: this will break down if the difference is more than 24 days, or int.max milliseconds
+ return (int) (e1.getOperationTimestamp() - e2.getOperationTimestamp());
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java
new file mode 100644
index 0000000..54ac391
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java
@@ -0,0 +1,37 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import com.google.gson.ExclusionStrategy;
+import com.google.gson.FieldAttributes;
+
+public class SpikeEventExclusionStrategy implements ExclusionStrategy {
+
+ @Override
+ public boolean shouldSkipField(FieldAttributes fieldAttributes) {
+ return fieldAttributes.getAnnotation(GsonExclude.class) != null;
+ }
+
+ @Override
+ public boolean shouldSkipClass(Class<?> aClass) {
+ return false;
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java
new file mode 100644
index 0000000..a076a55
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java
@@ -0,0 +1,168 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+public class SpikeGraphEvent {
+
+ public enum SpikeOperation {
+ CREATE, UPDATE, DELETE
+ }
+
+ private SpikeOperation operation;
+
+ @SerializedName("transaction-id")
+ private String transactionId;
+
+ @SerializedName("database-transaction-id")
+ private String dbTransactionId;
+
+ @SerializedName("timestamp")
+ private long operationTimestamp;
+
+ private SpikeVertex vertex;
+
+ private SpikeEdge relationship;
+
+ // Time this event was received in spike, used to determine when to send the event
+ @GsonExclude
+ private long spikeTimestamp = System.currentTimeMillis();
+
+ /** Serializer/deserializer for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public SpikeOperation getOperation() {
+ return operation;
+ }
+
+ public void setOperation(SpikeOperation operation) {
+ this.operation = operation;
+ }
+
+ public long getOperationTimestamp() {
+ return operationTimestamp;
+ }
+
+ public void setOperationTimestamp(long operationTimestamp) {
+ this.operationTimestamp = operationTimestamp;
+ }
+
+
+
+ public SpikeVertex getVertex() {
+ return vertex;
+ }
+
+ public void setVertex(SpikeVertex vertex) {
+ this.vertex = vertex;
+ }
+
+ public SpikeEdge getRelationship() {
+ return relationship;
+ }
+
+ public void setRelationship(SpikeEdge relationship) {
+ this.relationship = relationship;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public void setDbTransactionId(String dbTransactionId) {
+ this.dbTransactionId = dbTransactionId;
+ }
+
+ public long getSpikeTimestamp() {
+ return spikeTimestamp;
+ }
+
+ /**
+ * Unmarshalls this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+
+ /**
+ * Marshalls the provided JSON string into a Vertex object.
+ *
+ * @param json - The JSON string to produce the Vertex from.
+ *
+ * @return - A Vertex object.
+ *
+ * @throws SpikeException
+ */
+ public static SpikeGraphEvent fromJson(String json) throws SpikeException {
+
+ try {
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into a Vertex object.
+ return gson.fromJson(json, SpikeGraphEvent.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
+
+ public String getObjectKey() {
+ if (this.getVertex() != null) {
+ return this.getVertex().getId();
+ } else if (this.getRelationship() != null) {
+ return this.getRelationship().getId();
+ }
+
+ return null;
+ }
+
+ public String getObjectType() {
+ if (this.getVertex() != null) {
+ return "Vertex->" + this.getVertex().getType();
+ } else if (this.getRelationship() != null) {
+ return "Relationship->" + this.getRelationship().getType();
+ }
+
+ return null;
+ }
+}
+
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java
new file mode 100644
index 0000000..cd51855
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java
@@ -0,0 +1,127 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class provides a generic representation of a Vertex as provided by the graph data store.
+ *
+ */
+public class SpikeVertex {
+
+ /**
+ * The unique identifier used to identify this vertex in the graph data store.
+ */
+ @SerializedName("key")
+ private String id;
+
+ @SerializedName("schema-version")
+ private String modelVersion;
+
+ /** Type label assigned to this vertex. */
+ private String type;
+
+ /** Map of all of the properties assigned to this vertex. */
+ private JsonElement properties;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public JsonElement getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonElement properties) {
+ this.properties = properties;
+ }
+
+ public String getModelVersion() {
+ return modelVersion;
+ }
+
+ public void setModelVersion(String modelVersion) {
+ this.modelVersion = modelVersion;
+ }
+
+ /**
+ * Unmarshalls this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Vertex object.
+ *
+ * @param json - The JSON string to produce the Vertex from.
+ *
+ * @return - A Vertex object.
+ *
+ * @throws SpikeException
+ */
+ public static SpikeVertex fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into a Vertex object.
+ return gson.fromJson(json, SpikeVertex.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+
+ return toJson();
+ }
+}