aboutsummaryrefslogtreecommitdiffstats
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/org/onap/aai/spike/SpikeApplication.java48
-rw-r--r--src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java75
-rw-r--r--src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java64
-rw-r--r--src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java194
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java132
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java206
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java116
-rw-r--r--src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java356
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java32
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java143
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java33
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java37
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java168
-rw-r--r--src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java127
-rw-r--r--src/main/java/org/onap/aai/spike/exception/SpikeException.java46
-rw-r--r--src/main/java/org/onap/aai/spike/logging/SpikeMsgs.java167
-rw-r--r--src/main/java/org/onap/aai/spike/schema/EdgeRulesLoader.java229
-rw-r--r--src/main/java/org/onap/aai/spike/schema/GraphEventTransformer.java282
-rw-r--r--src/main/java/org/onap/aai/spike/schema/MapAdapter.java65
-rw-r--r--src/main/java/org/onap/aai/spike/schema/OXMModelLoader.java187
-rw-r--r--src/main/java/org/onap/aai/spike/schema/OxmConfigTranslator.java98
-rw-r--r--src/main/java/org/onap/aai/spike/schema/Relationship.java102
-rw-r--r--src/main/java/org/onap/aai/spike/schema/RelationshipSchema.java133
-rw-r--r--src/main/java/org/onap/aai/spike/service/EchoService.java85
-rw-r--r--src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java298
-rw-r--r--src/main/java/org/onap/aai/spike/service/SpikeService.java75
-rw-r--r--src/main/java/org/onap/aai/spike/util/FileWatcher.java45
-rw-r--r--src/main/java/org/onap/aai/spike/util/SchemaIngestPropertiesReader.java116
-rw-r--r--src/main/java/org/onap/aai/spike/util/SpikeConstants.java43
-rw-r--r--src/main/java/org/onap/aai/spike/util/SpikeProperties.java52
30 files changed, 3754 insertions, 0 deletions
diff --git a/src/main/java/org/onap/aai/spike/SpikeApplication.java b/src/main/java/org/onap/aai/spike/SpikeApplication.java
new file mode 100644
index 0000000..e031de3
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/SpikeApplication.java
@@ -0,0 +1,48 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 European Software Marketing Ltd.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike;
+
+import java.util.HashMap;
+import org.eclipse.jetty.util.security.Password;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.context.annotation.ImportResource;
+
+/**
+ * Spike service Spring Boot Application
+ */
+@SpringBootApplication
+@ImportResource({"file:${SERVICE_BEANS}/*.xml"})
+public class SpikeApplication extends SpringBootServletInitializer {
+
+ public static void main(String[] args) {
+ String keyStorePassword = System.getProperty("KEY_STORE_PASSWORD");
+ if (keyStorePassword == null || keyStorePassword.isEmpty()) {
+ throw new IllegalArgumentException("System Property KEY_STORE_PASSWORD not set");
+ }
+ HashMap<String, Object> props = new HashMap<>();
+ props.put("server.ssl.key-store-password", Password.deobfuscate(keyStorePassword));
+ new SpikeApplication().configure(new SpringApplicationBuilder(SpikeApplication.class).properties(props))
+ .run(args);
+ }
+
+}
diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java
new file mode 100644
index 0000000..1b92653
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java
@@ -0,0 +1,75 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.envelope;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
+
+public class EventEnvelope {
+
+ private EventHeader header;
+ private SpikeGraphEvent body;
+
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public EventEnvelope(EventHeader eventHeader, SpikeGraphEvent body) {
+ this.header = eventHeader;
+ this.body = body;
+ }
+
+ /**
+ * Construct the envelope header from the provided event.
+ *
+ * @param event the Spike graph event for a vertex or edge operation
+ */
+ public EventEnvelope(SpikeGraphEvent event) {
+ this.header = new EventHeader.Builder().requestId(event.getTransactionId()).build();
+ this.body = event;
+ }
+
+ public EventHeader getEventHeader() {
+ return header;
+ }
+
+ public SpikeGraphEvent getBody() {
+ return body;
+ }
+
+ /**
+ * Serializes this object into a JSON string representation.
+ *
+ * @return a JSON format string representation of this object.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return this.toJson();
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java
new file mode 100644
index 0000000..8b62ffa
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java
@@ -0,0 +1,64 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.envelope;
+
+import java.util.UUID;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import org.onap.aai.spike.event.incoming.GizmoGraphEvent;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.schema.GraphEventTransformer;
+
+public class EventEnvelopeParser {
+
+ /**
+ * Parses the event to extract the content of the body and generate a {@link GizmoGraphEvent}
+ * object.
+ *
+ * @param event event envelope with both header and body properties
+ * @return the body of the event represented by a {@link GizmoGraphEvent} object
+ * @throws SpikeException if the event cannot be parsed
+ */
+ public GizmoGraphEvent parseEvent(String event) throws SpikeException {
+
+ GizmoGraphEvent graphEvent = GizmoGraphEvent.fromJson(extractEventBody(event));
+
+ GraphEventTransformer.populateUUID(graphEvent);
+ if (graphEvent.getTransactionId() == null || graphEvent.getTransactionId().isEmpty()) {
+ graphEvent.setTransactionId(UUID.randomUUID().toString());
+ }
+ if (graphEvent.getRelationship() != null) {
+ GraphEventTransformer.validateEdgeModel(graphEvent.getRelationship());
+ } else if (graphEvent.getVertex() != null) {
+ GraphEventTransformer.validateVertexModel(graphEvent.getVertex());
+ } else {
+ throw new SpikeException("Unable to parse event: " + event);
+ }
+
+ return graphEvent;
+ }
+
+ private String extractEventBody(String event) {
+ JsonParser jsonParser = new JsonParser();
+ JsonElement jsonElement = jsonParser.parse(event);
+ return jsonElement.getAsJsonObject().get("body").toString();
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java b/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java
new file mode 100644
index 0000000..f8aa72b
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java
@@ -0,0 +1,194 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.envelope;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Objects;
+import java.util.UUID;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+
+public class EventHeader {
+
+ @SerializedName("request-id")
+ private String requestId;
+
+ private String timestamp;
+
+ @SerializedName("source-name")
+ private String sourceName;
+
+ @SerializedName("event-type")
+ private String eventType;
+
+ @SerializedName("validation-entity-type")
+ private String validationEntityType;
+
+ @SerializedName("validation-top-entity-type")
+ private String validationTopEntityType;
+
+ @SerializedName("entity-link")
+ private String entityLink;
+
+ private static final String MICROSERVICE_NAME = "SPIKE";
+ private static final String EVENT_TYPE_PENDING_UPDATE = "update-notification";
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public static class Builder {
+
+ private String requestId;
+ private String validationEntityType;
+ private String validationTopEntityType;
+ private String entityLink;
+
+ public Builder requestId(String val) {
+ requestId = val;
+ return this;
+ }
+
+ public Builder validationEntityType(String val) {
+ validationEntityType = val;
+ return this;
+ }
+
+ public Builder validationTopEntityType(String val) {
+ validationTopEntityType = val;
+ return this;
+ }
+
+ public Builder entityLink(String val) {
+ entityLink = val;
+ return this;
+ }
+
+ public EventHeader build() {
+ return new EventHeader(this);
+ }
+ }
+
+ private EventHeader(Builder builder) {
+ requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString();
+ timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now());
+ sourceName = MICROSERVICE_NAME;
+ eventType = EVENT_TYPE_PENDING_UPDATE;
+
+ validationEntityType = builder.validationEntityType;
+ validationTopEntityType = builder.validationTopEntityType;
+ entityLink = builder.entityLink;
+ }
+
+ /**
+ * Serializes this object into a JSON string representation.
+ *
+ * @return a JSON format string representation of this object.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // GETTERS
+ ///////////////////////////////////////////////////////////////////////////
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getValidationEntityType() {
+ return validationEntityType;
+ }
+
+ public String getValidationTopEntityType() {
+ return validationTopEntityType;
+ }
+
+ public String getEntityLink() {
+ return entityLink;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // OVERRIDES
+ ///////////////////////////////////////////////////////////////////////////
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.requestId, this.timestamp, this.sourceName, this.eventType, this.validationEntityType,
+ this.validationTopEntityType, this.entityLink);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof EventHeader)) {
+ return false;
+ } else if (obj == this) {
+ return true;
+ }
+ EventHeader rhs = (EventHeader) obj;
+ // @formatter:off
+ return new EqualsBuilder()
+ .append(requestId, rhs.requestId)
+ .append(timestamp, rhs.timestamp)
+ .append(sourceName, rhs.sourceName)
+ .append(eventType, rhs.eventType)
+ .append(validationEntityType, rhs.validationEntityType)
+ .append(validationTopEntityType, rhs.validationTopEntityType)
+ .append(entityLink, rhs.entityLink)
+ .isEquals();
+ // @formatter:on
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return this.toJson();
+ }
+
+}
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java
new file mode 100644
index 0000000..af10a6e
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java
@@ -0,0 +1,132 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.incoming;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class provides a generic representation of an Edge as provided by the graph data store.
+ *
+ */
+public class GizmoEdge {
+
+ /**
+ * The unique identifier used to identify this edge in the graph data store.
+ */
+ @SerializedName("key")
+ private String id;
+
+ /** Type label assigned to this vertex. */
+ private String type;
+
+ /** Source vertex for our edge. */
+ private GizmoVertex source;
+
+ /** Target vertex for our edge. */
+ private GizmoVertex target;
+
+ /** Map of all of the properties assigned to this vertex. */
+ private JsonElement properties;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public GizmoVertex getSource() {
+ return source;
+ }
+
+ public void setSource(GizmoVertex source) {
+ this.source = source;
+ }
+
+ public GizmoVertex getTarget() {
+ return target;
+ }
+
+ public void setTarget(GizmoVertex target) {
+ this.target = target;
+ }
+
+ public JsonElement getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonElement properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Unmarshalls this Edge object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Edge.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Edge object.
+ *
+ * @param json - The JSON string to produce the Edge from.
+ *
+ * @return - A Edge object.
+ *
+ * @throws SpikeException
+ */
+ public static GizmoEdge fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into an Edge object.
+ return gson.fromJson(json, GizmoEdge.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java
new file mode 100644
index 0000000..419b1a5
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java
@@ -0,0 +1,206 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.incoming;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.event.outgoing.SpikeEdge;
+import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
+import org.onap.aai.spike.event.outgoing.SpikeGraphEvent.SpikeOperation;
+import org.onap.aai.spike.event.outgoing.SpikeVertex;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.schema.EdgeRulesLoader;
+import org.onap.aai.spike.schema.OXMModelLoader;
+
+public class GizmoGraphEvent {
+ private String operation;
+
+ @SerializedName("transaction-id")
+ private String transactionId;
+
+ @SerializedName("database-transaction-id")
+ private String dbTransactionId;
+
+ private long timestamp;
+
+ private GizmoVertex vertex;
+
+ private GizmoEdge relationship;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getOperation() {
+ return operation;
+ }
+
+ public void setOperation(String operation) {
+ this.operation = operation;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ public GizmoVertex getVertex() {
+ return vertex;
+ }
+
+ public void setVertex(GizmoVertex vertex) {
+ this.vertex = vertex;
+ }
+
+ public GizmoEdge getRelationship() {
+ return relationship;
+ }
+
+ public void setRelationship(GizmoEdge relationship) {
+ this.relationship = relationship;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public String getDbTransactionId() {
+ return dbTransactionId;
+ }
+
+ /**
+ * Unmarshalls this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ public SpikeGraphEvent toSpikeGraphEvent() throws SpikeException {
+ SpikeGraphEvent spikeGraphEvent = new SpikeGraphEvent();
+ spikeGraphEvent.setTransactionId(this.getTransactionId());
+ spikeGraphEvent.setDbTransactionId(this.getDbTransactionId());
+ spikeGraphEvent.setOperationTimestamp(this.getTimestamp());
+ if (this.getOperation().equalsIgnoreCase("STORE")) {
+ spikeGraphEvent.setOperation(SpikeOperation.CREATE);
+ } else if (this.getOperation().equalsIgnoreCase("REPLACE")) {
+ spikeGraphEvent.setOperation(SpikeOperation.UPDATE);
+ } else if (this.getOperation().equalsIgnoreCase("DELETE")) {
+ spikeGraphEvent.setOperation(SpikeOperation.DELETE);
+ } else {
+ throw new SpikeException("Invalid operation in GizmoGraphEvent: " + this.getOperation());
+ }
+ if (this.getVertex() != null) {
+ SpikeVertex spikeVertex = new SpikeVertex();
+ spikeVertex.setId(this.getVertex().getId());
+ spikeVertex.setType(this.getVertex().getType());
+ spikeVertex.setModelVersion(OXMModelLoader.getLatestVersion());
+ spikeVertex.setProperties(this.getVertex().getProperties());
+ spikeGraphEvent.setVertex(spikeVertex);
+
+ } else if (this.getRelationship() != null) {
+ SpikeEdge spikeEdge = new SpikeEdge();
+ spikeEdge.setId(this.getRelationship().getId());
+ spikeEdge.setModelVersion(EdgeRulesLoader.getLatestSchemaVersion());
+ spikeEdge.setType(this.getRelationship().getType());
+
+ SpikeVertex spikeSourceVertex = new SpikeVertex();
+ spikeSourceVertex.setId(this.getRelationship().getSource().getId());
+ spikeSourceVertex.setType(this.getRelationship().getSource().getType());
+ spikeEdge.setSource(spikeSourceVertex);
+
+ SpikeVertex spikeTargetVertex = new SpikeVertex();
+ spikeTargetVertex.setId(this.getRelationship().getTarget().getId());
+ spikeTargetVertex.setType(this.getRelationship().getTarget().getType());
+ spikeEdge.setTarget(spikeTargetVertex);
+
+ spikeEdge.setProperties(this.getRelationship().getProperties());
+ spikeGraphEvent.setRelationship(spikeEdge);
+ }
+
+ return spikeGraphEvent;
+
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Vertex object.
+ *
+ * @param json - The JSON string to produce the Vertex from.
+ *
+ * @return - A Vertex object.
+ *
+ * @throws SpikeException
+ */
+ public static GizmoGraphEvent fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into a Vertex object.
+ return gson.fromJson(json, GizmoGraphEvent.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+
+ return toJson();
+ }
+
+ public String getObjectKey() {
+ if (this.getVertex() != null) {
+ return this.getVertex().getId();
+ } else if (this.getRelationship() != null) {
+ return this.getRelationship().getId();
+ }
+
+ return null;
+
+ }
+
+ public String getObjectType() {
+ if (this.getVertex() != null) {
+ return "Vertex->" + this.getVertex().getType();
+ } else if (this.getRelationship() != null) {
+ return "Relationship->" + this.getRelationship().getType();
+ }
+
+ return null;
+
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java
new file mode 100644
index 0000000..2f55c57
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java
@@ -0,0 +1,116 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.incoming;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class provides a generic representation of a Vertex as provided by the graph data store.
+ *
+ */
+public class GizmoVertex {
+
+ /**
+ * The unique identifier used to identify this vertex in the graph data store.
+ */
+ @SerializedName("key")
+ private String id;
+
+ /** Type label assigned to this vertex. */
+ private String type;
+
+ /** Map of all of the properties assigned to this vertex. */
+ private JsonElement properties;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public JsonElement getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonElement properties) {
+ this.properties = properties;
+ }
+
+ /**
+ * Unmarshalls this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Vertex object.
+ *
+ * @param json - The JSON string to produce the Vertex from.
+ *
+ * @return - A Vertex object.
+ *
+ * @throws SpikeException
+ */
+ public static GizmoVertex fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into a Vertex object.
+ return gson.fromJson(json, GizmoVertex.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+
+ return toJson();
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java
new file mode 100644
index 0000000..58795b3
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java
@@ -0,0 +1,356 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.incoming;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.logging.SpikeMsgs;
+
+
+/**
+ * Instances of this class maintain a buffer of events which have been received and are queued up to
+ * be processed.
+ * <p>
+ * A background thread advances a pointer into the buffer which always points to the head of the
+ * most recent consecutive block of processed events. This allows us, at any time, to know what
+ * offset value can be safely committed to the event store (meaning any events before that offset
+ * into the event topic will not be reprocessed on a restart).
+ */
+public class OffsetManager {
+
+ /** Buffer that we use for caching 'in flight' events. */
+ private RingEntry[] ringBuffer;
+
+ /** Number of elements that can be stored in the buffer. */
+ private int bufferSize;
+
+ /** Pointer to the next free slot in the buffer. */
+ private AtomicLong writePointer = new AtomicLong(0L);
+
+ /**
+ * Pointer to the next slot in the buffer to wait to be published so that we can commit its offset.
+ */
+ private long commitPointer = 0;
+
+ /**
+ * Executor for scheduling the background task which commits offsets to the event bus.
+ */
+ private ScheduledExecutorService offsetCommitService = Executors.newScheduledThreadPool(1);
+
+ /**
+ * The next offset value which represents the head of a consecutive block of events which have been
+ * processed.
+ */
+ private Long nextOffsetToCommit = null;
+
+ private static Logger logger = LoggerFactory.getInstance().getLogger(OffsetManager.class.getName());
+
+
+ /**
+ * Creates a new instance of the offset manager.
+ *
+ * @param bufferCapacity - The requested size of the buffer that we will use to cache offsets for
+ * events that are waiting to be processed.
+ * @param offsetCheckPeriodMs - The period at which we will try to update what we consider to be the
+ * next offset that can be safely committed to the event bus.
+ */
+ public OffsetManager(int bufferCapacity, long offsetCheckPeriodMs) {
+
+ // In order to make the math work nicely for our write and commit pointers, we
+ // need our buffer size to be a power of 2, so round the supplied buffer size
+ // up to ensure that it is a power of two.
+ //
+ // This way we can just keep incrementing our pointers forever without worrying
+ // about wrapping (we'll eventually roll over from LongMax to LongMin, but if the
+ // buffer size is a power of 2 then our modded physical indexes will still magically
+ // map to the next consecutive index. (Math!)
+ bufferSize = nextPowerOf2(bufferCapacity);
+
+ // Now, allocate and initialize our ring buffer.
+ ringBuffer = new RingEntry[bufferSize];
+ for (int i = 0; i < bufferSize; i++) {
+ ringBuffer[i] = new RingEntry();
+ }
+
+ // Schedule a task to commit the most recent offset value to the event library.
+ offsetCommitService.scheduleAtFixedRate(new OffsetCommitter(), offsetCheckPeriodMs, offsetCheckPeriodMs,
+ TimeUnit.MILLISECONDS);
+
+ logger.info(SpikeMsgs.OFFSET_MANAGER_STARTED, Integer.toString(bufferSize), Long.toString(offsetCheckPeriodMs));
+ }
+
+
+ /**
+ * Logs an event with the offset manager.
+ *
+ * @param transactionId - The transaction id associated with this event.
+ * @param commitOffset - The event bus offset associated with this event.
+ *
+ * @return - The index into the offset manager's buffer for this event.
+ */
+ public int cacheEvent(String transactionId, long commitOffset) {
+
+ // Get the index to the next free slot in the ring...
+ int index = nextFreeSlot();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Caching event with transaction-id: " + transactionId + " offset: " + commitOffset
+ + " to offset manager at index: " + index);
+ }
+
+ // ...and update it with the event meta data we want to cache.
+ ringBuffer[index].setTransactionId(transactionId);
+ ringBuffer[index].setCommitOffset(commitOffset);
+
+ return index;
+ }
+
+
+ /**
+ * Marks a cached event as 'published'.
+ *
+ * @param anIndex - The index into the event cache that we want to update.
+ * @throws SpikeException
+ */
+ public void markAsPublished(int anIndex) throws SpikeException {
+
+ // Make sure that we were supplied a valid index.
+ if ((anIndex < 0) || (anIndex > bufferSize - 1)) {
+ throw new SpikeException("Invalid index " + anIndex + " for offset manager buffer.");
+ }
+
+ // It is only valid to mark a cell as 'Published' if it is already
+ // in the 'Processing' state.
+ if (!ringBuffer[anIndex].state.compareAndSet(RingEntry.PROCESSING, RingEntry.PUBLISHED)) {
+ throw new SpikeException("Unexpected event state: " + state2String(ringBuffer[anIndex].state.get()));
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Event in offset manger buffer at index: " + anIndex + " marked as 'published'");
+ }
+ }
+
+
+ /**
+ * Marks a cached event as 'published'.
+ *
+ * @param transactionId - The transaction id of the event we want to update.
+ *
+ * @throws SpikeException
+ */
+ public void markAsPublished(String transactionId) throws SpikeException {
+
+ // Iterate over the ring buffer and try to find the specified transaction
+ // id.
+ for (int i = 0; i < bufferSize; i++) {
+
+ // Is this the one?
+ if (ringBuffer[i].getTransactionId() == transactionId) {
+
+ // Found the one we are looking for!
+ markAsPublished(i);
+ return;
+ }
+ }
+
+ // If we made it here then we didn't find an event with the supplied transaction id.
+ throw new SpikeException("No event with transaction id: " + transactionId + " exists in offset manager buffer");
+ }
+
+
+ /**
+ * Retrieves our current view of what is the most recent offset value that can be safely committed
+ * to the event bus (meaning that all events on the topic before that offset value have been
+ * processed and shouldn't be re-consumed after a restart).
+ *
+ * @return - The next 'safe' offset.
+ */
+ public long getNextOffsetToCommit() {
+ return nextOffsetToCommit;
+ }
+
+
+ /**
+ * Finds the next slot in the ring which is marked as 'free'.
+ *
+ * @return - An index into the ring buffer.
+ */
+ private int nextFreeSlot() {
+
+ int currentIndex = (int) (writePointer.getAndIncrement() % bufferSize);
+ while (!ringBuffer[currentIndex].state.compareAndSet(RingEntry.FREE, RingEntry.PROCESSING)) {
+ currentIndex = (int) (writePointer.getAndIncrement() % bufferSize);
+ }
+
+ return currentIndex;
+ }
+
+
+ /**
+ * Given a number, this helper method finds the next largest number that is a power of 2.
+ *
+ * @param aNumber - The number to compute the next power of two for.
+ *
+ * @return - The next largest power of 2 for the supplied number.
+ */
+ private int nextPowerOf2(int aNumber) {
+
+ int powerOfTwo = 1;
+ while (powerOfTwo < aNumber) {
+ powerOfTwo *= 2;
+ }
+ return powerOfTwo;
+ }
+
+ private String state2String(int aState) {
+
+ switch (aState) {
+ case RingEntry.FREE:
+ return "FREE";
+
+ case RingEntry.PROCESSING:
+ return "PROCESSING";
+
+ case RingEntry.PUBLISHED:
+ return "PUBLISHED";
+
+ default:
+ return "UNDEFINED(" + aState + ")";
+ }
+ }
+
+
+ /**
+ * Defines the structure of the entries in the ring buffer which represent events which are 'in
+ * flight'.
+ */
+ public class RingEntry {
+
+ private final static int FREE = 1; // Slot in buffer is available to be written to.
+ private final static int PROCESSING = 2; // Slot in buffer represents an event which is waiting to be processed.
+ private final static int PUBLISHED = 3; // Slot in buffer represents an event which has been published.
+
+ /**
+ * Describes the state of this entry in the ring:
+ * <p>
+ * FREE = This slot is currently unused and may be written to.
+ * <p>
+ * PROCESSING = This slot describes an event which has not yet been published.
+ * <p>
+ * PUBLISHED = This lot describes an event which has been published and therefore may be released.
+ */
+ public AtomicInteger state = new AtomicInteger(FREE);
+
+ /** The unique identifier of the event which this entry represents. */
+ private String transactionId;
+
+ /** The event bus offset associated with the event which this entry represents. */
+ private long commitOffset;
+
+
+ /**
+ * Retrieve the transaction id for the event represented by this entry.
+ *
+ * @return - Transaction id.
+ */
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+
+ /**
+ * Assigns a transaction id to this entry.
+ *
+ * @param transactionId - The unique id for this entry.
+ */
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+
+ /**
+ * Retrieves the offset of the event represented by this entry.
+ *
+ * @return - An event bus offset value.
+ */
+ public long getCommitOffset() {
+ return commitOffset;
+ }
+
+
+ /**
+ * Assigns an offset value to this entry.
+ *
+ * @param commitOffset - Offset value for this entry.
+ */
+ public void setCommitOffset(long commitOffset) {
+ this.commitOffset = commitOffset;
+ }
+ }
+
+
+ /**
+ * This class implements a simple background task which wakes up periodically and determines the
+ * next available offset from the ring buffer which is safe to commit to the event bus.
+ */
+ private class OffsetCommitter implements Runnable {
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Runnable#run()
+ */
+ @Override
+ public void run() {
+
+ // Get the index into the ring buffer of the next slot to be checked.
+ int currentCommitIndex = (int) (commitPointer % bufferSize);
+
+ // If this entry is in the 'published' state then its offset is good to be
+ // committed.
+ while (ringBuffer[currentCommitIndex].state.get() == RingEntry.PUBLISHED) {
+
+ // Grab the offset of the current entry.
+ nextOffsetToCommit = ringBuffer[currentCommitIndex].getCommitOffset();
+
+ // We don't need to keep the current entry alive any longer, so free it and advance
+ // to the next entry in the ring.
+ ringBuffer[currentCommitIndex].state.set(RingEntry.FREE);
+ commitPointer++;
+
+ // Update our index and loop back to check the next one. We will keep advancing
+ // as long as we have consecutive entries that are flagged as 'published'.
+ currentCommitIndex = (int) (commitPointer % bufferSize);
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Offset to commit to event bus: "
+ + ((nextOffsetToCommit != null) ? nextOffsetToCommit : "none"));
+ }
+ }
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java b/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java
new file mode 100644
index 0000000..2ba949a
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java
@@ -0,0 +1,32 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD})
+public @interface GsonExclude {
+ // Field tag only annotation
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java
new file mode 100644
index 0000000..7fb030d
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java
@@ -0,0 +1,143 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class provides a generic representation of an Edge as provided by the graph data store.
+ *
+ */
+public class SpikeEdge {
+
+ /**
+ * The unique identifier used to identify this edge in the graph data store.
+ */
+ @SerializedName("key")
+ private String id;
+
+ @SerializedName("schema-version")
+ private String modelVersion;
+
+ /** Type label assigned to this vertex. */
+ private String type;
+
+ /** Source vertex for our edge. */
+ private SpikeVertex source;
+
+ /** Target vertex for our edge. */
+ private SpikeVertex target;
+
+ /** Map of all of the properties assigned to this vertex. */
+ private JsonElement properties;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public SpikeVertex getSource() {
+ return source;
+ }
+
+ public void setSource(SpikeVertex source) {
+ this.source = source;
+ }
+
+ public SpikeVertex getTarget() {
+ return target;
+ }
+
+ public void setTarget(SpikeVertex target) {
+ this.target = target;
+ }
+
+ public JsonElement getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonElement properties) {
+ this.properties = properties;
+ }
+
+ public String getModelVersion() {
+ return modelVersion;
+ }
+
+ public void setModelVersion(String modelVersion) {
+ this.modelVersion = modelVersion;
+ }
+
+ /**
+ * Unmarshalls this Edge object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Edge.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Edge object.
+ *
+ * @param json - The JSON string to produce the Edge from.
+ *
+ * @return - A Edge object.
+ *
+ * @throws SpikeException
+ */
+ public static SpikeEdge fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into an Edge object.
+ return gson.fromJson(json, SpikeEdge.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java
new file mode 100644
index 0000000..87ce2ce
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java
@@ -0,0 +1,33 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import java.util.Comparator;
+
+public class SpikeEventComparator implements Comparator<SpikeGraphEvent> {
+
+ @Override
+ public int compare(SpikeGraphEvent e1, SpikeGraphEvent e2) {
+
+ // Note: this will break down if the difference is more than 24 days, or int.max milliseconds
+ return (int) (e1.getOperationTimestamp() - e2.getOperationTimestamp());
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java
new file mode 100644
index 0000000..54ac391
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java
@@ -0,0 +1,37 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import com.google.gson.ExclusionStrategy;
+import com.google.gson.FieldAttributes;
+
+public class SpikeEventExclusionStrategy implements ExclusionStrategy {
+
+ @Override
+ public boolean shouldSkipField(FieldAttributes fieldAttributes) {
+ return fieldAttributes.getAnnotation(GsonExclude.class) != null;
+ }
+
+ @Override
+ public boolean shouldSkipClass(Class<?> aClass) {
+ return false;
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java
new file mode 100644
index 0000000..a076a55
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java
@@ -0,0 +1,168 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+public class SpikeGraphEvent {
+
+ public enum SpikeOperation {
+ CREATE, UPDATE, DELETE
+ }
+
+ private SpikeOperation operation;
+
+ @SerializedName("transaction-id")
+ private String transactionId;
+
+ @SerializedName("database-transaction-id")
+ private String dbTransactionId;
+
+ @SerializedName("timestamp")
+ private long operationTimestamp;
+
+ private SpikeVertex vertex;
+
+ private SpikeEdge relationship;
+
+ // Time this event was received in spike, used to determine when to send the event
+ @GsonExclude
+ private long spikeTimestamp = System.currentTimeMillis();
+
+ /** Serializer/deserializer for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public SpikeOperation getOperation() {
+ return operation;
+ }
+
+ public void setOperation(SpikeOperation operation) {
+ this.operation = operation;
+ }
+
+ public long getOperationTimestamp() {
+ return operationTimestamp;
+ }
+
+ public void setOperationTimestamp(long operationTimestamp) {
+ this.operationTimestamp = operationTimestamp;
+ }
+
+
+
+ public SpikeVertex getVertex() {
+ return vertex;
+ }
+
+ public void setVertex(SpikeVertex vertex) {
+ this.vertex = vertex;
+ }
+
+ public SpikeEdge getRelationship() {
+ return relationship;
+ }
+
+ public void setRelationship(SpikeEdge relationship) {
+ this.relationship = relationship;
+ }
+
+ public String getTransactionId() {
+ return transactionId;
+ }
+
+ public void setTransactionId(String transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ public void setDbTransactionId(String dbTransactionId) {
+ this.dbTransactionId = dbTransactionId;
+ }
+
+ public long getSpikeTimestamp() {
+ return spikeTimestamp;
+ }
+
+ /**
+ * Unmarshalls this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+
+ /**
+ * Marshalls the provided JSON string into a Vertex object.
+ *
+ * @param json - The JSON string to produce the Vertex from.
+ *
+ * @return - A Vertex object.
+ *
+ * @throws SpikeException
+ */
+ public static SpikeGraphEvent fromJson(String json) throws SpikeException {
+
+ try {
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into a Vertex object.
+ return gson.fromJson(json, SpikeGraphEvent.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+ return toJson();
+ }
+
+ public String getObjectKey() {
+ if (this.getVertex() != null) {
+ return this.getVertex().getId();
+ } else if (this.getRelationship() != null) {
+ return this.getRelationship().getId();
+ }
+
+ return null;
+ }
+
+ public String getObjectType() {
+ if (this.getVertex() != null) {
+ return "Vertex->" + this.getVertex().getType();
+ } else if (this.getRelationship() != null) {
+ return "Relationship->" + this.getRelationship().getType();
+ }
+
+ return null;
+ }
+}
+
diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java
new file mode 100644
index 0000000..cd51855
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java
@@ -0,0 +1,127 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.event.outgoing;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonElement;
+import com.google.gson.annotations.SerializedName;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class provides a generic representation of a Vertex as provided by the graph data store.
+ *
+ */
+public class SpikeVertex {
+
+ /**
+ * The unique identifier used to identify this vertex in the graph data store.
+ */
+ @SerializedName("key")
+ private String id;
+
+ @SerializedName("schema-version")
+ private String modelVersion;
+
+ /** Type label assigned to this vertex. */
+ private String type;
+
+ /** Map of all of the properties assigned to this vertex. */
+ private JsonElement properties;
+
+ /** Marshaller/unmarshaller for converting to/from JSON. */
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create();
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public JsonElement getProperties() {
+ return properties;
+ }
+
+ public void setProperties(JsonElement properties) {
+ this.properties = properties;
+ }
+
+ public String getModelVersion() {
+ return modelVersion;
+ }
+
+ public void setModelVersion(String modelVersion) {
+ this.modelVersion = modelVersion;
+ }
+
+ /**
+ * Unmarshalls this Vertex object into a JSON string.
+ *
+ * @return - A JSON format string representation of this Vertex.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Marshalls the provided JSON string into a Vertex object.
+ *
+ * @param json - The JSON string to produce the Vertex from.
+ *
+ * @return - A Vertex object.
+ *
+ * @throws SpikeException
+ */
+ public static SpikeVertex fromJson(String json) throws SpikeException {
+
+ try {
+
+ // Make sure that we were actually provided a non-empty string
+ // before we
+ // go any further.
+ if (json == null || json.isEmpty()) {
+ throw new SpikeException("Empty or null JSON string.");
+ }
+
+ // Marshall the string into a Vertex object.
+ return gson.fromJson(json, SpikeVertex.class);
+
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse JSON string: " + ex.getMessage());
+ }
+ }
+
+ @Override
+ public String toString() {
+
+ return toJson();
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/exception/SpikeException.java b/src/main/java/org/onap/aai/spike/exception/SpikeException.java
new file mode 100644
index 0000000..90404fa
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/exception/SpikeException.java
@@ -0,0 +1,46 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.exception;
+
+
+public class SpikeException extends Exception {
+
+ private static final long serialVersionUID = 8162385108397238865L;
+
+
+ public SpikeException() {}
+
+ public SpikeException(String message) {
+ super(message);
+ }
+
+ public SpikeException(Throwable cause) {
+ super(cause);
+ }
+
+ public SpikeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public SpikeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/logging/SpikeMsgs.java b/src/main/java/org/onap/aai/spike/logging/SpikeMsgs.java
new file mode 100644
index 0000000..77d7734
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/logging/SpikeMsgs.java
@@ -0,0 +1,167 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Spike
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP and OpenECOMP are trademarks
+ * and service marks of AT&T Intellectual Property.
+ */
+package org.onap.aai.spike.logging;
+
+import com.att.eelf.i18n.EELFResourceManager;
+import org.onap.aai.cl.eelf.LogMessageEnum;
+
+public enum SpikeMsgs implements LogMessageEnum {
+
+ /**
+ * Unable to parse schema file: {0} due to error : {1}
+ *
+ * Arguments: {0} = schema file name {1} = error
+ */
+ INVALID_OXM_FILE,
+
+ /**
+ * Invalid OXM dir: {0}
+ *
+ * Arguments: {0} = Directory.
+ */
+ INVALID_OXM_DIR,
+
+ /**
+ * Unable to commit offset to event bus due to error: {0}
+ *
+ * Arguments: {0} = Failure cause.
+ */
+ OFFSET_COMMIT_FAILURE,
+
+ /**
+ * Unable to load OXM schema: {0}
+ *
+ * Arguments: {0} = error
+ */
+ OXM_LOAD_ERROR,
+
+ /**
+ * OXM file change detected: {0}
+ *
+ * Arguments: {0} = file name
+ */
+ OXM_FILE_CHANGED,
+
+ /**
+ * Successfully loaded schema: {0}
+ *
+ * Arguments: {0} = oxm filename
+ */
+ LOADED_OXM_FILE,
+
+ /**
+ * Successfully loaded Edge Properties Files: {0}
+ *
+ * <p>
+ * Arguments: {0} = oxm filename
+ */
+ LOADED_DB_RULE_FILE,
+
+ /**
+ * Successfully Started Spike Service Arguments: {0} = Event interface implementation class name
+ */
+ SPIKE_SERVICE_STARTED_SUCCESSFULLY,
+
+ /**
+ * Event bus offset manager started: buffer size={0} commit period={1}
+ *
+ * Arguments: {0} = Event buffer capacity {1} = Period in ms at which event bus offsets will be
+ * committed.
+ */
+ OFFSET_MANAGER_STARTED,
+
+ /**
+ * Unable to initialize : {0}
+ *
+ * Arguments: {0} = Service name
+ */
+ SPIKE_SERVICE_STARTED_FAILURE,
+
+ /**
+ * Unable to consume event due to : {0}
+ *
+ * Arguments: {0} = error message
+ */
+ SPIKE_EVENT_CONSUME_FAILURE,
+ /**
+ * Unable to publish event due to : {0}
+ *
+ * Arguments: {0} = error message
+ */
+ SPIKE_EVENT_PUBLISH_FAILURE,
+ /**
+ * Event Received : {0}
+ *
+ * Arguments: {0} = event
+ */
+ SPIKE_EVENT_RECEIVED,
+
+ /**
+ * Event Processed : {0}
+ *
+ * Arguments: {0} = event
+ */
+ SPIKE_EVENT_PROCESSED,
+ /**
+ * Event Published : {0}
+ *
+ * Arguments: {0} = event
+ */
+ SPIKE_EVENT_PUBLISHED,
+ /**
+ * Event failed to publish: {0}
+ *
+ * Arguments: {0} = event
+ */
+ SPIKE_PUBLISH_FAILED,
+ /**
+ * No Event Received
+ *
+ * Arguments: none
+ */
+ SPIKE_NO_EVENT_RECEIVED,
+ /**
+ * Checking for events Arguments: none
+ */
+ SPIKE_QUERY_EVENT_SYSTEM,
+
+ /**
+ * Schema Ingest properties file was not loaded properly
+ */
+ SPIKE_SCHEMA_INGEST_LOAD_ERROR,
+
+ /**
+ * Received request {0} {1} from {2}. Sending response: {3} Arguments: {0} = operation {1} = target
+ * URL {2} = source {3} = response code
+ */
+ PROCESS_REST_REQUEST;
+
+ /**
+ * Static initializer to ensure the resource bundles for this class are loaded...
+ */
+ static {
+ EELFResourceManager.loadMessageBundle("logging/SpikeMsgs");
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/schema/EdgeRulesLoader.java b/src/main/java/org/onap/aai/spike/schema/EdgeRulesLoader.java
new file mode 100644
index 0000000..b914421
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/schema/EdgeRulesLoader.java
@@ -0,0 +1,229 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.schema;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import com.google.common.collect.Multimap;
+import org.apache.commons.io.IOUtils;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.edges.EdgeIngestor;
+import org.onap.aai.edges.EdgeRule;
+import org.onap.aai.edges.exceptions.EdgeRuleNotFoundException;
+import org.onap.aai.setup.ConfigTranslator;
+import org.onap.aai.setup.SchemaLocationsBean;
+import org.onap.aai.setup.Version;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.logging.SpikeMsgs;
+import org.onap.aai.spike.util.SchemaIngestPropertiesReader;
+
+
+public class EdgeRulesLoader {
+
+ private static Map<String, RelationshipSchema> versionContextMap = new ConcurrentHashMap<>();
+
+ static final Pattern versionPattern = Pattern.compile("V(\\d*)");
+ static final String propsPrefix = "edge_properties_";
+ static final String propsSuffix = ".json";
+ final static Pattern propsFilePattern = Pattern.compile(propsPrefix + "(.*)" + propsSuffix);
+ final static Pattern propsVersionPattern = Pattern.compile("v\\d*");
+
+ private static org.onap.aai.cl.api.Logger logger =
+ LoggerFactory.getInstance().getLogger(EdgeRulesLoader.class.getName());
+
+ private EdgeRulesLoader() {}
+
+ /**
+ * Finds all DB Edge Rules and Edge Properties files for all OXM models.
+ *
+ * @throws SpikeException
+ */
+ public static synchronized void loadModels() throws SpikeException {
+ SchemaIngestPropertiesReader SchemaIngestPropertiesReader = new SchemaIngestPropertiesReader();
+ SchemaLocationsBean schemaLocationsBean = new SchemaLocationsBean();
+ schemaLocationsBean.setEdgeDirectory(SchemaIngestPropertiesReader.getEdgeDir());
+ ConfigTranslator configTranslator = new OxmConfigTranslator(schemaLocationsBean);
+ EdgeIngestor edgeIngestor = new EdgeIngestor(configTranslator);
+ Map<String, File> propFiles = edgePropertyFiles(SchemaIngestPropertiesReader);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Loading DB Edge Rules");
+ }
+
+ for (String version : OXMModelLoader.getLoadedOXMVersions()) {
+ try {
+ loadModel(Version.valueOf(version), edgeIngestor, propFiles);
+ } catch (IOException | EdgeRuleNotFoundException e) {
+ throw new SpikeException(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Loads DB Edge Rules and Edge Properties for a given version.
+ *
+ * @throws SpikeException
+ */
+
+ public static synchronized void loadModels(String v) throws SpikeException {
+ SchemaIngestPropertiesReader SchemaIngestPropertiesReader = new SchemaIngestPropertiesReader();
+ SchemaLocationsBean schemaLocationsBean = new SchemaLocationsBean();
+ schemaLocationsBean.setEdgeDirectory(SchemaIngestPropertiesReader.getEdgeDir());
+ ConfigTranslator configTranslator = new OxmConfigTranslator(schemaLocationsBean);
+ EdgeIngestor edgeIngestor = new EdgeIngestor(configTranslator);
+ String version = v.toUpperCase();
+ Map<String, File> propFiles = edgePropertyFiles(SchemaIngestPropertiesReader);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Loading DB Edge Rules ");
+ }
+
+ try {
+ loadModel(Version.valueOf(version), edgeIngestor, propFiles);
+ } catch (IOException | EdgeRuleNotFoundException e) {
+ throw new SpikeException(e.getMessage());
+ }
+ }
+
+ /**
+ * Retrieves the DB Edge Rule relationship schema for a given version.
+ *
+ * @param version - The OXM version that we want the DB Edge Rule for.
+ * @return - A RelationshipSchema of the DB Edge Rule for the OXM version.
+ * @throws SpikeException
+ */
+ public static RelationshipSchema getSchemaForVersion(String version) throws SpikeException {
+
+ // If we haven't already loaded in the available OXM models, then do so now.
+ if (versionContextMap == null || versionContextMap.isEmpty()) {
+ loadModels();
+ } else if (!versionContextMap.containsKey(version)) {
+ logger.error(SpikeMsgs.OXM_LOAD_ERROR, "Error loading DB Edge Rules for: " + version);
+ throw new SpikeException("Error loading DB Edge Rules for: " + version);
+ }
+
+ return versionContextMap.get(version);
+ }
+
+ /**
+ * Retrieves the DB Edge Rule relationship schema for all loaded OXM versions.
+ *
+ * @return - A Map of the OXM version and it's corresponding RelationshipSchema of the DB Edge Rule.
+ * @throws SpikeException
+ */
+ public static Map<String, RelationshipSchema> getSchemas() throws SpikeException {
+
+ // If we haven't already loaded in the available OXM models, then do so now.
+ if (versionContextMap == null || versionContextMap.isEmpty()) {
+ loadModels();
+ }
+ return versionContextMap;
+ }
+
+ /**
+ * Returns the latest available DB Edge Rule version.
+ *
+ * @return - A Map of the OXM version and it's corresponding RelationshipSchema of the DB Edge Rule.
+ * @throws SpikeException
+ */
+ public static String getLatestSchemaVersion() throws SpikeException {
+
+ // If we haven't already loaded in the available OXM models, then do so now.
+ if (versionContextMap == null || versionContextMap.isEmpty()) {
+ loadModels();
+ }
+
+ // If there are still no models available, then there's not much we can do...
+ if (versionContextMap.isEmpty()) {
+ logger.error(SpikeMsgs.OXM_LOAD_ERROR, "No available DB Edge Rules to get latest version for.");
+ throw new SpikeException("No available DB Edge Rules to get latest version for.");
+ }
+
+ // Iterate over the available model versions to determine which is the most
+ // recent.
+ Integer latestVersion = null;
+ String latestVersionStr = null;
+ for (String versionKey : versionContextMap.keySet()) {
+
+ Matcher matcher = versionPattern.matcher(versionKey.toUpperCase());
+ if (matcher.find()) {
+
+ int currentVersion = Integer.parseInt(matcher.group(1));
+
+ if ((latestVersion == null) || (currentVersion > latestVersion)) {
+ latestVersion = currentVersion;
+ latestVersionStr = versionKey;
+ }
+ }
+ }
+
+ return latestVersionStr;
+ }
+
+ /**
+ * Reset the loaded DB Edge Rule schemas
+ *
+ */
+
+ public static void resetSchemaVersionContext() {
+ versionContextMap = new ConcurrentHashMap<>();
+ }
+
+ private static synchronized void loadModel(Version version, EdgeIngestor edgeIngestor, Map<String, File> props)
+ throws IOException, SpikeException, EdgeRuleNotFoundException {
+
+ Multimap<String, EdgeRule> edges = edgeIngestor.getAllRules(version);
+ String edgeProps;
+ if (props.get(version.toString().toLowerCase()) != null) {
+ edgeProps = IOUtils.toString(new FileInputStream(props.get(version.toString().toLowerCase())), "UTF-8");
+ } else {
+ throw new FileNotFoundException("The Edge Properties file for OXM version " + version + "was not found.");
+ }
+ if (edges != null) {
+ RelationshipSchema rs = new RelationshipSchema(edges, edgeProps);
+ versionContextMap.put(version.toString().toLowerCase(), rs);
+ logger.info(SpikeMsgs.LOADED_DB_RULE_FILE, version.toString());
+ }
+ }
+
+ private static Map<String, File> edgePropertyFiles(SchemaIngestPropertiesReader dir) throws SpikeException {
+ Map<String, File> propsFiles = Arrays
+ .stream(new File(dir.getEdgePropsDir())
+ .listFiles((d, name) -> propsFilePattern.matcher(name).matches()))
+ .collect(Collectors.toMap(new Function<File, String>() {
+ public String apply(File f) {
+ Matcher m1 = propsVersionPattern.matcher(f.getName());
+ m1.find();
+ return m1.group(0);
+ }
+ }, f -> f));
+ return propsFiles;
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/schema/GraphEventTransformer.java b/src/main/java/org/onap/aai/spike/schema/GraphEventTransformer.java
new file mode 100644
index 0000000..ced84bb
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/schema/GraphEventTransformer.java
@@ -0,0 +1,282 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.schema;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import com.google.common.base.CaseFormat;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import org.eclipse.persistence.dynamic.DynamicType;
+import org.eclipse.persistence.internal.helper.DatabaseField;
+import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.eclipse.persistence.mappings.DatabaseMapping;
+import org.eclipse.persistence.oxm.XMLField;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.spike.event.incoming.GizmoEdge;
+import org.onap.aai.spike.event.incoming.GizmoGraphEvent;
+import org.onap.aai.spike.event.incoming.GizmoVertex;
+import org.onap.aai.spike.exception.SpikeException;
+
+/**
+ * This class is responsible for transforming raw graph entities (such as vertices and edges) into
+ * representations which correspond to the OXM models.
+ */
+public class GraphEventTransformer {
+
+ private static org.onap.aai.cl.api.Logger logger =
+ LoggerFactory.getInstance().getLogger(GraphEventTransformer.class.getName());
+ private static final String AAI_UUID = "aai-uuid";
+
+ /**
+ *
+ * @param rawVertex
+ * @throws SpikeException
+ */
+ public static void validateVertexModel(GizmoVertex rawVertex) throws SpikeException {
+
+ validateVertexModel(OXMModelLoader.getLatestVersion(), rawVertex);
+ }
+
+ public static void populateUUID(GizmoGraphEvent event) throws SpikeException {
+ try {
+ if (event.getVertex() != null) {
+ if (event.getVertex().getProperties().getAsJsonObject().has(AAI_UUID)) {
+ event.getVertex()
+ .setId(event.getVertex().getProperties().getAsJsonObject().get(AAI_UUID).getAsString());
+ }
+ } else if (event.getRelationship() != null) {
+ if (event.getRelationship().getProperties().getAsJsonObject().has(AAI_UUID)) {
+ event.getRelationship().setId(
+ event.getRelationship().getProperties().getAsJsonObject().get(AAI_UUID).getAsString());
+ }
+
+ if (event.getRelationship().getSource().getProperties().getAsJsonObject().has(AAI_UUID)) {
+ event.getRelationship().getSource().setId(event.getRelationship().getSource().getProperties()
+ .getAsJsonObject().get(AAI_UUID).getAsString());
+ }
+ if (event.getRelationship().getTarget().getProperties().getAsJsonObject().has(AAI_UUID)) {
+ event.getRelationship().getTarget().setId(event.getRelationship().getTarget().getProperties()
+ .getAsJsonObject().get(AAI_UUID).getAsString());
+ }
+ }
+ } catch (Exception ex) {
+ throw new SpikeException("Unable to parse uuid in incoming event");
+ }
+ }
+
+ /**
+ *
+ * @param version
+ * @param rawVertex
+ * @throws SpikeException
+ */
+ public static void validateVertexModel(String version, GizmoVertex rawVertex) throws SpikeException {
+
+ try {
+
+ DynamicJAXBContext jaxbContext = OXMModelLoader.getContextForVersion(version);
+ String modelObjectClass = CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL,
+ CaseFormat.LOWER_HYPHEN.to(CaseFormat.UPPER_CAMEL, rawVertex.getType()));
+ final DynamicType modelObjectType = jaxbContext.getDynamicType(modelObjectClass);
+ final DynamicType reservedType = jaxbContext.getDynamicType("ReservedPropNames");
+
+ Set<Map.Entry<String, JsonElement>> vertexEntriesSet =
+ rawVertex.getProperties().getAsJsonObject().entrySet();
+ Map<String, JsonElement> vertexEntriesMap = new HashMap<String, JsonElement>();
+ for (Map.Entry<String, JsonElement> entry : vertexEntriesSet) {
+ vertexEntriesMap.put(entry.getKey(), entry.getValue());
+ }
+
+ JsonObject modelJsonElement = new JsonObject();
+ // Iterate over all of the attributes specified in the model schema,
+ // populating
+ // our dynamic instance with the corresponding values supplied in
+ // our raw vertex.
+ for (DatabaseMapping mapping : modelObjectType.getDescriptor().getMappings()) {
+ if (mapping.isAbstractDirectMapping()) {
+ DatabaseField f = mapping.getField();
+ String keyName = f.getName().substring(0, f.getName().indexOf("/"));
+
+ String defaultValue = mapping.getProperties().get("defaultValue") == null ? ""
+ : mapping.getProperties().get("defaultValue").toString();
+
+ if (((XMLField) f).isRequired() && !vertexEntriesMap.containsKey(keyName)
+ && !defaultValue.isEmpty()) {
+ modelJsonElement.addProperty(keyName, defaultValue);
+
+ }
+ // If this is a required field, but is not present in the
+ // raw vertex, reject this
+ // as an invalid input since we can't build a valid object
+ // from what we were provided.
+ if (((XMLField) f).isRequired() && !vertexEntriesMap.containsKey(keyName)
+ && defaultValue.isEmpty()) {
+ throw new SpikeException("Missing required field: " + keyName);
+ }
+
+ // If this is a non-required field, then set it if a value
+ // was provided in the
+ // raw vertex.
+ if (vertexEntriesMap.containsKey(keyName)) {
+ validateFieldType(vertexEntriesMap.get(keyName), f.getType());
+ modelJsonElement.add(keyName, vertexEntriesMap.get(keyName));
+ }
+ }
+ }
+
+ // Ensure any of the reserved properties are added to the payload
+ for (DatabaseMapping mapping : reservedType.getDescriptor().getMappings()) {
+ if (mapping.isAbstractDirectMapping()) {
+ DatabaseField field = mapping.getField();
+ String keyName = field.getName().substring(0, field.getName().indexOf("/"));
+
+ if (vertexEntriesMap.containsKey(keyName)) {
+ validateFieldType(vertexEntriesMap.get(keyName), field.getType());
+ modelJsonElement.add(keyName, vertexEntriesMap.get(keyName));
+ }
+ }
+ }
+
+ rawVertex.setProperties(modelJsonElement);
+ } catch (Exception e) {
+ throw new SpikeException(e.getMessage());
+ }
+ }
+
+ /**
+ *
+ * @param rawEdge
+ * @throws SpikeException
+ */
+ public static void validateEdgeModel(GizmoEdge rawEdge) throws SpikeException {
+
+ validateEdgeModel(EdgeRulesLoader.getLatestSchemaVersion(), rawEdge);
+ }
+
+ /**
+ *
+ * @param version
+ * @param rawEdge
+ * @throws SpikeException
+ */
+ public static void validateEdgeModel(String version, GizmoEdge rawEdge) throws SpikeException {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Convert edge: " + rawEdge.toString() + " to model version: " + version);
+ }
+
+ // Get the relationship schema for the supplied version.
+ RelationshipSchema schema = EdgeRulesLoader.getSchemaForVersion(version);
+
+ try {
+
+ // Validate that our edge does have the necessary endpoints.
+ if (rawEdge.getSource() == null || rawEdge.getTarget() == null) {
+ throw new SpikeException("Source or target endpoint not specified");
+ }
+
+ // Create a key based on source:target:relationshipType
+ String sourceNodeType = rawEdge.getSource().getType();
+ String targetNodeType = rawEdge.getTarget().getType();
+ String key = sourceNodeType + ":" + targetNodeType + ":" + rawEdge.getType();
+
+ // Now, look up the specific schema model based on the key we just
+ // constructed.
+ Map<String, Class<?>> relationshipModel = schema.lookupRelation(key);
+ if (relationshipModel == null || relationshipModel.isEmpty()) {
+ throw new SpikeException("Invalid source/target/relationship type: " + key);
+ }
+
+ Set<Map.Entry<String, JsonElement>> edgeEntriesSet = rawEdge.getProperties().getAsJsonObject().entrySet();
+ Map<String, JsonElement> edgeEntriesMap = new HashMap<String, JsonElement>();
+ for (Map.Entry<String, JsonElement> entry : edgeEntriesSet) {
+ edgeEntriesMap.put(entry.getKey(), entry.getValue());
+ }
+
+ JsonObject modelJsonElement = new JsonObject();
+
+ for (String property : relationshipModel.keySet()) {
+
+ if (!edgeEntriesMap.containsKey(property)) {
+ throw new SpikeException("Missing required field: " + property);
+ }
+
+ validateFieldType(edgeEntriesMap.get(property), relationshipModel.get(property));
+ modelJsonElement.add(property, edgeEntriesMap.get(property));
+
+ }
+
+ rawEdge.setProperties(modelJsonElement);
+
+
+ } catch (Exception ex) {
+ throw new SpikeException(ex.getMessage());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public static Object validateFieldType(JsonElement value, Class clazz) throws SpikeException {
+ try {
+ if (clazz.isAssignableFrom(Integer.class)) {
+ return value.getAsInt();
+ } else if (clazz.isAssignableFrom(Long.class)) {
+ return value.getAsLong();
+ } else if (clazz.isAssignableFrom(Float.class)) {
+ return value.getAsFloat();
+ } else if (clazz.isAssignableFrom(Double.class)) {
+ return value.getAsDouble();
+ } else if (clazz.isAssignableFrom(Boolean.class)) {
+ return value.getAsBoolean();
+ } else {
+ return value;
+ }
+
+ } catch (Exception e) {
+ throw new SpikeException("Invalid property value: " + value);
+ }
+ }
+
+ public static Object validateFieldType(String value, Class clazz) throws SpikeException {
+ try {
+ if (clazz.isAssignableFrom(Integer.class)) {
+ return Integer.parseInt(value);
+ } else if (clazz.isAssignableFrom(Long.class)) {
+ return Long.parseLong(value);
+ } else if (clazz.isAssignableFrom(Float.class)) {
+ return Float.parseFloat(value);
+ } else if (clazz.isAssignableFrom(Double.class)) {
+ return Double.parseDouble(value);
+ } else if (clazz.isAssignableFrom(Boolean.class)) {
+ if (!value.equals("true") && !value.equals("false")) {
+ throw new SpikeException("Invalid property value: " + value);
+ }
+ return Boolean.parseBoolean(value);
+ } else {
+ return value;
+ }
+ } catch (Exception e) {
+ throw new SpikeException("Invalid property value: " + value);
+ }
+ }
+
+}
diff --git a/src/main/java/org/onap/aai/spike/schema/MapAdapter.java b/src/main/java/org/onap/aai/spike/schema/MapAdapter.java
new file mode 100644
index 0000000..2e59450
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/schema/MapAdapter.java
@@ -0,0 +1,65 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.schema;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import javax.naming.OperationNotSupportedException;
+import javax.xml.bind.JAXBElement;
+import javax.xml.bind.annotation.XmlAnyElement;
+import javax.xml.bind.annotation.adapters.XmlAdapter;
+import javax.xml.namespace.QName;
+
+
+public class MapAdapter extends XmlAdapter<MapAdapter.AdaptedMap, Map<String, Object>> {
+
+ public static class AdaptedMap {
+ @XmlAnyElement
+ List elements;
+ }
+
+ @Override
+ public Map<String, Object> unmarshal(AdaptedMap map) throws Exception {
+ throw new OperationNotSupportedException(); // really??
+ }
+
+ @Override
+ public AdaptedMap marshal(Map<String, Object> map) throws Exception {
+
+ AdaptedMap adaptedMap = new AdaptedMap();
+ List elements = new ArrayList();
+ for (Map.Entry<String, Object> property : map.entrySet()) {
+
+ if (property.getValue() instanceof Map) {
+ elements.add(new JAXBElement<AdaptedMap>(new QName(property.getKey()), MapAdapter.AdaptedMap.class,
+ marshal((Map) property.getValue())));
+
+ } else {
+
+ elements.add(new JAXBElement<String>(new QName(property.getKey()), String.class,
+ property.getValue().toString()));
+ }
+ }
+ adaptedMap.elements = elements;
+ return adaptedMap;
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/schema/OXMModelLoader.java b/src/main/java/org/onap/aai/spike/schema/OXMModelLoader.java
new file mode 100644
index 0000000..0d174db
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/schema/OXMModelLoader.java
@@ -0,0 +1,187 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.schema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.nodes.NodeIngestor;
+import org.onap.aai.setup.ConfigTranslator;
+import org.onap.aai.setup.SchemaLocationsBean;
+import org.onap.aai.setup.Version;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.logging.SpikeMsgs;
+import org.onap.aai.spike.util.SchemaIngestPropertiesReader;
+
+/**
+ * This class contains all of the logic for importing OXM model schemas from the available OXM
+ * schema files.
+ */
+public class OXMModelLoader {
+
+ private static Map<String, DynamicJAXBContext> versionContextMap =
+ new ConcurrentHashMap<String, DynamicJAXBContext>();
+
+ final static Pattern p = Pattern.compile("aai_oxm_(.*).xml");
+ final static Pattern versionPattern = Pattern.compile("V(\\d*)");
+
+ private static org.onap.aai.cl.api.Logger logger =
+ LoggerFactory.getInstance().getLogger(OXMModelLoader.class.getName());
+
+ /**
+ * Finds all OXM model files
+ *
+ * @throws SpikeException
+ * @throws IOException
+ *
+ */
+ public synchronized static void loadModels() throws SpikeException {
+ SchemaIngestPropertiesReader schemaIngestPropReader = new SchemaIngestPropertiesReader();
+ SchemaLocationsBean schemaLocationsBean = new SchemaLocationsBean();
+ schemaLocationsBean.setNodeDirectory(schemaIngestPropReader.getNodeDir());
+ schemaLocationsBean.setEdgeDirectory(schemaIngestPropReader.getEdgeDir());
+ ConfigTranslator configTranslator = new OxmConfigTranslator(schemaLocationsBean);
+ NodeIngestor nodeIngestor = new NodeIngestor(configTranslator);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Loading OXM Models");
+ }
+
+ for (Version oxmVersion : Version.values()) {
+ DynamicJAXBContext jaxbContext = nodeIngestor.getContextForVersion(oxmVersion);
+ if (jaxbContext != null) {
+ loadModel(oxmVersion.toString(), jaxbContext);
+ }
+ }
+ }
+
+
+ private synchronized static void loadModel(String oxmVersion, DynamicJAXBContext jaxbContext) {
+ versionContextMap.put(oxmVersion, jaxbContext);
+ logger.info(SpikeMsgs.LOADED_OXM_FILE, oxmVersion);
+ }
+
+ /**
+ * Retrieves the JAXB context for the specified OXM model version.
+ *
+ * @param version - The OXM version that we want the JAXB context for.
+ *
+ * @return - A JAXB context derived from the OXM model schema.
+ *
+ * @throws SpikeException
+ */
+ public static DynamicJAXBContext getContextForVersion(String version) throws SpikeException {
+
+ // If we haven't already loaded in the available OXM models, then do so now.
+ if (versionContextMap == null || versionContextMap.isEmpty()) {
+ loadModels();
+ } else if (!versionContextMap.containsKey(version)) {
+ throw new SpikeException("Error loading oxm model: " + version);
+ }
+
+ return versionContextMap.get(version);
+ }
+
+ public static String getLatestVersion() throws SpikeException {
+
+ // If we haven't already loaded in the available OXM models, then do so now.
+ if (versionContextMap == null || versionContextMap.isEmpty()) {
+ loadModels();
+ }
+
+ // If there are still no models available, then there's not much we can do...
+ if (versionContextMap.isEmpty()) {
+ throw new SpikeException("No available OXM schemas to get latest version for.");
+ }
+
+ // Iterate over the available model versions to determine which is the most
+ // recent.
+ Integer latestVersion = null;
+ String latestVersionStr = null;
+ for (String versionKey : versionContextMap.keySet()) {
+
+ Matcher matcher = versionPattern.matcher(versionKey);
+ if (matcher.find()) {
+
+ int currentVersion = Integer.valueOf(matcher.group(1));
+
+ if ((latestVersion == null) || (currentVersion > latestVersion)) {
+ latestVersion = currentVersion;
+ latestVersionStr = versionKey;
+ }
+ }
+ }
+
+ return latestVersionStr;
+ }
+
+ /**
+ * Retrieves the map of all JAXB context objects that have been created by importing the available
+ * OXM model schemas.
+ *
+ * @return - Map of JAXB context objects.
+ */
+ public static Map<String, DynamicJAXBContext> getVersionContextMap() {
+ return versionContextMap;
+ }
+
+ /**
+ * Assigns the map of all JAXB context objects.
+ *
+ * @param versionContextMap
+ */
+ public static void setVersionContextMap(Map<String, DynamicJAXBContext> versionContextMap) {
+ OXMModelLoader.versionContextMap = versionContextMap;
+ }
+
+ /**
+ * Retrieves the list of all Loaded OXM versions.
+ *
+ * @return - A List of Strings of all loaded OXM versions.
+ *
+ * @throws SpikeException
+ */
+ public static List<String> getLoadedOXMVersions() throws SpikeException {
+ // If we haven't already loaded in the available OXM models, then do so now.
+ if (versionContextMap == null || versionContextMap.isEmpty()) {
+ loadModels();
+ }
+ // If there are still no models available, then there's not much we can do...
+ if (versionContextMap.isEmpty()) {
+ logger.error(SpikeMsgs.OXM_LOAD_ERROR, "No available OXM schemas to get versions for.");
+ throw new SpikeException("No available OXM schemas to get latest version for.");
+ }
+ List<String> versions = new ArrayList<String>();
+ for (String versionKey : versionContextMap.keySet()) {
+ Matcher matcher = versionPattern.matcher(versionKey.toUpperCase());
+ if (matcher.find()) {
+ versions.add("V" + matcher.group(1));
+ }
+ }
+ return versions;
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/schema/OxmConfigTranslator.java b/src/main/java/org/onap/aai/spike/schema/OxmConfigTranslator.java
new file mode 100644
index 0000000..51ed93e
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/schema/OxmConfigTranslator.java
@@ -0,0 +1,98 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.schema;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.PathMatcher;
+import java.nio.file.Paths;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceConfigurationError;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.onap.aai.setup.ConfigTranslator;
+import org.onap.aai.setup.SchemaLocationsBean;
+import org.onap.aai.setup.Version;
+
+public class OxmConfigTranslator extends ConfigTranslator {
+ public OxmConfigTranslator(SchemaLocationsBean bean) {
+ super(bean);
+ }
+
+ @Override
+ public Map<Version, List<String>> getNodeFiles() {
+ String nodeDirectory = bean.getNodeDirectory();
+ if (nodeDirectory == null) {
+ throw new ServiceConfigurationError(
+ "Node(s) directory is empty in the schema location bean (" + bean.getSchemaConfigLocation() + ")");
+ }
+ try {
+ return getVersionMap(Paths.get(nodeDirectory), "*_v*.xml");
+ } catch (IOException e) {
+ throw new ServiceConfigurationError("Failed to read node(s) directory " + getPath(nodeDirectory), e);
+ }
+ }
+
+ @Override
+ public Map<Version, List<String>> getEdgeFiles() {
+ String edgeDirectory = bean.getEdgeDirectory();
+ if (edgeDirectory == null) {
+ throw new ServiceConfigurationError(
+ "Edge(s) directory is empty in the schema location bean (" + bean.getSchemaConfigLocation() + ")");
+ }
+ try {
+ return getVersionMap(Paths.get(edgeDirectory), "*_v*.json");
+ } catch (IOException e) {
+ throw new ServiceConfigurationError("Failed to read edge(s) directory " + getPath(edgeDirectory), e);
+ }
+ }
+
+ private String getPath(String nodeDirectory) {
+ return Paths.get(nodeDirectory).toAbsolutePath().toString();
+ }
+
+ /**
+ * Creates a map containing each OXM Version and the matching OXM file path(s)
+ *
+ * @param folderPath the folder/directory containing the OXM files
+ * @param fileSuffix
+ * @return a new Map object (may be empty)
+ * @throws IOException if there is a problem reading the specified directory path
+ */
+ private Map<Version, List<String>> getVersionMap(Path folderPath, String globPattern) throws IOException {
+ final PathMatcher filter = folderPath.getFileSystem().getPathMatcher("glob:**/" + globPattern);
+ try (final Stream<Path> stream = Files.list(folderPath)) {
+ return stream.filter(filter::matches).map(Path::toString).filter(p -> getVersionFromPath(p) != null)
+ .collect(Collectors.groupingBy(this::getVersionFromPath));
+ }
+ }
+
+ private Version getVersionFromPath(String pathName) {
+ String version = "V" + pathName.replaceAll("^.*\\/", "").replaceAll("\\D+", "");
+ try {
+ return Version.valueOf(version);
+ } catch (IllegalArgumentException e) {
+ return null;
+ }
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/schema/Relationship.java b/src/main/java/org/onap/aai/spike/schema/Relationship.java
new file mode 100644
index 0000000..af81a29
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/schema/Relationship.java
@@ -0,0 +1,102 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.schema;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import com.google.gson.annotations.SerializedName;
+
+
+/**
+ * This class represents a relationship instance that can be used for marshalling and unmarshalling
+ * to/from a model compliant representation.
+ */
+@XmlRootElement
+public class Relationship {
+
+ /** Unique identifier for this relationship. */
+ private String id;
+
+ /** Relationship type. */
+ private String type;
+
+ /** The source vertex for this edge. */
+ @SerializedName("source-node-type")
+ private String sourceNodeType;
+
+ /** The target vertex for this edge. */
+ @SerializedName("target-node-type")
+ private String targetNodeType;
+
+ /** The properties assigned to this edge. */
+ private Map<String, Object> properties = new HashMap<String, Object>();
+
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getSourceNodeType() {
+ return sourceNodeType;
+ }
+
+ @XmlElement(name = "source-node-type")
+ public void setSourceNodeType(String sourceNodeType) {
+ this.sourceNodeType = sourceNodeType;
+ }
+
+ public String getTargetNodeType() {
+ return targetNodeType;
+ }
+
+ @XmlElement(name = "target-node-type")
+ public void setTargetNodeType(String targetNodeType) {
+ this.targetNodeType = targetNodeType;
+ }
+
+ @XmlJavaTypeAdapter(MapAdapter.class)
+ public Map<String, Object> getProperties() {
+ return properties;
+ }
+
+ public void setProperties(Map<String, Object> properties) {
+ this.properties = properties;
+ }
+
+ public void setProperty(String key, Object property) {
+ properties.put(key, property);
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/schema/RelationshipSchema.java b/src/main/java/org/onap/aai/spike/schema/RelationshipSchema.java
new file mode 100644
index 0000000..6858783
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/schema/RelationshipSchema.java
@@ -0,0 +1,133 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Gizmo
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.aai.spike.schema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import com.google.common.collect.Multimap;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.onap.aai.edges.EdgeRule;
+import org.onap.aai.spike.exception.SpikeException;
+
+
+public class RelationshipSchema {
+ private static final Gson gson = new GsonBuilder().create();
+
+ public static final String SCHEMA_SOURCE_NODE_TYPE = "from";
+ public static final String SCHEMA_TARGET_NODE_TYPE = "to";
+ public static final String SCHEMA_RELATIONSHIP_TYPE = "label";
+ public static final String SCHEMA_RULES_ARRAY = "rules";
+
+
+ private Map<String, Map<String, Class<?>>> relations = new HashMap<>();
+ /**
+ * Hashmap of valid relationship types along with properties.
+ */
+ private Map<String, Map<String, Class<?>>> relationTypes = new HashMap<>();
+
+
+ public RelationshipSchema(Multimap<String, EdgeRule> rules, String props) throws SpikeException, IOException {
+ HashMap<String, String> properties = new ObjectMapper().readValue(props, HashMap.class);
+ Map<String, Class<?>> edgeProps =
+ properties.entrySet().stream().collect(Collectors.toMap(p -> p.getKey(), p -> {
+ try {
+ return resolveClass(p.getValue());
+ } catch (SpikeException | ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }));
+
+ rules.entries().forEach((kv) -> {
+ relationTypes.put(kv.getValue().getLabel(), edgeProps);
+ relations.put(buildRelation(kv.getValue().getFrom(), kv.getValue().getTo(), kv.getValue().getLabel()),
+ edgeProps);
+ });
+ }
+
+ public RelationshipSchema(List<String> jsonStrings) throws SpikeException, IOException {
+ String edgeRules = jsonStrings.get(0);
+ String props = jsonStrings.get(1);
+
+ HashMap<String, ArrayList<LinkedHashMap<String, String>>> rules =
+ new ObjectMapper().readValue(edgeRules, HashMap.class);
+ HashMap<String, String> properties = new ObjectMapper().readValue(props, HashMap.class);
+ Map<String, Class<?>> edgeProps =
+ properties.entrySet().stream().collect(Collectors.toMap(p -> p.getKey(), p -> {
+ try {
+ return resolveClass(p.getValue());
+ } catch (SpikeException | ClassNotFoundException e) {
+ e.printStackTrace();
+ }
+ return null;
+ }));
+
+ rules.get(SCHEMA_RULES_ARRAY).forEach(l -> {
+ relationTypes.put(l.get(SCHEMA_RELATIONSHIP_TYPE), edgeProps);
+ relations.put(buildRelation(l.get(SCHEMA_SOURCE_NODE_TYPE), l.get(SCHEMA_TARGET_NODE_TYPE),
+ l.get(SCHEMA_RELATIONSHIP_TYPE)), edgeProps);
+ });
+ }
+
+
+
+ public Map<String, Class<?>> lookupRelation(String key) {
+ return this.relations.get(key);
+ }
+
+ public Map<String, Class<?>> lookupRelationType(String type) {
+ return this.relationTypes.get(type);
+ }
+
+ public boolean isValidType(String type) {
+ return relationTypes.containsKey(type);
+ }
+
+
+ private String buildRelation(String source, String target, String relation) {
+ return source + ":" + target + ":" + relation;
+ }
+
+ private Class<?> resolveClass(String type) throws SpikeException, ClassNotFoundException {
+ Class<?> clazz = Class.forName(type);
+ validateClassTypes(clazz);
+ return clazz;
+ }
+
+ private void validateClassTypes(Class<?> clazz) throws SpikeException {
+ if (!clazz.isAssignableFrom(Integer.class) && !clazz.isAssignableFrom(Double.class)
+ && !clazz.isAssignableFrom(Boolean.class) && !clazz.isAssignableFrom(String.class)) {
+ throw new SpikeException("BAD_REQUEST");
+ }
+ }
+}
+
+
diff --git a/src/main/java/org/onap/aai/spike/service/EchoService.java b/src/main/java/org/onap/aai/spike/service/EchoService.java
new file mode 100644
index 0000000..6c5b5ee
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/service/EchoService.java
@@ -0,0 +1,85 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Spike
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP and OpenECOMP are trademarks
+ * and service marks of AT&T Intellectual Property.
+ */
+package org.onap.aai.spike.service;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.Response.Status;
+import org.onap.aai.cl.api.LogFields;
+import org.onap.aai.cl.api.LogLine;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.cl.mdc.MdcContext;
+import org.onap.aai.spike.logging.SpikeMsgs;
+import org.onap.aai.spike.util.SpikeConstants;
+import org.slf4j.MDC;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestHeader;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@RequestMapping(value = "/services/spike/v1/echo-service")
+public class EchoService {
+
+ private static Logger logger = LoggerFactory.getInstance().getLogger(EchoService.class.getName());
+ private static Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EchoService.class.getName());
+
+ @GetMapping("/echo")
+ public ResponseEntity<String> ping(@RequestHeader HttpHeaders headers, HttpServletRequest req) {
+
+ String fromIp = req.getRemoteAddr();
+ String fromAppId = "";
+ String transId = null;
+
+ if (headers.getFirst("X-FromAppId") != null) {
+ fromAppId = headers.getFirst("X-FromAppId");
+ }
+
+ if ((headers.getFirst("X-TransactionId") == null) || headers.getFirst("X-TransactionId").isEmpty()) {
+ transId = java.util.UUID.randomUUID().toString();
+ } else {
+ transId = headers.getFirst("X-TransactionId");
+ }
+
+ MdcContext.initialize(transId, SpikeConstants.SPIKE_SERVICE_NAME, "", fromAppId, fromIp);
+
+ // Generate error log
+ logger.info(SpikeMsgs.PROCESS_REST_REQUEST, req.getMethod(), req.getRequestURL().toString(),
+ req.getRemoteHost(), Status.OK.toString());
+
+ // Generate audit log.
+ auditLogger.info(SpikeMsgs.PROCESS_REST_REQUEST,
+ new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, Status.OK.toString())
+ .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, Status.OK.toString()),
+ (req != null) ? req.getMethod() : "Unknown", (req != null) ? req.getRequestURL().toString() : "Unknown",
+ (req != null) ? req.getRemoteHost() : "Unknown", Status.OK.toString());
+ MDC.clear();
+
+ return new ResponseEntity<>("Alive", HttpStatus.OK);
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
new file mode 100644
index 0000000..c4e1746
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java
@@ -0,0 +1,298 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.service;
+
+import java.util.ArrayList;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.PriorityBlockingQueue;
+import javax.naming.OperationNotSupportedException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
+import org.onap.aai.event.api.MessageWithOffset;
+import org.onap.aai.spike.event.envelope.EventEnvelope;
+import org.onap.aai.spike.event.envelope.EventEnvelopeParser;
+import org.onap.aai.spike.event.incoming.GizmoGraphEvent;
+import org.onap.aai.spike.event.incoming.OffsetManager;
+import org.onap.aai.spike.event.outgoing.SpikeEventComparator;
+import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy;
+import org.onap.aai.spike.event.outgoing.SpikeGraphEvent;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.logging.SpikeMsgs;
+import org.onap.aai.spike.util.SpikeConstants;
+import org.onap.aai.spike.util.SpikeProperties;
+
+public class SpikeEventProcessor extends TimerTask {
+
+ /**
+ * Client used for consuming events to the event bus.
+ */
+ private EventConsumer consumer;
+ /**
+ * Client used for publishing events to the event bus.
+ */
+ private EventPublisher publisher;
+ /**
+ * Internal queue where outgoing events will be buffered until they can be serviced by the event
+ * publisher worker threads.
+ */
+ private BlockingQueue<SpikeGraphEvent> eventQueue;
+
+ private Integer eventQueueCapacity = DEFAULT_EVENT_QUEUE_CAPACITY;
+ private Integer eventOffsetPeriod = DEFAULT_EVENT_OFFSET_COMMIT_PERIOD;
+
+ private OffsetManager offsetManager;
+ private Long lastCommittedOffset = null;
+ private EventEnvelopeParser eventEnvelopeParser;
+
+ /**
+ * Number of events that can be queued up for publishing before it is dropped
+ */
+ private static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000;
+ private static final Integer DEFAULT_EVENT_OFFSET_COMMIT_PERIOD = 10000;
+
+ private static Logger logger = LoggerFactory.getInstance().getLogger(SpikeEventProcessor.class.getName());
+ private static Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(SpikeEventProcessor.class.getName());
+ private static final Gson gson =
+ new GsonBuilder().setExclusionStrategies(new SpikeEventExclusionStrategy()).setPrettyPrinting().create();
+
+ public SpikeEventProcessor(EventConsumer consumer, EventPublisher publisher) {
+ this.consumer = consumer;
+ this.publisher = publisher;
+
+ try {
+ eventQueueCapacity = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_CAPACITY));
+ eventOffsetPeriod = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_OFFSET_CHECK_PERIOD));
+
+ } catch (Exception ex) {
+ }
+
+ eventQueue = new PriorityBlockingQueue<SpikeGraphEvent>(eventQueueCapacity, new SpikeEventComparator());
+ new Thread(new SpikeEventPublisher()).start();
+
+ // Instantiate the offset manager. This will run a background thread that
+ // periodically updates the value of the most recent offset value that can
+ // be safely committed with the event bus.
+ offsetManager = new OffsetManager(eventQueueCapacity, eventOffsetPeriod);
+ eventEnvelopeParser = new EventEnvelopeParser();
+ }
+
+ @Override
+ public void run() {
+ logger.info(SpikeMsgs.SPIKE_QUERY_EVENT_SYSTEM);
+
+ if (consumer == null) {
+ logger.error(SpikeMsgs.SPIKE_SERVICE_STARTED_FAILURE, SpikeConstants.SPIKE_SERVICE_NAME);
+ }
+
+ Iterable<MessageWithOffset> events = null;
+ try {
+ events = consumer.consumeWithOffsets();
+
+ } catch (OperationNotSupportedException e) {
+ // This means we are using DMaaP and can't use offsets
+ try {
+ Iterable<String> tempEvents = consumer.consume();
+ ArrayList<MessageWithOffset> messages = new ArrayList<MessageWithOffset>();
+ for (String event : tempEvents) {
+ messages.add(new MessageWithOffset(0, event));
+ }
+ events = messages;
+ } catch (Exception e1) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e1.getMessage());
+ return;
+ }
+ } catch (Exception e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage());
+ return;
+ }
+
+ if (events == null || !events.iterator().hasNext()) {
+ logger.info(SpikeMsgs.SPIKE_NO_EVENT_RECEIVED);
+ }
+
+ for (MessageWithOffset event : events) {
+ try {
+ logger.debug(SpikeMsgs.SPIKE_EVENT_RECEIVED, event.getMessage());
+
+ GizmoGraphEvent modelEvent = eventEnvelopeParser.parseEvent(event.getMessage());
+ auditLogger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED,
+ "of type: " + modelEvent.getObjectType() + " with key: " + modelEvent.getObjectKey()
+ + " , transaction-id: " + modelEvent.getTransactionId());
+ logger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED, "of type: " + modelEvent.getObjectType() + " with key: "
+ + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
+
+ String modelEventJson = gson.toJson(modelEvent);
+
+ // Log the current event as 'being processed' with the offset manager so that we know that it's
+ // associated offset is not yet save to be committed as 'done'.
+ offsetManager.cacheEvent(modelEvent.getTransactionId(), event.getOffset());
+
+ while (eventQueue.size() >= eventQueueCapacity) {
+ // Wait until there's room in the queue
+ logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE,
+ "Event could not be published to the event bus due to: Internal buffer capacity exceeded. Waiting 10 seconds.");
+ Thread.sleep(10000);
+ }
+
+ eventQueue.offer(modelEvent.toSpikeGraphEvent());
+
+ logger.info(SpikeMsgs.SPIKE_EVENT_PROCESSED, "of type: " + modelEvent.getObjectType() + " with key: "
+ + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId());
+ logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson);
+
+ } catch (SpikeException | InterruptedException e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
+ e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
+ } catch (Exception e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE,
+ e.getMessage() + ". Incoming event payload:\n" + event.getMessage());
+ }
+ }
+
+ try {
+
+ // Get the next 'safe' offset to be committed from the offset manager.
+ // We need to do this here istead of letting the offset manager just take care
+ // of it for us because the event consumer is not thread safe. If we try to
+ // commit the offsets from another thread, it gets unhappy...
+ Long nextOffset = offsetManager.getNextOffsetToCommit();
+
+ // Make sure we actually have a real value...
+ if (nextOffset != null) {
+
+ // There is no point in continually committing the same offset value, so make sure
+ // that something has actually changed before we do anything...
+ if ((lastCommittedOffset == null) || (lastCommittedOffset != nextOffset)) {
+
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Committing offset: " + nextOffset + " to the event bus for Champ raw event topic.");
+ }
+
+ // OK, let's commit the latest value...
+ consumer.commitOffsets(nextOffset);
+ lastCommittedOffset = nextOffset;
+ }
+ }
+ } catch (OperationNotSupportedException e) {
+ // We must be working with a DMaap which doesn't support offset management. Swallow
+ // the exception
+ } catch (Exception e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage());
+ }
+ }
+
+ /**
+ * This class implements the threads which is responsible for buffering the events in memory and
+ * ordering them before publishing it to topic
+ * <p>
+ * Each publish operation is performed synchronously, so that the thread will only move on to the
+ * next available event once it has actually published the current event to the bus.
+ */
+ private class SpikeEventPublisher implements Runnable {
+
+ /**
+ * Partition key to use when publishing events to the event stream. We WANT all events to go to a
+ * single partition, so we are just using a hard-coded key for every event.
+ */
+ private static final String EVENTS_PARTITION_KEY = "SpikeEventKey";
+ private static final int DEFAULT_EVENT_QUEUE_DELAY = 10000;
+
+ Integer eventQueueDelay = DEFAULT_EVENT_QUEUE_DELAY;
+
+ public SpikeEventPublisher() {
+ try {
+ eventQueueDelay = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_DELAY));
+ } catch (Exception ex) {
+ }
+ }
+
+ @Override
+ public void run() {
+ while (true) {
+
+ SpikeGraphEvent nextEvent;
+ SpikeGraphEvent event = null;
+ try {
+
+ // Get the next event to be published from the queue if it is old enough or we have too
+ // many items in the queue
+ if ((nextEvent = eventQueue.peek()) != null
+ && (System.currentTimeMillis() - nextEvent.getSpikeTimestamp() > eventQueueDelay
+ || eventQueue.size() > eventQueueCapacity)) {
+ event = eventQueue.take();
+ } else {
+ continue;
+ }
+
+ } catch (InterruptedException e) {
+
+ // Restore the interrupted status.
+ Thread.currentThread().interrupt();
+ }
+
+ // Try publishing the event to the event bus. This call will block
+ // until the event is published or times out.
+ try {
+ String eventJson = gson.toJson(new EventEnvelope(event));
+ int sentMessageCount = publisher.sendSync(EVENTS_PARTITION_KEY, eventJson);
+ if (sentMessageCount > 0) {
+ logger.info(SpikeMsgs.SPIKE_EVENT_PUBLISHED, "of type: " + event.getObjectType() + " with key: "
+ + event.getObjectKey() + " , transaction-id: " + event.getTransactionId());
+ logger.debug(SpikeMsgs.SPIKE_EVENT_PUBLISHED, eventJson);
+ } else {
+ logger.warn(SpikeMsgs.SPIKE_PUBLISH_FAILED, "of type: " + event.getObjectType() + " with key: "
+ + event.getObjectKey() + " , transaction-id: " + event.getTransactionId());
+ logger.debug(SpikeMsgs.SPIKE_PUBLISH_FAILED, eventJson);
+ }
+
+
+ // Inform the offset manager that this event has been published. It's offset
+ // can now, potentially, be safely committed to the event bus so that on a
+ // restart we won't reprocess it.
+ offsetManager.markAsPublished(event.getTransactionId());
+
+ } catch (ExecutionException e) {
+
+ // Publish timed out, queue it up to retry again. Since this message was pulled from the
+ // top of the queue, it will go back to the top.
+ logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, "Retrying in 60 seconds. " + e.getMessage());
+ eventQueue.offer(event);
+
+ try {
+ Thread.sleep(60000);
+ } catch (InterruptedException e1) {
+ e1.printStackTrace();
+ }
+ } catch (Exception e) {
+ logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage());
+ }
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/onap/aai/spike/service/SpikeService.java b/src/main/java/org/onap/aai/spike/service/SpikeService.java
new file mode 100644
index 0000000..3aa6dfe
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/service/SpikeService.java
@@ -0,0 +1,75 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.service;
+
+import java.util.Timer;
+import javax.annotation.PreDestroy;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
+import org.onap.aai.spike.logging.SpikeMsgs;
+import org.onap.aai.spike.schema.EdgeRulesLoader;
+import org.onap.aai.spike.schema.OXMModelLoader;
+import org.onap.aai.spike.util.SpikeConstants;
+import org.onap.aai.spike.util.SpikeProperties;
+
+public class SpikeService {
+
+ private EventConsumer consumer;
+ private EventPublisher publisher;
+ private static Logger logger = LoggerFactory.getInstance().getLogger(SpikeService.class.getName());
+ private Timer timer;
+
+ public SpikeService(EventConsumer consumer, EventPublisher publisher) {
+ this.consumer = consumer;
+ this.publisher = publisher;
+ }
+
+
+ public void startup() throws Exception {
+
+ // Load models
+ EdgeRulesLoader.loadModels();
+ OXMModelLoader.loadModels();
+
+ Long interval = 30000L;
+ try {
+ interval = Long.parseLong(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_POLL_INTERVAL));
+ } catch (Exception ex) {
+ }
+
+ SpikeEventProcessor processEvent = new SpikeEventProcessor(consumer, publisher);
+ timer = new Timer("spike-consumer");
+ timer.schedule(processEvent, interval, interval);
+
+ logger.info(SpikeMsgs.SPIKE_SERVICE_STARTED_SUCCESSFULLY, consumer.getClass().getName());
+ }
+
+ @PreDestroy
+ protected void preShutdown() {
+ logger.info(SpikeMsgs.SPIKE_SERVICE_STARTED_SUCCESSFULLY, consumer.getClass().getName());
+ timer.cancel();
+
+
+ }
+
+}
diff --git a/src/main/java/org/onap/aai/spike/util/FileWatcher.java b/src/main/java/org/onap/aai/spike/util/FileWatcher.java
new file mode 100644
index 0000000..6bbb228
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/util/FileWatcher.java
@@ -0,0 +1,45 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.util;
+
+import java.io.File;
+import java.util.TimerTask;
+
+public abstract class FileWatcher extends TimerTask {
+ private long timeStamp;
+ private File file;
+
+ public FileWatcher(File file) {
+ this.file = file;
+ this.timeStamp = file.lastModified();
+ }
+
+ public final void run() {
+ long timeStamp = file.lastModified();
+
+ if ((timeStamp - this.timeStamp) > 500) {
+ this.timeStamp = timeStamp;
+ onChange(file);
+ }
+ }
+
+ protected abstract void onChange(File file);
+}
diff --git a/src/main/java/org/onap/aai/spike/util/SchemaIngestPropertiesReader.java b/src/main/java/org/onap/aai/spike/util/SchemaIngestPropertiesReader.java
new file mode 100644
index 0000000..48e26c8
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/util/SchemaIngestPropertiesReader.java
@@ -0,0 +1,116 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.nio.file.NoSuchFileException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Properties;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.spike.exception.SpikeException;
+import org.onap.aai.spike.logging.SpikeMsgs;
+
+public class SchemaIngestPropertiesReader {
+
+ private static final String SCHEMA_INGEST_PROPERTIES_LOCATION =
+ System.getProperty("CONFIG_HOME") + "/schemaIngest.properties";
+
+ private static org.onap.aai.cl.api.Logger logger =
+ LoggerFactory.getInstance().getLogger(SchemaIngestPropertiesReader.class.getName());
+
+ /**
+ * Gets the Node directory location to ingest OXMs
+ *
+ * @return path to OXMs
+ * @throws SpikeException
+ */
+ public String getNodeDir() throws SpikeException {
+ Properties prop = findSchemaIngestPropertyFile();
+ return prop.getProperty("nodeDir");
+ }
+
+ /**
+ * Gets the Edge directory location to ingest OXM
+ *
+ * @return path to OXMs
+ * @throws SpikeException
+ */
+ public String getEdgeDir() throws SpikeException {
+ Properties prop = findSchemaIngestPropertyFile();
+ return prop.getProperty("edgeDir");
+ }
+
+ /**
+ * Gets the location of the Edge Properties
+ *
+ * @return
+ * @throws SpikeException
+ */
+ public String getEdgePropsDir() throws SpikeException {
+
+ Properties prop = findSchemaIngestPropertyFile();
+ return prop.getProperty("edgePropsDir");
+ }
+
+
+ private Properties findSchemaIngestPropertyFile() throws SpikeException {
+ Properties prop = new Properties();
+ try {
+ prop = loadFromFile(SCHEMA_INGEST_PROPERTIES_LOCATION);
+ } catch (NoSuchFileException e) {
+ // if file not found, try via classpath
+ try {
+ prop = loadFromClasspath("schemaIngest.properties");
+ } catch (URISyntaxException | IOException e1) {
+ logger.error(SpikeMsgs.SPIKE_SCHEMA_INGEST_LOAD_ERROR, e1.getMessage());
+ throw new SpikeException("Failed to load schemaIngest.properties", e1);
+ }
+ } catch (IOException e) {
+ logger.error(SpikeMsgs.SPIKE_SCHEMA_INGEST_LOAD_ERROR, e.getMessage());
+ throw new SpikeException("Failed to load schemaIngest.properties", e);
+ }
+ return prop;
+ }
+
+ private Properties loadFromFile(String filename) throws IOException {
+ Path configLocation = Paths.get(filename);
+ try (InputStream stream = Files.newInputStream(configLocation)) {
+ return loadProperties(stream);
+ }
+ }
+
+ private Properties loadFromClasspath(String resourceName) throws URISyntaxException, IOException {
+ Path path = Paths.get(ClassLoader.getSystemResource(resourceName).toURI());
+ try (InputStream stream = Files.newInputStream(path)) {
+ return loadProperties(stream);
+ }
+ }
+
+ private Properties loadProperties(InputStream stream) throws IOException {
+ Properties config = new Properties();
+ config.load(stream);
+ return config;
+ }
+}
diff --git a/src/main/java/org/onap/aai/spike/util/SpikeConstants.java b/src/main/java/org/onap/aai/spike/util/SpikeConstants.java
new file mode 100644
index 0000000..95087e8
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/util/SpikeConstants.java
@@ -0,0 +1,43 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Spike
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP and OpenECOMP are trademarks
+ * and service marks of AT&T Intellectual Property.
+ */
+package org.onap.aai.spike.util;
+
+public class SpikeConstants {
+ // Logging related
+ public static final String SPIKE_SERVICE_NAME = "spike";
+
+ public static final String SPIKE_FILESEP =
+ (System.getProperty("file.separator") == null) ? "/" : System.getProperty("file.separator");
+
+ public static final String SPIKE_SPECIFIC_CONFIG = System.getProperty("CONFIG_HOME") + SPIKE_FILESEP;
+
+ public static final String SPIKE_HOME_MODEL = SPIKE_SPECIFIC_CONFIG + "model" + SPIKE_FILESEP;
+ public static final String SPIKE_EVENT_POLL_INTERVAL = "spike.event.poll.interval";
+ public static final String SPIKE_EVENT_QUEUE_CAPACITY = "spike.event.queue.capacity";
+ public static final String SPIKE_EVENT_QUEUE_DELAY = "spike.event.queue.delay";
+ public static final String SPIKE_EVENT_OFFSET_CHECK_PERIOD = "spike.event.offset.period";
+ public static final String SPIKE_PROPS_RESERVED = "spike.props.reserved";
+ public static final String SPIKE_CONFIG_FILE = SPIKE_SPECIFIC_CONFIG + "spike.properties";
+}
diff --git a/src/main/java/org/onap/aai/spike/util/SpikeProperties.java b/src/main/java/org/onap/aai/spike/util/SpikeProperties.java
new file mode 100644
index 0000000..ac95efb
--- /dev/null
+++ b/src/main/java/org/onap/aai/spike/util/SpikeProperties.java
@@ -0,0 +1,52 @@
+/**
+ * ============LICENSE_START=======================================================
+ * org.onap.aai
+ * ================================================================================
+ * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
+ * Copyright © 2017-2018 Amdocs
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.aai.spike.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Properties;
+
+public class SpikeProperties {
+
+ private static Properties properties;
+
+ static {
+ properties = new Properties();
+ File file = new File(SpikeConstants.SPIKE_CONFIG_FILE);
+ try {
+ properties.load(new FileInputStream(file));
+ } catch (FileNotFoundException e) {
+ e.printStackTrace();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+ public static String get(String key) {
+ return properties.getProperty(key);
+ }
+
+ public static String get(String key, String defaultValue) {
+ return properties.getProperty(key, defaultValue);
+ }
+}