From 7d3dcdbff807ba1facb84f94ac39ff91bf410b29 Mon Sep 17 00:00:00 2001 From: Michael Arrastia Date: Thu, 12 Jul 2018 13:37:43 +0100 Subject: Migrate Spike code to ONAP Move what was originally an Open ECOMP microservice into ONAP. This is primarily a code move and includes: - removal of AJSC and replacement with latest version of Spring Boot - alignment with ONAP standards - license headers - package renaming - FOSSology scan completed Further work is required to add JJB and OOM. Change-Id: I305c8407256bf2dbcc816f34f031f92eafd6ef5a Issue-ID: AAI-1374 Signed-off-by: Michael Arrastia --- .../java/org/onap/aai/spike/SpikeApplication.java | 48 +++ .../aai/spike/event/envelope/EventEnvelope.java | 75 +++++ .../spike/event/envelope/EventEnvelopeParser.java | 64 ++++ .../onap/aai/spike/event/envelope/EventHeader.java | 194 +++++++++++ .../onap/aai/spike/event/incoming/GizmoEdge.java | 132 ++++++++ .../aai/spike/event/incoming/GizmoGraphEvent.java | 206 ++++++++++++ .../onap/aai/spike/event/incoming/GizmoVertex.java | 116 +++++++ .../aai/spike/event/incoming/OffsetManager.java | 356 +++++++++++++++++++++ .../onap/aai/spike/event/outgoing/GsonExclude.java | 32 ++ .../onap/aai/spike/event/outgoing/SpikeEdge.java | 143 +++++++++ .../spike/event/outgoing/SpikeEventComparator.java | 33 ++ .../outgoing/SpikeEventExclusionStrategy.java | 37 +++ .../aai/spike/event/outgoing/SpikeGraphEvent.java | 168 ++++++++++ .../onap/aai/spike/event/outgoing/SpikeVertex.java | 127 ++++++++ .../onap/aai/spike/exception/SpikeException.java | 46 +++ .../java/org/onap/aai/spike/logging/SpikeMsgs.java | 167 ++++++++++ .../org/onap/aai/spike/schema/EdgeRulesLoader.java | 229 +++++++++++++ .../aai/spike/schema/GraphEventTransformer.java | 282 ++++++++++++++++ .../java/org/onap/aai/spike/schema/MapAdapter.java | 65 ++++ .../org/onap/aai/spike/schema/OXMModelLoader.java | 187 +++++++++++ .../onap/aai/spike/schema/OxmConfigTranslator.java | 98 ++++++ .../org/onap/aai/spike/schema/Relationship.java | 102 ++++++ .../onap/aai/spike/schema/RelationshipSchema.java | 133 ++++++++ .../org/onap/aai/spike/service/EchoService.java | 85 +++++ .../aai/spike/service/SpikeEventProcessor.java | 298 +++++++++++++++++ .../org/onap/aai/spike/service/SpikeService.java | 75 +++++ .../java/org/onap/aai/spike/util/FileWatcher.java | 45 +++ .../spike/util/SchemaIngestPropertiesReader.java | 116 +++++++ .../org/onap/aai/spike/util/SpikeConstants.java | 43 +++ .../org/onap/aai/spike/util/SpikeProperties.java | 52 +++ 30 files changed, 3754 insertions(+) create mode 100644 src/main/java/org/onap/aai/spike/SpikeApplication.java create mode 100644 src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java create mode 100644 src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java create mode 100644 src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java create mode 100644 src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java create mode 100644 src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java create mode 100644 src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java create mode 100644 src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java create mode 100644 src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java create mode 100644 src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java create mode 100644 src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java create mode 100644 src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java create mode 100644 src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java create mode 100644 src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java create mode 100644 src/main/java/org/onap/aai/spike/exception/SpikeException.java create mode 100644 src/main/java/org/onap/aai/spike/logging/SpikeMsgs.java create mode 100644 src/main/java/org/onap/aai/spike/schema/EdgeRulesLoader.java create mode 100644 src/main/java/org/onap/aai/spike/schema/GraphEventTransformer.java create mode 100644 src/main/java/org/onap/aai/spike/schema/MapAdapter.java create mode 100644 src/main/java/org/onap/aai/spike/schema/OXMModelLoader.java create mode 100644 src/main/java/org/onap/aai/spike/schema/OxmConfigTranslator.java create mode 100644 src/main/java/org/onap/aai/spike/schema/Relationship.java create mode 100644 src/main/java/org/onap/aai/spike/schema/RelationshipSchema.java create mode 100644 src/main/java/org/onap/aai/spike/service/EchoService.java create mode 100644 src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java create mode 100644 src/main/java/org/onap/aai/spike/service/SpikeService.java create mode 100644 src/main/java/org/onap/aai/spike/util/FileWatcher.java create mode 100644 src/main/java/org/onap/aai/spike/util/SchemaIngestPropertiesReader.java create mode 100644 src/main/java/org/onap/aai/spike/util/SpikeConstants.java create mode 100644 src/main/java/org/onap/aai/spike/util/SpikeProperties.java (limited to 'src/main/java') diff --git a/src/main/java/org/onap/aai/spike/SpikeApplication.java b/src/main/java/org/onap/aai/spike/SpikeApplication.java new file mode 100644 index 0000000..e031de3 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/SpikeApplication.java @@ -0,0 +1,48 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 European Software Marketing Ltd. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike; + +import java.util.HashMap; +import org.eclipse.jetty.util.security.Password; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.boot.web.servlet.support.SpringBootServletInitializer; +import org.springframework.context.annotation.ImportResource; + +/** + * Spike service Spring Boot Application + */ +@SpringBootApplication +@ImportResource({"file:${SERVICE_BEANS}/*.xml"}) +public class SpikeApplication extends SpringBootServletInitializer { + + public static void main(String[] args) { + String keyStorePassword = System.getProperty("KEY_STORE_PASSWORD"); + if (keyStorePassword == null || keyStorePassword.isEmpty()) { + throw new IllegalArgumentException("System Property KEY_STORE_PASSWORD not set"); + } + HashMap props = new HashMap<>(); + props.put("server.ssl.key-store-password", Password.deobfuscate(keyStorePassword)); + new SpikeApplication().configure(new SpringApplicationBuilder(SpikeApplication.class).properties(props)) + .run(args); + } + +} diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java new file mode 100644 index 0000000..1b92653 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelope.java @@ -0,0 +1,75 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.envelope; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.onap.aai.spike.event.outgoing.SpikeGraphEvent; + +public class EventEnvelope { + + private EventHeader header; + private SpikeGraphEvent body; + + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); + + public EventEnvelope(EventHeader eventHeader, SpikeGraphEvent body) { + this.header = eventHeader; + this.body = body; + } + + /** + * Construct the envelope header from the provided event. + * + * @param event the Spike graph event for a vertex or edge operation + */ + public EventEnvelope(SpikeGraphEvent event) { + this.header = new EventHeader.Builder().requestId(event.getTransactionId()).build(); + this.body = event; + } + + public EventHeader getEventHeader() { + return header; + } + + public SpikeGraphEvent getBody() { + return body; + } + + /** + * Serializes this object into a JSON string representation. + * + * @return a JSON format string representation of this object. + */ + public String toJson() { + return gson.toJson(this); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return this.toJson(); + } +} diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java new file mode 100644 index 0000000..8b62ffa --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/envelope/EventEnvelopeParser.java @@ -0,0 +1,64 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.envelope; + +import java.util.UUID; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; +import org.onap.aai.spike.event.incoming.GizmoGraphEvent; +import org.onap.aai.spike.exception.SpikeException; +import org.onap.aai.spike.schema.GraphEventTransformer; + +public class EventEnvelopeParser { + + /** + * Parses the event to extract the content of the body and generate a {@link GizmoGraphEvent} + * object. + * + * @param event event envelope with both header and body properties + * @return the body of the event represented by a {@link GizmoGraphEvent} object + * @throws SpikeException if the event cannot be parsed + */ + public GizmoGraphEvent parseEvent(String event) throws SpikeException { + + GizmoGraphEvent graphEvent = GizmoGraphEvent.fromJson(extractEventBody(event)); + + GraphEventTransformer.populateUUID(graphEvent); + if (graphEvent.getTransactionId() == null || graphEvent.getTransactionId().isEmpty()) { + graphEvent.setTransactionId(UUID.randomUUID().toString()); + } + if (graphEvent.getRelationship() != null) { + GraphEventTransformer.validateEdgeModel(graphEvent.getRelationship()); + } else if (graphEvent.getVertex() != null) { + GraphEventTransformer.validateVertexModel(graphEvent.getVertex()); + } else { + throw new SpikeException("Unable to parse event: " + event); + } + + return graphEvent; + } + + private String extractEventBody(String event) { + JsonParser jsonParser = new JsonParser(); + JsonElement jsonElement = jsonParser.parse(event); + return jsonElement.getAsJsonObject().get("body").toString(); + } +} diff --git a/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java b/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java new file mode 100644 index 0000000..f8aa72b --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/envelope/EventHeader.java @@ -0,0 +1,194 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.envelope; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Objects; +import java.util.UUID; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; +import org.apache.commons.lang3.builder.EqualsBuilder; + +public class EventHeader { + + @SerializedName("request-id") + private String requestId; + + private String timestamp; + + @SerializedName("source-name") + private String sourceName; + + @SerializedName("event-type") + private String eventType; + + @SerializedName("validation-entity-type") + private String validationEntityType; + + @SerializedName("validation-top-entity-type") + private String validationTopEntityType; + + @SerializedName("entity-link") + private String entityLink; + + private static final String MICROSERVICE_NAME = "SPIKE"; + private static final String EVENT_TYPE_PENDING_UPDATE = "update-notification"; + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); + + public static class Builder { + + private String requestId; + private String validationEntityType; + private String validationTopEntityType; + private String entityLink; + + public Builder requestId(String val) { + requestId = val; + return this; + } + + public Builder validationEntityType(String val) { + validationEntityType = val; + return this; + } + + public Builder validationTopEntityType(String val) { + validationTopEntityType = val; + return this; + } + + public Builder entityLink(String val) { + entityLink = val; + return this; + } + + public EventHeader build() { + return new EventHeader(this); + } + } + + private EventHeader(Builder builder) { + requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString(); + timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now()); + sourceName = MICROSERVICE_NAME; + eventType = EVENT_TYPE_PENDING_UPDATE; + + validationEntityType = builder.validationEntityType; + validationTopEntityType = builder.validationTopEntityType; + entityLink = builder.entityLink; + } + + /** + * Serializes this object into a JSON string representation. + * + * @return a JSON format string representation of this object. + */ + public String toJson() { + return gson.toJson(this); + } + + /////////////////////////////////////////////////////////////////////////// + // GETTERS + /////////////////////////////////////////////////////////////////////////// + + public String getRequestId() { + return requestId; + } + + public String getTimestamp() { + return timestamp; + } + + public String getSourceName() { + return sourceName; + } + + public String getEventType() { + return eventType; + } + + public String getValidationEntityType() { + return validationEntityType; + } + + public String getValidationTopEntityType() { + return validationTopEntityType; + } + + public String getEntityLink() { + return entityLink; + } + + /////////////////////////////////////////////////////////////////////////// + // OVERRIDES + /////////////////////////////////////////////////////////////////////////// + + /* + * (non-Javadoc) + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return Objects.hash(this.requestId, this.timestamp, this.sourceName, this.eventType, this.validationEntityType, + this.validationTopEntityType, this.entityLink); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (!(obj instanceof EventHeader)) { + return false; + } else if (obj == this) { + return true; + } + EventHeader rhs = (EventHeader) obj; + // @formatter:off + return new EqualsBuilder() + .append(requestId, rhs.requestId) + .append(timestamp, rhs.timestamp) + .append(sourceName, rhs.sourceName) + .append(eventType, rhs.eventType) + .append(validationEntityType, rhs.validationEntityType) + .append(validationTopEntityType, rhs.validationTopEntityType) + .append(entityLink, rhs.entityLink) + .isEquals(); + // @formatter:on + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return this.toJson(); + } + +} diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java new file mode 100644 index 0000000..af10a6e --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoEdge.java @@ -0,0 +1,132 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.incoming; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.annotations.SerializedName; +import org.onap.aai.spike.exception.SpikeException; + +/** + * This class provides a generic representation of an Edge as provided by the graph data store. + * + */ +public class GizmoEdge { + + /** + * The unique identifier used to identify this edge in the graph data store. + */ + @SerializedName("key") + private String id; + + /** Type label assigned to this vertex. */ + private String type; + + /** Source vertex for our edge. */ + private GizmoVertex source; + + /** Target vertex for our edge. */ + private GizmoVertex target; + + /** Map of all of the properties assigned to this vertex. */ + private JsonElement properties; + + /** Marshaller/unmarshaller for converting to/from JSON. */ + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public GizmoVertex getSource() { + return source; + } + + public void setSource(GizmoVertex source) { + this.source = source; + } + + public GizmoVertex getTarget() { + return target; + } + + public void setTarget(GizmoVertex target) { + this.target = target; + } + + public JsonElement getProperties() { + return properties; + } + + public void setProperties(JsonElement properties) { + this.properties = properties; + } + + /** + * Unmarshalls this Edge object into a JSON string. + * + * @return - A JSON format string representation of this Edge. + */ + public String toJson() { + return gson.toJson(this); + } + + /** + * Marshalls the provided JSON string into a Edge object. + * + * @param json - The JSON string to produce the Edge from. + * + * @return - A Edge object. + * + * @throws SpikeException + */ + public static GizmoEdge fromJson(String json) throws SpikeException { + + try { + + // Make sure that we were actually provided a non-empty string + // before we + // go any further. + if (json == null || json.isEmpty()) { + throw new SpikeException("Empty or null JSON string."); + } + + // Marshall the string into an Edge object. + return gson.fromJson(json, GizmoEdge.class); + + } catch (Exception ex) { + throw new SpikeException("Unable to parse JSON string: " + ex.getMessage()); + } + } +} diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java new file mode 100644 index 0000000..419b1a5 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoGraphEvent.java @@ -0,0 +1,206 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.incoming; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; +import org.onap.aai.spike.event.outgoing.SpikeEdge; +import org.onap.aai.spike.event.outgoing.SpikeGraphEvent; +import org.onap.aai.spike.event.outgoing.SpikeGraphEvent.SpikeOperation; +import org.onap.aai.spike.event.outgoing.SpikeVertex; +import org.onap.aai.spike.exception.SpikeException; +import org.onap.aai.spike.schema.EdgeRulesLoader; +import org.onap.aai.spike.schema.OXMModelLoader; + +public class GizmoGraphEvent { + private String operation; + + @SerializedName("transaction-id") + private String transactionId; + + @SerializedName("database-transaction-id") + private String dbTransactionId; + + private long timestamp; + + private GizmoVertex vertex; + + private GizmoEdge relationship; + + /** Marshaller/unmarshaller for converting to/from JSON. */ + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + + public String getOperation() { + return operation; + } + + public void setOperation(String operation) { + this.operation = operation; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + public GizmoVertex getVertex() { + return vertex; + } + + public void setVertex(GizmoVertex vertex) { + this.vertex = vertex; + } + + public GizmoEdge getRelationship() { + return relationship; + } + + public void setRelationship(GizmoEdge relationship) { + this.relationship = relationship; + } + + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + public String getDbTransactionId() { + return dbTransactionId; + } + + /** + * Unmarshalls this Vertex object into a JSON string. + * + * @return - A JSON format string representation of this Vertex. + */ + public String toJson() { + return gson.toJson(this); + } + + public SpikeGraphEvent toSpikeGraphEvent() throws SpikeException { + SpikeGraphEvent spikeGraphEvent = new SpikeGraphEvent(); + spikeGraphEvent.setTransactionId(this.getTransactionId()); + spikeGraphEvent.setDbTransactionId(this.getDbTransactionId()); + spikeGraphEvent.setOperationTimestamp(this.getTimestamp()); + if (this.getOperation().equalsIgnoreCase("STORE")) { + spikeGraphEvent.setOperation(SpikeOperation.CREATE); + } else if (this.getOperation().equalsIgnoreCase("REPLACE")) { + spikeGraphEvent.setOperation(SpikeOperation.UPDATE); + } else if (this.getOperation().equalsIgnoreCase("DELETE")) { + spikeGraphEvent.setOperation(SpikeOperation.DELETE); + } else { + throw new SpikeException("Invalid operation in GizmoGraphEvent: " + this.getOperation()); + } + if (this.getVertex() != null) { + SpikeVertex spikeVertex = new SpikeVertex(); + spikeVertex.setId(this.getVertex().getId()); + spikeVertex.setType(this.getVertex().getType()); + spikeVertex.setModelVersion(OXMModelLoader.getLatestVersion()); + spikeVertex.setProperties(this.getVertex().getProperties()); + spikeGraphEvent.setVertex(spikeVertex); + + } else if (this.getRelationship() != null) { + SpikeEdge spikeEdge = new SpikeEdge(); + spikeEdge.setId(this.getRelationship().getId()); + spikeEdge.setModelVersion(EdgeRulesLoader.getLatestSchemaVersion()); + spikeEdge.setType(this.getRelationship().getType()); + + SpikeVertex spikeSourceVertex = new SpikeVertex(); + spikeSourceVertex.setId(this.getRelationship().getSource().getId()); + spikeSourceVertex.setType(this.getRelationship().getSource().getType()); + spikeEdge.setSource(spikeSourceVertex); + + SpikeVertex spikeTargetVertex = new SpikeVertex(); + spikeTargetVertex.setId(this.getRelationship().getTarget().getId()); + spikeTargetVertex.setType(this.getRelationship().getTarget().getType()); + spikeEdge.setTarget(spikeTargetVertex); + + spikeEdge.setProperties(this.getRelationship().getProperties()); + spikeGraphEvent.setRelationship(spikeEdge); + } + + return spikeGraphEvent; + + } + + /** + * Marshalls the provided JSON string into a Vertex object. + * + * @param json - The JSON string to produce the Vertex from. + * + * @return - A Vertex object. + * + * @throws SpikeException + */ + public static GizmoGraphEvent fromJson(String json) throws SpikeException { + + try { + + // Make sure that we were actually provided a non-empty string + // before we + // go any further. + if (json == null || json.isEmpty()) { + throw new SpikeException("Empty or null JSON string."); + } + + // Marshall the string into a Vertex object. + return gson.fromJson(json, GizmoGraphEvent.class); + + } catch (Exception ex) { + throw new SpikeException("Unable to parse JSON string: " + ex.getMessage()); + } + } + + @Override + public String toString() { + + return toJson(); + } + + public String getObjectKey() { + if (this.getVertex() != null) { + return this.getVertex().getId(); + } else if (this.getRelationship() != null) { + return this.getRelationship().getId(); + } + + return null; + + } + + public String getObjectType() { + if (this.getVertex() != null) { + return "Vertex->" + this.getVertex().getType(); + } else if (this.getRelationship() != null) { + return "Relationship->" + this.getRelationship().getType(); + } + + return null; + + } +} diff --git a/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java b/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java new file mode 100644 index 0000000..2f55c57 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/incoming/GizmoVertex.java @@ -0,0 +1,116 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.incoming; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.annotations.SerializedName; +import org.onap.aai.spike.exception.SpikeException; + +/** + * This class provides a generic representation of a Vertex as provided by the graph data store. + * + */ +public class GizmoVertex { + + /** + * The unique identifier used to identify this vertex in the graph data store. + */ + @SerializedName("key") + private String id; + + /** Type label assigned to this vertex. */ + private String type; + + /** Map of all of the properties assigned to this vertex. */ + private JsonElement properties; + + /** Marshaller/unmarshaller for converting to/from JSON. */ + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public JsonElement getProperties() { + return properties; + } + + public void setProperties(JsonElement properties) { + this.properties = properties; + } + + /** + * Unmarshalls this Vertex object into a JSON string. + * + * @return - A JSON format string representation of this Vertex. + */ + public String toJson() { + return gson.toJson(this); + } + + /** + * Marshalls the provided JSON string into a Vertex object. + * + * @param json - The JSON string to produce the Vertex from. + * + * @return - A Vertex object. + * + * @throws SpikeException + */ + public static GizmoVertex fromJson(String json) throws SpikeException { + + try { + + // Make sure that we were actually provided a non-empty string + // before we + // go any further. + if (json == null || json.isEmpty()) { + throw new SpikeException("Empty or null JSON string."); + } + + // Marshall the string into a Vertex object. + return gson.fromJson(json, GizmoVertex.class); + + } catch (Exception ex) { + throw new SpikeException("Unable to parse JSON string: " + ex.getMessage()); + } + } + + @Override + public String toString() { + + return toJson(); + } +} diff --git a/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java new file mode 100644 index 0000000..58795b3 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/incoming/OffsetManager.java @@ -0,0 +1,356 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.incoming; + +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.spike.exception.SpikeException; +import org.onap.aai.spike.logging.SpikeMsgs; + + +/** + * Instances of this class maintain a buffer of events which have been received and are queued up to + * be processed. + *

+ * A background thread advances a pointer into the buffer which always points to the head of the + * most recent consecutive block of processed events. This allows us, at any time, to know what + * offset value can be safely committed to the event store (meaning any events before that offset + * into the event topic will not be reprocessed on a restart). + */ +public class OffsetManager { + + /** Buffer that we use for caching 'in flight' events. */ + private RingEntry[] ringBuffer; + + /** Number of elements that can be stored in the buffer. */ + private int bufferSize; + + /** Pointer to the next free slot in the buffer. */ + private AtomicLong writePointer = new AtomicLong(0L); + + /** + * Pointer to the next slot in the buffer to wait to be published so that we can commit its offset. + */ + private long commitPointer = 0; + + /** + * Executor for scheduling the background task which commits offsets to the event bus. + */ + private ScheduledExecutorService offsetCommitService = Executors.newScheduledThreadPool(1); + + /** + * The next offset value which represents the head of a consecutive block of events which have been + * processed. + */ + private Long nextOffsetToCommit = null; + + private static Logger logger = LoggerFactory.getInstance().getLogger(OffsetManager.class.getName()); + + + /** + * Creates a new instance of the offset manager. + * + * @param bufferCapacity - The requested size of the buffer that we will use to cache offsets for + * events that are waiting to be processed. + * @param offsetCheckPeriodMs - The period at which we will try to update what we consider to be the + * next offset that can be safely committed to the event bus. + */ + public OffsetManager(int bufferCapacity, long offsetCheckPeriodMs) { + + // In order to make the math work nicely for our write and commit pointers, we + // need our buffer size to be a power of 2, so round the supplied buffer size + // up to ensure that it is a power of two. + // + // This way we can just keep incrementing our pointers forever without worrying + // about wrapping (we'll eventually roll over from LongMax to LongMin, but if the + // buffer size is a power of 2 then our modded physical indexes will still magically + // map to the next consecutive index. (Math!) + bufferSize = nextPowerOf2(bufferCapacity); + + // Now, allocate and initialize our ring buffer. + ringBuffer = new RingEntry[bufferSize]; + for (int i = 0; i < bufferSize; i++) { + ringBuffer[i] = new RingEntry(); + } + + // Schedule a task to commit the most recent offset value to the event library. + offsetCommitService.scheduleAtFixedRate(new OffsetCommitter(), offsetCheckPeriodMs, offsetCheckPeriodMs, + TimeUnit.MILLISECONDS); + + logger.info(SpikeMsgs.OFFSET_MANAGER_STARTED, Integer.toString(bufferSize), Long.toString(offsetCheckPeriodMs)); + } + + + /** + * Logs an event with the offset manager. + * + * @param transactionId - The transaction id associated with this event. + * @param commitOffset - The event bus offset associated with this event. + * + * @return - The index into the offset manager's buffer for this event. + */ + public int cacheEvent(String transactionId, long commitOffset) { + + // Get the index to the next free slot in the ring... + int index = nextFreeSlot(); + + if (logger.isDebugEnabled()) { + logger.debug("Caching event with transaction-id: " + transactionId + " offset: " + commitOffset + + " to offset manager at index: " + index); + } + + // ...and update it with the event meta data we want to cache. + ringBuffer[index].setTransactionId(transactionId); + ringBuffer[index].setCommitOffset(commitOffset); + + return index; + } + + + /** + * Marks a cached event as 'published'. + * + * @param anIndex - The index into the event cache that we want to update. + * @throws SpikeException + */ + public void markAsPublished(int anIndex) throws SpikeException { + + // Make sure that we were supplied a valid index. + if ((anIndex < 0) || (anIndex > bufferSize - 1)) { + throw new SpikeException("Invalid index " + anIndex + " for offset manager buffer."); + } + + // It is only valid to mark a cell as 'Published' if it is already + // in the 'Processing' state. + if (!ringBuffer[anIndex].state.compareAndSet(RingEntry.PROCESSING, RingEntry.PUBLISHED)) { + throw new SpikeException("Unexpected event state: " + state2String(ringBuffer[anIndex].state.get())); + } + + if (logger.isDebugEnabled()) { + logger.debug("Event in offset manger buffer at index: " + anIndex + " marked as 'published'"); + } + } + + + /** + * Marks a cached event as 'published'. + * + * @param transactionId - The transaction id of the event we want to update. + * + * @throws SpikeException + */ + public void markAsPublished(String transactionId) throws SpikeException { + + // Iterate over the ring buffer and try to find the specified transaction + // id. + for (int i = 0; i < bufferSize; i++) { + + // Is this the one? + if (ringBuffer[i].getTransactionId() == transactionId) { + + // Found the one we are looking for! + markAsPublished(i); + return; + } + } + + // If we made it here then we didn't find an event with the supplied transaction id. + throw new SpikeException("No event with transaction id: " + transactionId + " exists in offset manager buffer"); + } + + + /** + * Retrieves our current view of what is the most recent offset value that can be safely committed + * to the event bus (meaning that all events on the topic before that offset value have been + * processed and shouldn't be re-consumed after a restart). + * + * @return - The next 'safe' offset. + */ + public long getNextOffsetToCommit() { + return nextOffsetToCommit; + } + + + /** + * Finds the next slot in the ring which is marked as 'free'. + * + * @return - An index into the ring buffer. + */ + private int nextFreeSlot() { + + int currentIndex = (int) (writePointer.getAndIncrement() % bufferSize); + while (!ringBuffer[currentIndex].state.compareAndSet(RingEntry.FREE, RingEntry.PROCESSING)) { + currentIndex = (int) (writePointer.getAndIncrement() % bufferSize); + } + + return currentIndex; + } + + + /** + * Given a number, this helper method finds the next largest number that is a power of 2. + * + * @param aNumber - The number to compute the next power of two for. + * + * @return - The next largest power of 2 for the supplied number. + */ + private int nextPowerOf2(int aNumber) { + + int powerOfTwo = 1; + while (powerOfTwo < aNumber) { + powerOfTwo *= 2; + } + return powerOfTwo; + } + + private String state2String(int aState) { + + switch (aState) { + case RingEntry.FREE: + return "FREE"; + + case RingEntry.PROCESSING: + return "PROCESSING"; + + case RingEntry.PUBLISHED: + return "PUBLISHED"; + + default: + return "UNDEFINED(" + aState + ")"; + } + } + + + /** + * Defines the structure of the entries in the ring buffer which represent events which are 'in + * flight'. + */ + public class RingEntry { + + private final static int FREE = 1; // Slot in buffer is available to be written to. + private final static int PROCESSING = 2; // Slot in buffer represents an event which is waiting to be processed. + private final static int PUBLISHED = 3; // Slot in buffer represents an event which has been published. + + /** + * Describes the state of this entry in the ring: + *

+ * FREE = This slot is currently unused and may be written to. + *

+ * PROCESSING = This slot describes an event which has not yet been published. + *

+ * PUBLISHED = This lot describes an event which has been published and therefore may be released. + */ + public AtomicInteger state = new AtomicInteger(FREE); + + /** The unique identifier of the event which this entry represents. */ + private String transactionId; + + /** The event bus offset associated with the event which this entry represents. */ + private long commitOffset; + + + /** + * Retrieve the transaction id for the event represented by this entry. + * + * @return - Transaction id. + */ + public String getTransactionId() { + return transactionId; + } + + + /** + * Assigns a transaction id to this entry. + * + * @param transactionId - The unique id for this entry. + */ + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + + /** + * Retrieves the offset of the event represented by this entry. + * + * @return - An event bus offset value. + */ + public long getCommitOffset() { + return commitOffset; + } + + + /** + * Assigns an offset value to this entry. + * + * @param commitOffset - Offset value for this entry. + */ + public void setCommitOffset(long commitOffset) { + this.commitOffset = commitOffset; + } + } + + + /** + * This class implements a simple background task which wakes up periodically and determines the + * next available offset from the ring buffer which is safe to commit to the event bus. + */ + private class OffsetCommitter implements Runnable { + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + + // Get the index into the ring buffer of the next slot to be checked. + int currentCommitIndex = (int) (commitPointer % bufferSize); + + // If this entry is in the 'published' state then its offset is good to be + // committed. + while (ringBuffer[currentCommitIndex].state.get() == RingEntry.PUBLISHED) { + + // Grab the offset of the current entry. + nextOffsetToCommit = ringBuffer[currentCommitIndex].getCommitOffset(); + + // We don't need to keep the current entry alive any longer, so free it and advance + // to the next entry in the ring. + ringBuffer[currentCommitIndex].state.set(RingEntry.FREE); + commitPointer++; + + // Update our index and loop back to check the next one. We will keep advancing + // as long as we have consecutive entries that are flagged as 'published'. + currentCommitIndex = (int) (commitPointer % bufferSize); + } + + if (logger.isDebugEnabled()) { + logger.debug("Offset to commit to event bus: " + + ((nextOffsetToCommit != null) ? nextOffsetToCommit : "none")); + } + } + } +} diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java b/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java new file mode 100644 index 0000000..2ba949a --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/outgoing/GsonExclude.java @@ -0,0 +1,32 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.outgoing; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD}) +public @interface GsonExclude { + // Field tag only annotation +} diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java new file mode 100644 index 0000000..7fb030d --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEdge.java @@ -0,0 +1,143 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.outgoing; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.annotations.SerializedName; +import org.onap.aai.spike.exception.SpikeException; + +/** + * This class provides a generic representation of an Edge as provided by the graph data store. + * + */ +public class SpikeEdge { + + /** + * The unique identifier used to identify this edge in the graph data store. + */ + @SerializedName("key") + private String id; + + @SerializedName("schema-version") + private String modelVersion; + + /** Type label assigned to this vertex. */ + private String type; + + /** Source vertex for our edge. */ + private SpikeVertex source; + + /** Target vertex for our edge. */ + private SpikeVertex target; + + /** Map of all of the properties assigned to this vertex. */ + private JsonElement properties; + + /** Marshaller/unmarshaller for converting to/from JSON. */ + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public SpikeVertex getSource() { + return source; + } + + public void setSource(SpikeVertex source) { + this.source = source; + } + + public SpikeVertex getTarget() { + return target; + } + + public void setTarget(SpikeVertex target) { + this.target = target; + } + + public JsonElement getProperties() { + return properties; + } + + public void setProperties(JsonElement properties) { + this.properties = properties; + } + + public String getModelVersion() { + return modelVersion; + } + + public void setModelVersion(String modelVersion) { + this.modelVersion = modelVersion; + } + + /** + * Unmarshalls this Edge object into a JSON string. + * + * @return - A JSON format string representation of this Edge. + */ + public String toJson() { + return gson.toJson(this); + } + + /** + * Marshalls the provided JSON string into a Edge object. + * + * @param json - The JSON string to produce the Edge from. + * + * @return - A Edge object. + * + * @throws SpikeException + */ + public static SpikeEdge fromJson(String json) throws SpikeException { + + try { + + // Make sure that we were actually provided a non-empty string + // before we + // go any further. + if (json == null || json.isEmpty()) { + throw new SpikeException("Empty or null JSON string."); + } + + // Marshall the string into an Edge object. + return gson.fromJson(json, SpikeEdge.class); + + } catch (Exception ex) { + throw new SpikeException("Unable to parse JSON string: " + ex.getMessage()); + } + } +} diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java new file mode 100644 index 0000000..87ce2ce --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventComparator.java @@ -0,0 +1,33 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.outgoing; + +import java.util.Comparator; + +public class SpikeEventComparator implements Comparator { + + @Override + public int compare(SpikeGraphEvent e1, SpikeGraphEvent e2) { + + // Note: this will break down if the difference is more than 24 days, or int.max milliseconds + return (int) (e1.getOperationTimestamp() - e2.getOperationTimestamp()); + } +} diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java new file mode 100644 index 0000000..54ac391 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeEventExclusionStrategy.java @@ -0,0 +1,37 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.outgoing; + +import com.google.gson.ExclusionStrategy; +import com.google.gson.FieldAttributes; + +public class SpikeEventExclusionStrategy implements ExclusionStrategy { + + @Override + public boolean shouldSkipField(FieldAttributes fieldAttributes) { + return fieldAttributes.getAnnotation(GsonExclude.class) != null; + } + + @Override + public boolean shouldSkipClass(Class aClass) { + return false; + } +} diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java new file mode 100644 index 0000000..a076a55 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeGraphEvent.java @@ -0,0 +1,168 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.outgoing; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; +import org.onap.aai.spike.exception.SpikeException; + +public class SpikeGraphEvent { + + public enum SpikeOperation { + CREATE, UPDATE, DELETE + } + + private SpikeOperation operation; + + @SerializedName("transaction-id") + private String transactionId; + + @SerializedName("database-transaction-id") + private String dbTransactionId; + + @SerializedName("timestamp") + private long operationTimestamp; + + private SpikeVertex vertex; + + private SpikeEdge relationship; + + // Time this event was received in spike, used to determine when to send the event + @GsonExclude + private long spikeTimestamp = System.currentTimeMillis(); + + /** Serializer/deserializer for converting to/from JSON. */ + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + + public SpikeOperation getOperation() { + return operation; + } + + public void setOperation(SpikeOperation operation) { + this.operation = operation; + } + + public long getOperationTimestamp() { + return operationTimestamp; + } + + public void setOperationTimestamp(long operationTimestamp) { + this.operationTimestamp = operationTimestamp; + } + + + + public SpikeVertex getVertex() { + return vertex; + } + + public void setVertex(SpikeVertex vertex) { + this.vertex = vertex; + } + + public SpikeEdge getRelationship() { + return relationship; + } + + public void setRelationship(SpikeEdge relationship) { + this.relationship = relationship; + } + + public String getTransactionId() { + return transactionId; + } + + public void setTransactionId(String transactionId) { + this.transactionId = transactionId; + } + + public void setDbTransactionId(String dbTransactionId) { + this.dbTransactionId = dbTransactionId; + } + + public long getSpikeTimestamp() { + return spikeTimestamp; + } + + /** + * Unmarshalls this Vertex object into a JSON string. + * + * @return - A JSON format string representation of this Vertex. + */ + public String toJson() { + return gson.toJson(this); + } + + + /** + * Marshalls the provided JSON string into a Vertex object. + * + * @param json - The JSON string to produce the Vertex from. + * + * @return - A Vertex object. + * + * @throws SpikeException + */ + public static SpikeGraphEvent fromJson(String json) throws SpikeException { + + try { + // Make sure that we were actually provided a non-empty string + // before we + // go any further. + if (json == null || json.isEmpty()) { + throw new SpikeException("Empty or null JSON string."); + } + + // Marshall the string into a Vertex object. + return gson.fromJson(json, SpikeGraphEvent.class); + + } catch (Exception ex) { + throw new SpikeException("Unable to parse JSON string: " + ex.getMessage()); + } + } + + @Override + public String toString() { + return toJson(); + } + + public String getObjectKey() { + if (this.getVertex() != null) { + return this.getVertex().getId(); + } else if (this.getRelationship() != null) { + return this.getRelationship().getId(); + } + + return null; + } + + public String getObjectType() { + if (this.getVertex() != null) { + return "Vertex->" + this.getVertex().getType(); + } else if (this.getRelationship() != null) { + return "Relationship->" + this.getRelationship().getType(); + } + + return null; + } +} + diff --git a/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java new file mode 100644 index 0000000..cd51855 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/event/outgoing/SpikeVertex.java @@ -0,0 +1,127 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.event.outgoing; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.annotations.SerializedName; +import org.onap.aai.spike.exception.SpikeException; + +/** + * This class provides a generic representation of a Vertex as provided by the graph data store. + * + */ +public class SpikeVertex { + + /** + * The unique identifier used to identify this vertex in the graph data store. + */ + @SerializedName("key") + private String id; + + @SerializedName("schema-version") + private String modelVersion; + + /** Type label assigned to this vertex. */ + private String type; + + /** Map of all of the properties assigned to this vertex. */ + private JsonElement properties; + + /** Marshaller/unmarshaller for converting to/from JSON. */ + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().create(); + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public JsonElement getProperties() { + return properties; + } + + public void setProperties(JsonElement properties) { + this.properties = properties; + } + + public String getModelVersion() { + return modelVersion; + } + + public void setModelVersion(String modelVersion) { + this.modelVersion = modelVersion; + } + + /** + * Unmarshalls this Vertex object into a JSON string. + * + * @return - A JSON format string representation of this Vertex. + */ + public String toJson() { + return gson.toJson(this); + } + + /** + * Marshalls the provided JSON string into a Vertex object. + * + * @param json - The JSON string to produce the Vertex from. + * + * @return - A Vertex object. + * + * @throws SpikeException + */ + public static SpikeVertex fromJson(String json) throws SpikeException { + + try { + + // Make sure that we were actually provided a non-empty string + // before we + // go any further. + if (json == null || json.isEmpty()) { + throw new SpikeException("Empty or null JSON string."); + } + + // Marshall the string into a Vertex object. + return gson.fromJson(json, SpikeVertex.class); + + } catch (Exception ex) { + throw new SpikeException("Unable to parse JSON string: " + ex.getMessage()); + } + } + + @Override + public String toString() { + + return toJson(); + } +} diff --git a/src/main/java/org/onap/aai/spike/exception/SpikeException.java b/src/main/java/org/onap/aai/spike/exception/SpikeException.java new file mode 100644 index 0000000..90404fa --- /dev/null +++ b/src/main/java/org/onap/aai/spike/exception/SpikeException.java @@ -0,0 +1,46 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.exception; + + +public class SpikeException extends Exception { + + private static final long serialVersionUID = 8162385108397238865L; + + + public SpikeException() {} + + public SpikeException(String message) { + super(message); + } + + public SpikeException(Throwable cause) { + super(cause); + } + + public SpikeException(String message, Throwable cause) { + super(message, cause); + } + + public SpikeException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } +} diff --git a/src/main/java/org/onap/aai/spike/logging/SpikeMsgs.java b/src/main/java/org/onap/aai/spike/logging/SpikeMsgs.java new file mode 100644 index 0000000..77d7734 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/logging/SpikeMsgs.java @@ -0,0 +1,167 @@ +/** + * ============LICENSE_START======================================================= + * Spike + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ +package org.onap.aai.spike.logging; + +import com.att.eelf.i18n.EELFResourceManager; +import org.onap.aai.cl.eelf.LogMessageEnum; + +public enum SpikeMsgs implements LogMessageEnum { + + /** + * Unable to parse schema file: {0} due to error : {1} + * + * Arguments: {0} = schema file name {1} = error + */ + INVALID_OXM_FILE, + + /** + * Invalid OXM dir: {0} + * + * Arguments: {0} = Directory. + */ + INVALID_OXM_DIR, + + /** + * Unable to commit offset to event bus due to error: {0} + * + * Arguments: {0} = Failure cause. + */ + OFFSET_COMMIT_FAILURE, + + /** + * Unable to load OXM schema: {0} + * + * Arguments: {0} = error + */ + OXM_LOAD_ERROR, + + /** + * OXM file change detected: {0} + * + * Arguments: {0} = file name + */ + OXM_FILE_CHANGED, + + /** + * Successfully loaded schema: {0} + * + * Arguments: {0} = oxm filename + */ + LOADED_OXM_FILE, + + /** + * Successfully loaded Edge Properties Files: {0} + * + *

+ * Arguments: {0} = oxm filename + */ + LOADED_DB_RULE_FILE, + + /** + * Successfully Started Spike Service Arguments: {0} = Event interface implementation class name + */ + SPIKE_SERVICE_STARTED_SUCCESSFULLY, + + /** + * Event bus offset manager started: buffer size={0} commit period={1} + * + * Arguments: {0} = Event buffer capacity {1} = Period in ms at which event bus offsets will be + * committed. + */ + OFFSET_MANAGER_STARTED, + + /** + * Unable to initialize : {0} + * + * Arguments: {0} = Service name + */ + SPIKE_SERVICE_STARTED_FAILURE, + + /** + * Unable to consume event due to : {0} + * + * Arguments: {0} = error message + */ + SPIKE_EVENT_CONSUME_FAILURE, + /** + * Unable to publish event due to : {0} + * + * Arguments: {0} = error message + */ + SPIKE_EVENT_PUBLISH_FAILURE, + /** + * Event Received : {0} + * + * Arguments: {0} = event + */ + SPIKE_EVENT_RECEIVED, + + /** + * Event Processed : {0} + * + * Arguments: {0} = event + */ + SPIKE_EVENT_PROCESSED, + /** + * Event Published : {0} + * + * Arguments: {0} = event + */ + SPIKE_EVENT_PUBLISHED, + /** + * Event failed to publish: {0} + * + * Arguments: {0} = event + */ + SPIKE_PUBLISH_FAILED, + /** + * No Event Received + * + * Arguments: none + */ + SPIKE_NO_EVENT_RECEIVED, + /** + * Checking for events Arguments: none + */ + SPIKE_QUERY_EVENT_SYSTEM, + + /** + * Schema Ingest properties file was not loaded properly + */ + SPIKE_SCHEMA_INGEST_LOAD_ERROR, + + /** + * Received request {0} {1} from {2}. Sending response: {3} Arguments: {0} = operation {1} = target + * URL {2} = source {3} = response code + */ + PROCESS_REST_REQUEST; + + /** + * Static initializer to ensure the resource bundles for this class are loaded... + */ + static { + EELFResourceManager.loadMessageBundle("logging/SpikeMsgs"); + } +} diff --git a/src/main/java/org/onap/aai/spike/schema/EdgeRulesLoader.java b/src/main/java/org/onap/aai/spike/schema/EdgeRulesLoader.java new file mode 100644 index 0000000..b914421 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/schema/EdgeRulesLoader.java @@ -0,0 +1,229 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.schema; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import com.google.common.collect.Multimap; +import org.apache.commons.io.IOUtils; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.edges.EdgeIngestor; +import org.onap.aai.edges.EdgeRule; +import org.onap.aai.edges.exceptions.EdgeRuleNotFoundException; +import org.onap.aai.setup.ConfigTranslator; +import org.onap.aai.setup.SchemaLocationsBean; +import org.onap.aai.setup.Version; +import org.onap.aai.spike.exception.SpikeException; +import org.onap.aai.spike.logging.SpikeMsgs; +import org.onap.aai.spike.util.SchemaIngestPropertiesReader; + + +public class EdgeRulesLoader { + + private static Map versionContextMap = new ConcurrentHashMap<>(); + + static final Pattern versionPattern = Pattern.compile("V(\\d*)"); + static final String propsPrefix = "edge_properties_"; + static final String propsSuffix = ".json"; + final static Pattern propsFilePattern = Pattern.compile(propsPrefix + "(.*)" + propsSuffix); + final static Pattern propsVersionPattern = Pattern.compile("v\\d*"); + + private static org.onap.aai.cl.api.Logger logger = + LoggerFactory.getInstance().getLogger(EdgeRulesLoader.class.getName()); + + private EdgeRulesLoader() {} + + /** + * Finds all DB Edge Rules and Edge Properties files for all OXM models. + * + * @throws SpikeException + */ + public static synchronized void loadModels() throws SpikeException { + SchemaIngestPropertiesReader SchemaIngestPropertiesReader = new SchemaIngestPropertiesReader(); + SchemaLocationsBean schemaLocationsBean = new SchemaLocationsBean(); + schemaLocationsBean.setEdgeDirectory(SchemaIngestPropertiesReader.getEdgeDir()); + ConfigTranslator configTranslator = new OxmConfigTranslator(schemaLocationsBean); + EdgeIngestor edgeIngestor = new EdgeIngestor(configTranslator); + Map propFiles = edgePropertyFiles(SchemaIngestPropertiesReader); + + if (logger.isDebugEnabled()) { + logger.debug("Loading DB Edge Rules"); + } + + for (String version : OXMModelLoader.getLoadedOXMVersions()) { + try { + loadModel(Version.valueOf(version), edgeIngestor, propFiles); + } catch (IOException | EdgeRuleNotFoundException e) { + throw new SpikeException(e.getMessage(), e); + } + } + } + + /** + * Loads DB Edge Rules and Edge Properties for a given version. + * + * @throws SpikeException + */ + + public static synchronized void loadModels(String v) throws SpikeException { + SchemaIngestPropertiesReader SchemaIngestPropertiesReader = new SchemaIngestPropertiesReader(); + SchemaLocationsBean schemaLocationsBean = new SchemaLocationsBean(); + schemaLocationsBean.setEdgeDirectory(SchemaIngestPropertiesReader.getEdgeDir()); + ConfigTranslator configTranslator = new OxmConfigTranslator(schemaLocationsBean); + EdgeIngestor edgeIngestor = new EdgeIngestor(configTranslator); + String version = v.toUpperCase(); + Map propFiles = edgePropertyFiles(SchemaIngestPropertiesReader); + + if (logger.isDebugEnabled()) { + logger.debug("Loading DB Edge Rules "); + } + + try { + loadModel(Version.valueOf(version), edgeIngestor, propFiles); + } catch (IOException | EdgeRuleNotFoundException e) { + throw new SpikeException(e.getMessage()); + } + } + + /** + * Retrieves the DB Edge Rule relationship schema for a given version. + * + * @param version - The OXM version that we want the DB Edge Rule for. + * @return - A RelationshipSchema of the DB Edge Rule for the OXM version. + * @throws SpikeException + */ + public static RelationshipSchema getSchemaForVersion(String version) throws SpikeException { + + // If we haven't already loaded in the available OXM models, then do so now. + if (versionContextMap == null || versionContextMap.isEmpty()) { + loadModels(); + } else if (!versionContextMap.containsKey(version)) { + logger.error(SpikeMsgs.OXM_LOAD_ERROR, "Error loading DB Edge Rules for: " + version); + throw new SpikeException("Error loading DB Edge Rules for: " + version); + } + + return versionContextMap.get(version); + } + + /** + * Retrieves the DB Edge Rule relationship schema for all loaded OXM versions. + * + * @return - A Map of the OXM version and it's corresponding RelationshipSchema of the DB Edge Rule. + * @throws SpikeException + */ + public static Map getSchemas() throws SpikeException { + + // If we haven't already loaded in the available OXM models, then do so now. + if (versionContextMap == null || versionContextMap.isEmpty()) { + loadModels(); + } + return versionContextMap; + } + + /** + * Returns the latest available DB Edge Rule version. + * + * @return - A Map of the OXM version and it's corresponding RelationshipSchema of the DB Edge Rule. + * @throws SpikeException + */ + public static String getLatestSchemaVersion() throws SpikeException { + + // If we haven't already loaded in the available OXM models, then do so now. + if (versionContextMap == null || versionContextMap.isEmpty()) { + loadModels(); + } + + // If there are still no models available, then there's not much we can do... + if (versionContextMap.isEmpty()) { + logger.error(SpikeMsgs.OXM_LOAD_ERROR, "No available DB Edge Rules to get latest version for."); + throw new SpikeException("No available DB Edge Rules to get latest version for."); + } + + // Iterate over the available model versions to determine which is the most + // recent. + Integer latestVersion = null; + String latestVersionStr = null; + for (String versionKey : versionContextMap.keySet()) { + + Matcher matcher = versionPattern.matcher(versionKey.toUpperCase()); + if (matcher.find()) { + + int currentVersion = Integer.parseInt(matcher.group(1)); + + if ((latestVersion == null) || (currentVersion > latestVersion)) { + latestVersion = currentVersion; + latestVersionStr = versionKey; + } + } + } + + return latestVersionStr; + } + + /** + * Reset the loaded DB Edge Rule schemas + * + */ + + public static void resetSchemaVersionContext() { + versionContextMap = new ConcurrentHashMap<>(); + } + + private static synchronized void loadModel(Version version, EdgeIngestor edgeIngestor, Map props) + throws IOException, SpikeException, EdgeRuleNotFoundException { + + Multimap edges = edgeIngestor.getAllRules(version); + String edgeProps; + if (props.get(version.toString().toLowerCase()) != null) { + edgeProps = IOUtils.toString(new FileInputStream(props.get(version.toString().toLowerCase())), "UTF-8"); + } else { + throw new FileNotFoundException("The Edge Properties file for OXM version " + version + "was not found."); + } + if (edges != null) { + RelationshipSchema rs = new RelationshipSchema(edges, edgeProps); + versionContextMap.put(version.toString().toLowerCase(), rs); + logger.info(SpikeMsgs.LOADED_DB_RULE_FILE, version.toString()); + } + } + + private static Map edgePropertyFiles(SchemaIngestPropertiesReader dir) throws SpikeException { + Map propsFiles = Arrays + .stream(new File(dir.getEdgePropsDir()) + .listFiles((d, name) -> propsFilePattern.matcher(name).matches())) + .collect(Collectors.toMap(new Function() { + public String apply(File f) { + Matcher m1 = propsVersionPattern.matcher(f.getName()); + m1.find(); + return m1.group(0); + } + }, f -> f)); + return propsFiles; + } +} diff --git a/src/main/java/org/onap/aai/spike/schema/GraphEventTransformer.java b/src/main/java/org/onap/aai/spike/schema/GraphEventTransformer.java new file mode 100644 index 0000000..ced84bb --- /dev/null +++ b/src/main/java/org/onap/aai/spike/schema/GraphEventTransformer.java @@ -0,0 +1,282 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.schema; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import com.google.common.base.CaseFormat; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import org.eclipse.persistence.dynamic.DynamicType; +import org.eclipse.persistence.internal.helper.DatabaseField; +import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; +import org.eclipse.persistence.mappings.DatabaseMapping; +import org.eclipse.persistence.oxm.XMLField; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.spike.event.incoming.GizmoEdge; +import org.onap.aai.spike.event.incoming.GizmoGraphEvent; +import org.onap.aai.spike.event.incoming.GizmoVertex; +import org.onap.aai.spike.exception.SpikeException; + +/** + * This class is responsible for transforming raw graph entities (such as vertices and edges) into + * representations which correspond to the OXM models. + */ +public class GraphEventTransformer { + + private static org.onap.aai.cl.api.Logger logger = + LoggerFactory.getInstance().getLogger(GraphEventTransformer.class.getName()); + private static final String AAI_UUID = "aai-uuid"; + + /** + * + * @param rawVertex + * @throws SpikeException + */ + public static void validateVertexModel(GizmoVertex rawVertex) throws SpikeException { + + validateVertexModel(OXMModelLoader.getLatestVersion(), rawVertex); + } + + public static void populateUUID(GizmoGraphEvent event) throws SpikeException { + try { + if (event.getVertex() != null) { + if (event.getVertex().getProperties().getAsJsonObject().has(AAI_UUID)) { + event.getVertex() + .setId(event.getVertex().getProperties().getAsJsonObject().get(AAI_UUID).getAsString()); + } + } else if (event.getRelationship() != null) { + if (event.getRelationship().getProperties().getAsJsonObject().has(AAI_UUID)) { + event.getRelationship().setId( + event.getRelationship().getProperties().getAsJsonObject().get(AAI_UUID).getAsString()); + } + + if (event.getRelationship().getSource().getProperties().getAsJsonObject().has(AAI_UUID)) { + event.getRelationship().getSource().setId(event.getRelationship().getSource().getProperties() + .getAsJsonObject().get(AAI_UUID).getAsString()); + } + if (event.getRelationship().getTarget().getProperties().getAsJsonObject().has(AAI_UUID)) { + event.getRelationship().getTarget().setId(event.getRelationship().getTarget().getProperties() + .getAsJsonObject().get(AAI_UUID).getAsString()); + } + } + } catch (Exception ex) { + throw new SpikeException("Unable to parse uuid in incoming event"); + } + } + + /** + * + * @param version + * @param rawVertex + * @throws SpikeException + */ + public static void validateVertexModel(String version, GizmoVertex rawVertex) throws SpikeException { + + try { + + DynamicJAXBContext jaxbContext = OXMModelLoader.getContextForVersion(version); + String modelObjectClass = CaseFormat.LOWER_CAMEL.to(CaseFormat.UPPER_CAMEL, + CaseFormat.LOWER_HYPHEN.to(CaseFormat.UPPER_CAMEL, rawVertex.getType())); + final DynamicType modelObjectType = jaxbContext.getDynamicType(modelObjectClass); + final DynamicType reservedType = jaxbContext.getDynamicType("ReservedPropNames"); + + Set> vertexEntriesSet = + rawVertex.getProperties().getAsJsonObject().entrySet(); + Map vertexEntriesMap = new HashMap(); + for (Map.Entry entry : vertexEntriesSet) { + vertexEntriesMap.put(entry.getKey(), entry.getValue()); + } + + JsonObject modelJsonElement = new JsonObject(); + // Iterate over all of the attributes specified in the model schema, + // populating + // our dynamic instance with the corresponding values supplied in + // our raw vertex. + for (DatabaseMapping mapping : modelObjectType.getDescriptor().getMappings()) { + if (mapping.isAbstractDirectMapping()) { + DatabaseField f = mapping.getField(); + String keyName = f.getName().substring(0, f.getName().indexOf("/")); + + String defaultValue = mapping.getProperties().get("defaultValue") == null ? "" + : mapping.getProperties().get("defaultValue").toString(); + + if (((XMLField) f).isRequired() && !vertexEntriesMap.containsKey(keyName) + && !defaultValue.isEmpty()) { + modelJsonElement.addProperty(keyName, defaultValue); + + } + // If this is a required field, but is not present in the + // raw vertex, reject this + // as an invalid input since we can't build a valid object + // from what we were provided. + if (((XMLField) f).isRequired() && !vertexEntriesMap.containsKey(keyName) + && defaultValue.isEmpty()) { + throw new SpikeException("Missing required field: " + keyName); + } + + // If this is a non-required field, then set it if a value + // was provided in the + // raw vertex. + if (vertexEntriesMap.containsKey(keyName)) { + validateFieldType(vertexEntriesMap.get(keyName), f.getType()); + modelJsonElement.add(keyName, vertexEntriesMap.get(keyName)); + } + } + } + + // Ensure any of the reserved properties are added to the payload + for (DatabaseMapping mapping : reservedType.getDescriptor().getMappings()) { + if (mapping.isAbstractDirectMapping()) { + DatabaseField field = mapping.getField(); + String keyName = field.getName().substring(0, field.getName().indexOf("/")); + + if (vertexEntriesMap.containsKey(keyName)) { + validateFieldType(vertexEntriesMap.get(keyName), field.getType()); + modelJsonElement.add(keyName, vertexEntriesMap.get(keyName)); + } + } + } + + rawVertex.setProperties(modelJsonElement); + } catch (Exception e) { + throw new SpikeException(e.getMessage()); + } + } + + /** + * + * @param rawEdge + * @throws SpikeException + */ + public static void validateEdgeModel(GizmoEdge rawEdge) throws SpikeException { + + validateEdgeModel(EdgeRulesLoader.getLatestSchemaVersion(), rawEdge); + } + + /** + * + * @param version + * @param rawEdge + * @throws SpikeException + */ + public static void validateEdgeModel(String version, GizmoEdge rawEdge) throws SpikeException { + + if (logger.isDebugEnabled()) { + logger.debug("Convert edge: " + rawEdge.toString() + " to model version: " + version); + } + + // Get the relationship schema for the supplied version. + RelationshipSchema schema = EdgeRulesLoader.getSchemaForVersion(version); + + try { + + // Validate that our edge does have the necessary endpoints. + if (rawEdge.getSource() == null || rawEdge.getTarget() == null) { + throw new SpikeException("Source or target endpoint not specified"); + } + + // Create a key based on source:target:relationshipType + String sourceNodeType = rawEdge.getSource().getType(); + String targetNodeType = rawEdge.getTarget().getType(); + String key = sourceNodeType + ":" + targetNodeType + ":" + rawEdge.getType(); + + // Now, look up the specific schema model based on the key we just + // constructed. + Map> relationshipModel = schema.lookupRelation(key); + if (relationshipModel == null || relationshipModel.isEmpty()) { + throw new SpikeException("Invalid source/target/relationship type: " + key); + } + + Set> edgeEntriesSet = rawEdge.getProperties().getAsJsonObject().entrySet(); + Map edgeEntriesMap = new HashMap(); + for (Map.Entry entry : edgeEntriesSet) { + edgeEntriesMap.put(entry.getKey(), entry.getValue()); + } + + JsonObject modelJsonElement = new JsonObject(); + + for (String property : relationshipModel.keySet()) { + + if (!edgeEntriesMap.containsKey(property)) { + throw new SpikeException("Missing required field: " + property); + } + + validateFieldType(edgeEntriesMap.get(property), relationshipModel.get(property)); + modelJsonElement.add(property, edgeEntriesMap.get(property)); + + } + + rawEdge.setProperties(modelJsonElement); + + + } catch (Exception ex) { + throw new SpikeException(ex.getMessage()); + } + } + + @SuppressWarnings("unchecked") + public static Object validateFieldType(JsonElement value, Class clazz) throws SpikeException { + try { + if (clazz.isAssignableFrom(Integer.class)) { + return value.getAsInt(); + } else if (clazz.isAssignableFrom(Long.class)) { + return value.getAsLong(); + } else if (clazz.isAssignableFrom(Float.class)) { + return value.getAsFloat(); + } else if (clazz.isAssignableFrom(Double.class)) { + return value.getAsDouble(); + } else if (clazz.isAssignableFrom(Boolean.class)) { + return value.getAsBoolean(); + } else { + return value; + } + + } catch (Exception e) { + throw new SpikeException("Invalid property value: " + value); + } + } + + public static Object validateFieldType(String value, Class clazz) throws SpikeException { + try { + if (clazz.isAssignableFrom(Integer.class)) { + return Integer.parseInt(value); + } else if (clazz.isAssignableFrom(Long.class)) { + return Long.parseLong(value); + } else if (clazz.isAssignableFrom(Float.class)) { + return Float.parseFloat(value); + } else if (clazz.isAssignableFrom(Double.class)) { + return Double.parseDouble(value); + } else if (clazz.isAssignableFrom(Boolean.class)) { + if (!value.equals("true") && !value.equals("false")) { + throw new SpikeException("Invalid property value: " + value); + } + return Boolean.parseBoolean(value); + } else { + return value; + } + } catch (Exception e) { + throw new SpikeException("Invalid property value: " + value); + } + } + +} diff --git a/src/main/java/org/onap/aai/spike/schema/MapAdapter.java b/src/main/java/org/onap/aai/spike/schema/MapAdapter.java new file mode 100644 index 0000000..2e59450 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/schema/MapAdapter.java @@ -0,0 +1,65 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.schema; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.naming.OperationNotSupportedException; +import javax.xml.bind.JAXBElement; +import javax.xml.bind.annotation.XmlAnyElement; +import javax.xml.bind.annotation.adapters.XmlAdapter; +import javax.xml.namespace.QName; + + +public class MapAdapter extends XmlAdapter> { + + public static class AdaptedMap { + @XmlAnyElement + List elements; + } + + @Override + public Map unmarshal(AdaptedMap map) throws Exception { + throw new OperationNotSupportedException(); // really?? + } + + @Override + public AdaptedMap marshal(Map map) throws Exception { + + AdaptedMap adaptedMap = new AdaptedMap(); + List elements = new ArrayList(); + for (Map.Entry property : map.entrySet()) { + + if (property.getValue() instanceof Map) { + elements.add(new JAXBElement(new QName(property.getKey()), MapAdapter.AdaptedMap.class, + marshal((Map) property.getValue()))); + + } else { + + elements.add(new JAXBElement(new QName(property.getKey()), String.class, + property.getValue().toString())); + } + } + adaptedMap.elements = elements; + return adaptedMap; + } +} diff --git a/src/main/java/org/onap/aai/spike/schema/OXMModelLoader.java b/src/main/java/org/onap/aai/spike/schema/OXMModelLoader.java new file mode 100644 index 0000000..0d174db --- /dev/null +++ b/src/main/java/org/onap/aai/spike/schema/OXMModelLoader.java @@ -0,0 +1,187 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.schema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.eclipse.persistence.jaxb.dynamic.DynamicJAXBContext; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.nodes.NodeIngestor; +import org.onap.aai.setup.ConfigTranslator; +import org.onap.aai.setup.SchemaLocationsBean; +import org.onap.aai.setup.Version; +import org.onap.aai.spike.exception.SpikeException; +import org.onap.aai.spike.logging.SpikeMsgs; +import org.onap.aai.spike.util.SchemaIngestPropertiesReader; + +/** + * This class contains all of the logic for importing OXM model schemas from the available OXM + * schema files. + */ +public class OXMModelLoader { + + private static Map versionContextMap = + new ConcurrentHashMap(); + + final static Pattern p = Pattern.compile("aai_oxm_(.*).xml"); + final static Pattern versionPattern = Pattern.compile("V(\\d*)"); + + private static org.onap.aai.cl.api.Logger logger = + LoggerFactory.getInstance().getLogger(OXMModelLoader.class.getName()); + + /** + * Finds all OXM model files + * + * @throws SpikeException + * @throws IOException + * + */ + public synchronized static void loadModels() throws SpikeException { + SchemaIngestPropertiesReader schemaIngestPropReader = new SchemaIngestPropertiesReader(); + SchemaLocationsBean schemaLocationsBean = new SchemaLocationsBean(); + schemaLocationsBean.setNodeDirectory(schemaIngestPropReader.getNodeDir()); + schemaLocationsBean.setEdgeDirectory(schemaIngestPropReader.getEdgeDir()); + ConfigTranslator configTranslator = new OxmConfigTranslator(schemaLocationsBean); + NodeIngestor nodeIngestor = new NodeIngestor(configTranslator); + + if (logger.isDebugEnabled()) { + logger.debug("Loading OXM Models"); + } + + for (Version oxmVersion : Version.values()) { + DynamicJAXBContext jaxbContext = nodeIngestor.getContextForVersion(oxmVersion); + if (jaxbContext != null) { + loadModel(oxmVersion.toString(), jaxbContext); + } + } + } + + + private synchronized static void loadModel(String oxmVersion, DynamicJAXBContext jaxbContext) { + versionContextMap.put(oxmVersion, jaxbContext); + logger.info(SpikeMsgs.LOADED_OXM_FILE, oxmVersion); + } + + /** + * Retrieves the JAXB context for the specified OXM model version. + * + * @param version - The OXM version that we want the JAXB context for. + * + * @return - A JAXB context derived from the OXM model schema. + * + * @throws SpikeException + */ + public static DynamicJAXBContext getContextForVersion(String version) throws SpikeException { + + // If we haven't already loaded in the available OXM models, then do so now. + if (versionContextMap == null || versionContextMap.isEmpty()) { + loadModels(); + } else if (!versionContextMap.containsKey(version)) { + throw new SpikeException("Error loading oxm model: " + version); + } + + return versionContextMap.get(version); + } + + public static String getLatestVersion() throws SpikeException { + + // If we haven't already loaded in the available OXM models, then do so now. + if (versionContextMap == null || versionContextMap.isEmpty()) { + loadModels(); + } + + // If there are still no models available, then there's not much we can do... + if (versionContextMap.isEmpty()) { + throw new SpikeException("No available OXM schemas to get latest version for."); + } + + // Iterate over the available model versions to determine which is the most + // recent. + Integer latestVersion = null; + String latestVersionStr = null; + for (String versionKey : versionContextMap.keySet()) { + + Matcher matcher = versionPattern.matcher(versionKey); + if (matcher.find()) { + + int currentVersion = Integer.valueOf(matcher.group(1)); + + if ((latestVersion == null) || (currentVersion > latestVersion)) { + latestVersion = currentVersion; + latestVersionStr = versionKey; + } + } + } + + return latestVersionStr; + } + + /** + * Retrieves the map of all JAXB context objects that have been created by importing the available + * OXM model schemas. + * + * @return - Map of JAXB context objects. + */ + public static Map getVersionContextMap() { + return versionContextMap; + } + + /** + * Assigns the map of all JAXB context objects. + * + * @param versionContextMap + */ + public static void setVersionContextMap(Map versionContextMap) { + OXMModelLoader.versionContextMap = versionContextMap; + } + + /** + * Retrieves the list of all Loaded OXM versions. + * + * @return - A List of Strings of all loaded OXM versions. + * + * @throws SpikeException + */ + public static List getLoadedOXMVersions() throws SpikeException { + // If we haven't already loaded in the available OXM models, then do so now. + if (versionContextMap == null || versionContextMap.isEmpty()) { + loadModels(); + } + // If there are still no models available, then there's not much we can do... + if (versionContextMap.isEmpty()) { + logger.error(SpikeMsgs.OXM_LOAD_ERROR, "No available OXM schemas to get versions for."); + throw new SpikeException("No available OXM schemas to get latest version for."); + } + List versions = new ArrayList(); + for (String versionKey : versionContextMap.keySet()) { + Matcher matcher = versionPattern.matcher(versionKey.toUpperCase()); + if (matcher.find()) { + versions.add("V" + matcher.group(1)); + } + } + return versions; + } +} diff --git a/src/main/java/org/onap/aai/spike/schema/OxmConfigTranslator.java b/src/main/java/org/onap/aai/spike/schema/OxmConfigTranslator.java new file mode 100644 index 0000000..51ed93e --- /dev/null +++ b/src/main/java/org/onap/aai/spike/schema/OxmConfigTranslator.java @@ -0,0 +1,98 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.schema; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.PathMatcher; +import java.nio.file.Paths; +import java.util.List; +import java.util.Map; +import java.util.ServiceConfigurationError; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.onap.aai.setup.ConfigTranslator; +import org.onap.aai.setup.SchemaLocationsBean; +import org.onap.aai.setup.Version; + +public class OxmConfigTranslator extends ConfigTranslator { + public OxmConfigTranslator(SchemaLocationsBean bean) { + super(bean); + } + + @Override + public Map> getNodeFiles() { + String nodeDirectory = bean.getNodeDirectory(); + if (nodeDirectory == null) { + throw new ServiceConfigurationError( + "Node(s) directory is empty in the schema location bean (" + bean.getSchemaConfigLocation() + ")"); + } + try { + return getVersionMap(Paths.get(nodeDirectory), "*_v*.xml"); + } catch (IOException e) { + throw new ServiceConfigurationError("Failed to read node(s) directory " + getPath(nodeDirectory), e); + } + } + + @Override + public Map> getEdgeFiles() { + String edgeDirectory = bean.getEdgeDirectory(); + if (edgeDirectory == null) { + throw new ServiceConfigurationError( + "Edge(s) directory is empty in the schema location bean (" + bean.getSchemaConfigLocation() + ")"); + } + try { + return getVersionMap(Paths.get(edgeDirectory), "*_v*.json"); + } catch (IOException e) { + throw new ServiceConfigurationError("Failed to read edge(s) directory " + getPath(edgeDirectory), e); + } + } + + private String getPath(String nodeDirectory) { + return Paths.get(nodeDirectory).toAbsolutePath().toString(); + } + + /** + * Creates a map containing each OXM Version and the matching OXM file path(s) + * + * @param folderPath the folder/directory containing the OXM files + * @param fileSuffix + * @return a new Map object (may be empty) + * @throws IOException if there is a problem reading the specified directory path + */ + private Map> getVersionMap(Path folderPath, String globPattern) throws IOException { + final PathMatcher filter = folderPath.getFileSystem().getPathMatcher("glob:**/" + globPattern); + try (final Stream stream = Files.list(folderPath)) { + return stream.filter(filter::matches).map(Path::toString).filter(p -> getVersionFromPath(p) != null) + .collect(Collectors.groupingBy(this::getVersionFromPath)); + } + } + + private Version getVersionFromPath(String pathName) { + String version = "V" + pathName.replaceAll("^.*\\/", "").replaceAll("\\D+", ""); + try { + return Version.valueOf(version); + } catch (IllegalArgumentException e) { + return null; + } + } +} diff --git a/src/main/java/org/onap/aai/spike/schema/Relationship.java b/src/main/java/org/onap/aai/spike/schema/Relationship.java new file mode 100644 index 0000000..af81a29 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/schema/Relationship.java @@ -0,0 +1,102 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.schema; + +import java.util.HashMap; +import java.util.Map; +import javax.xml.bind.annotation.XmlElement; +import javax.xml.bind.annotation.XmlRootElement; +import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter; +import com.google.gson.annotations.SerializedName; + + +/** + * This class represents a relationship instance that can be used for marshalling and unmarshalling + * to/from a model compliant representation. + */ +@XmlRootElement +public class Relationship { + + /** Unique identifier for this relationship. */ + private String id; + + /** Relationship type. */ + private String type; + + /** The source vertex for this edge. */ + @SerializedName("source-node-type") + private String sourceNodeType; + + /** The target vertex for this edge. */ + @SerializedName("target-node-type") + private String targetNodeType; + + /** The properties assigned to this edge. */ + private Map properties = new HashMap(); + + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getSourceNodeType() { + return sourceNodeType; + } + + @XmlElement(name = "source-node-type") + public void setSourceNodeType(String sourceNodeType) { + this.sourceNodeType = sourceNodeType; + } + + public String getTargetNodeType() { + return targetNodeType; + } + + @XmlElement(name = "target-node-type") + public void setTargetNodeType(String targetNodeType) { + this.targetNodeType = targetNodeType; + } + + @XmlJavaTypeAdapter(MapAdapter.class) + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + + public void setProperty(String key, Object property) { + properties.put(key, property); + } +} diff --git a/src/main/java/org/onap/aai/spike/schema/RelationshipSchema.java b/src/main/java/org/onap/aai/spike/schema/RelationshipSchema.java new file mode 100644 index 0000000..6858783 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/schema/RelationshipSchema.java @@ -0,0 +1,133 @@ +/** + * ============LICENSE_START======================================================= + * Gizmo + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.aai.spike.schema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import com.google.common.collect.Multimap; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.codehaus.jackson.map.ObjectMapper; +import org.onap.aai.edges.EdgeRule; +import org.onap.aai.spike.exception.SpikeException; + + +public class RelationshipSchema { + private static final Gson gson = new GsonBuilder().create(); + + public static final String SCHEMA_SOURCE_NODE_TYPE = "from"; + public static final String SCHEMA_TARGET_NODE_TYPE = "to"; + public static final String SCHEMA_RELATIONSHIP_TYPE = "label"; + public static final String SCHEMA_RULES_ARRAY = "rules"; + + + private Map>> relations = new HashMap<>(); + /** + * Hashmap of valid relationship types along with properties. + */ + private Map>> relationTypes = new HashMap<>(); + + + public RelationshipSchema(Multimap rules, String props) throws SpikeException, IOException { + HashMap properties = new ObjectMapper().readValue(props, HashMap.class); + Map> edgeProps = + properties.entrySet().stream().collect(Collectors.toMap(p -> p.getKey(), p -> { + try { + return resolveClass(p.getValue()); + } catch (SpikeException | ClassNotFoundException e) { + e.printStackTrace(); + } + return null; + })); + + rules.entries().forEach((kv) -> { + relationTypes.put(kv.getValue().getLabel(), edgeProps); + relations.put(buildRelation(kv.getValue().getFrom(), kv.getValue().getTo(), kv.getValue().getLabel()), + edgeProps); + }); + } + + public RelationshipSchema(List jsonStrings) throws SpikeException, IOException { + String edgeRules = jsonStrings.get(0); + String props = jsonStrings.get(1); + + HashMap>> rules = + new ObjectMapper().readValue(edgeRules, HashMap.class); + HashMap properties = new ObjectMapper().readValue(props, HashMap.class); + Map> edgeProps = + properties.entrySet().stream().collect(Collectors.toMap(p -> p.getKey(), p -> { + try { + return resolveClass(p.getValue()); + } catch (SpikeException | ClassNotFoundException e) { + e.printStackTrace(); + } + return null; + })); + + rules.get(SCHEMA_RULES_ARRAY).forEach(l -> { + relationTypes.put(l.get(SCHEMA_RELATIONSHIP_TYPE), edgeProps); + relations.put(buildRelation(l.get(SCHEMA_SOURCE_NODE_TYPE), l.get(SCHEMA_TARGET_NODE_TYPE), + l.get(SCHEMA_RELATIONSHIP_TYPE)), edgeProps); + }); + } + + + + public Map> lookupRelation(String key) { + return this.relations.get(key); + } + + public Map> lookupRelationType(String type) { + return this.relationTypes.get(type); + } + + public boolean isValidType(String type) { + return relationTypes.containsKey(type); + } + + + private String buildRelation(String source, String target, String relation) { + return source + ":" + target + ":" + relation; + } + + private Class resolveClass(String type) throws SpikeException, ClassNotFoundException { + Class clazz = Class.forName(type); + validateClassTypes(clazz); + return clazz; + } + + private void validateClassTypes(Class clazz) throws SpikeException { + if (!clazz.isAssignableFrom(Integer.class) && !clazz.isAssignableFrom(Double.class) + && !clazz.isAssignableFrom(Boolean.class) && !clazz.isAssignableFrom(String.class)) { + throw new SpikeException("BAD_REQUEST"); + } + } +} + + diff --git a/src/main/java/org/onap/aai/spike/service/EchoService.java b/src/main/java/org/onap/aai/spike/service/EchoService.java new file mode 100644 index 0000000..6c5b5ee --- /dev/null +++ b/src/main/java/org/onap/aai/spike/service/EchoService.java @@ -0,0 +1,85 @@ +/** + * ============LICENSE_START======================================================= + * Spike + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ +package org.onap.aai.spike.service; + +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.core.Response.Status; +import org.onap.aai.cl.api.LogFields; +import org.onap.aai.cl.api.LogLine; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.cl.mdc.MdcContext; +import org.onap.aai.spike.logging.SpikeMsgs; +import org.onap.aai.spike.util.SpikeConstants; +import org.slf4j.MDC; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestHeader; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping(value = "/services/spike/v1/echo-service") +public class EchoService { + + private static Logger logger = LoggerFactory.getInstance().getLogger(EchoService.class.getName()); + private static Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(EchoService.class.getName()); + + @GetMapping("/echo") + public ResponseEntity ping(@RequestHeader HttpHeaders headers, HttpServletRequest req) { + + String fromIp = req.getRemoteAddr(); + String fromAppId = ""; + String transId = null; + + if (headers.getFirst("X-FromAppId") != null) { + fromAppId = headers.getFirst("X-FromAppId"); + } + + if ((headers.getFirst("X-TransactionId") == null) || headers.getFirst("X-TransactionId").isEmpty()) { + transId = java.util.UUID.randomUUID().toString(); + } else { + transId = headers.getFirst("X-TransactionId"); + } + + MdcContext.initialize(transId, SpikeConstants.SPIKE_SERVICE_NAME, "", fromAppId, fromIp); + + // Generate error log + logger.info(SpikeMsgs.PROCESS_REST_REQUEST, req.getMethod(), req.getRequestURL().toString(), + req.getRemoteHost(), Status.OK.toString()); + + // Generate audit log. + auditLogger.info(SpikeMsgs.PROCESS_REST_REQUEST, + new LogFields().setField(LogLine.DefinedFields.RESPONSE_CODE, Status.OK.toString()) + .setField(LogLine.DefinedFields.RESPONSE_DESCRIPTION, Status.OK.toString()), + (req != null) ? req.getMethod() : "Unknown", (req != null) ? req.getRequestURL().toString() : "Unknown", + (req != null) ? req.getRemoteHost() : "Unknown", Status.OK.toString()); + MDC.clear(); + + return new ResponseEntity<>("Alive", HttpStatus.OK); + } +} diff --git a/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java new file mode 100644 index 0000000..c4e1746 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/service/SpikeEventProcessor.java @@ -0,0 +1,298 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.service; + +import java.util.ArrayList; +import java.util.TimerTask; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.PriorityBlockingQueue; +import javax.naming.OperationNotSupportedException; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.EventPublisher; +import org.onap.aai.event.api.MessageWithOffset; +import org.onap.aai.spike.event.envelope.EventEnvelope; +import org.onap.aai.spike.event.envelope.EventEnvelopeParser; +import org.onap.aai.spike.event.incoming.GizmoGraphEvent; +import org.onap.aai.spike.event.incoming.OffsetManager; +import org.onap.aai.spike.event.outgoing.SpikeEventComparator; +import org.onap.aai.spike.event.outgoing.SpikeEventExclusionStrategy; +import org.onap.aai.spike.event.outgoing.SpikeGraphEvent; +import org.onap.aai.spike.exception.SpikeException; +import org.onap.aai.spike.logging.SpikeMsgs; +import org.onap.aai.spike.util.SpikeConstants; +import org.onap.aai.spike.util.SpikeProperties; + +public class SpikeEventProcessor extends TimerTask { + + /** + * Client used for consuming events to the event bus. + */ + private EventConsumer consumer; + /** + * Client used for publishing events to the event bus. + */ + private EventPublisher publisher; + /** + * Internal queue where outgoing events will be buffered until they can be serviced by the event + * publisher worker threads. + */ + private BlockingQueue eventQueue; + + private Integer eventQueueCapacity = DEFAULT_EVENT_QUEUE_CAPACITY; + private Integer eventOffsetPeriod = DEFAULT_EVENT_OFFSET_COMMIT_PERIOD; + + private OffsetManager offsetManager; + private Long lastCommittedOffset = null; + private EventEnvelopeParser eventEnvelopeParser; + + /** + * Number of events that can be queued up for publishing before it is dropped + */ + private static final Integer DEFAULT_EVENT_QUEUE_CAPACITY = 10000; + private static final Integer DEFAULT_EVENT_OFFSET_COMMIT_PERIOD = 10000; + + private static Logger logger = LoggerFactory.getInstance().getLogger(SpikeEventProcessor.class.getName()); + private static Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(SpikeEventProcessor.class.getName()); + private static final Gson gson = + new GsonBuilder().setExclusionStrategies(new SpikeEventExclusionStrategy()).setPrettyPrinting().create(); + + public SpikeEventProcessor(EventConsumer consumer, EventPublisher publisher) { + this.consumer = consumer; + this.publisher = publisher; + + try { + eventQueueCapacity = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_CAPACITY)); + eventOffsetPeriod = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_OFFSET_CHECK_PERIOD)); + + } catch (Exception ex) { + } + + eventQueue = new PriorityBlockingQueue(eventQueueCapacity, new SpikeEventComparator()); + new Thread(new SpikeEventPublisher()).start(); + + // Instantiate the offset manager. This will run a background thread that + // periodically updates the value of the most recent offset value that can + // be safely committed with the event bus. + offsetManager = new OffsetManager(eventQueueCapacity, eventOffsetPeriod); + eventEnvelopeParser = new EventEnvelopeParser(); + } + + @Override + public void run() { + logger.info(SpikeMsgs.SPIKE_QUERY_EVENT_SYSTEM); + + if (consumer == null) { + logger.error(SpikeMsgs.SPIKE_SERVICE_STARTED_FAILURE, SpikeConstants.SPIKE_SERVICE_NAME); + } + + Iterable events = null; + try { + events = consumer.consumeWithOffsets(); + + } catch (OperationNotSupportedException e) { + // This means we are using DMaaP and can't use offsets + try { + Iterable tempEvents = consumer.consume(); + ArrayList messages = new ArrayList(); + for (String event : tempEvents) { + messages.add(new MessageWithOffset(0, event)); + } + events = messages; + } catch (Exception e1) { + logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e1.getMessage()); + return; + } + } catch (Exception e) { + logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage()); + return; + } + + if (events == null || !events.iterator().hasNext()) { + logger.info(SpikeMsgs.SPIKE_NO_EVENT_RECEIVED); + } + + for (MessageWithOffset event : events) { + try { + logger.debug(SpikeMsgs.SPIKE_EVENT_RECEIVED, event.getMessage()); + + GizmoGraphEvent modelEvent = eventEnvelopeParser.parseEvent(event.getMessage()); + auditLogger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED, + "of type: " + modelEvent.getObjectType() + " with key: " + modelEvent.getObjectKey() + + " , transaction-id: " + modelEvent.getTransactionId()); + logger.info(SpikeMsgs.SPIKE_EVENT_RECEIVED, "of type: " + modelEvent.getObjectType() + " with key: " + + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId()); + + String modelEventJson = gson.toJson(modelEvent); + + // Log the current event as 'being processed' with the offset manager so that we know that it's + // associated offset is not yet save to be committed as 'done'. + offsetManager.cacheEvent(modelEvent.getTransactionId(), event.getOffset()); + + while (eventQueue.size() >= eventQueueCapacity) { + // Wait until there's room in the queue + logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, + "Event could not be published to the event bus due to: Internal buffer capacity exceeded. Waiting 10 seconds."); + Thread.sleep(10000); + } + + eventQueue.offer(modelEvent.toSpikeGraphEvent()); + + logger.info(SpikeMsgs.SPIKE_EVENT_PROCESSED, "of type: " + modelEvent.getObjectType() + " with key: " + + modelEvent.getObjectKey() + " , transaction-id: " + modelEvent.getTransactionId()); + logger.debug(SpikeMsgs.SPIKE_EVENT_PROCESSED, modelEventJson); + + } catch (SpikeException | InterruptedException e) { + logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, + e.getMessage() + ". Incoming event payload:\n" + event.getMessage()); + } catch (Exception e) { + logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, + e.getMessage() + ". Incoming event payload:\n" + event.getMessage()); + } + } + + try { + + // Get the next 'safe' offset to be committed from the offset manager. + // We need to do this here istead of letting the offset manager just take care + // of it for us because the event consumer is not thread safe. If we try to + // commit the offsets from another thread, it gets unhappy... + Long nextOffset = offsetManager.getNextOffsetToCommit(); + + // Make sure we actually have a real value... + if (nextOffset != null) { + + // There is no point in continually committing the same offset value, so make sure + // that something has actually changed before we do anything... + if ((lastCommittedOffset == null) || (lastCommittedOffset != nextOffset)) { + + if (logger.isDebugEnabled()) { + logger.debug( + "Committing offset: " + nextOffset + " to the event bus for Champ raw event topic."); + } + + // OK, let's commit the latest value... + consumer.commitOffsets(nextOffset); + lastCommittedOffset = nextOffset; + } + } + } catch (OperationNotSupportedException e) { + // We must be working with a DMaap which doesn't support offset management. Swallow + // the exception + } catch (Exception e) { + logger.error(SpikeMsgs.SPIKE_EVENT_CONSUME_FAILURE, e.getMessage()); + } + } + + /** + * This class implements the threads which is responsible for buffering the events in memory and + * ordering them before publishing it to topic + *

+ * Each publish operation is performed synchronously, so that the thread will only move on to the + * next available event once it has actually published the current event to the bus. + */ + private class SpikeEventPublisher implements Runnable { + + /** + * Partition key to use when publishing events to the event stream. We WANT all events to go to a + * single partition, so we are just using a hard-coded key for every event. + */ + private static final String EVENTS_PARTITION_KEY = "SpikeEventKey"; + private static final int DEFAULT_EVENT_QUEUE_DELAY = 10000; + + Integer eventQueueDelay = DEFAULT_EVENT_QUEUE_DELAY; + + public SpikeEventPublisher() { + try { + eventQueueDelay = Integer.parseInt(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_QUEUE_DELAY)); + } catch (Exception ex) { + } + } + + @Override + public void run() { + while (true) { + + SpikeGraphEvent nextEvent; + SpikeGraphEvent event = null; + try { + + // Get the next event to be published from the queue if it is old enough or we have too + // many items in the queue + if ((nextEvent = eventQueue.peek()) != null + && (System.currentTimeMillis() - nextEvent.getSpikeTimestamp() > eventQueueDelay + || eventQueue.size() > eventQueueCapacity)) { + event = eventQueue.take(); + } else { + continue; + } + + } catch (InterruptedException e) { + + // Restore the interrupted status. + Thread.currentThread().interrupt(); + } + + // Try publishing the event to the event bus. This call will block + // until the event is published or times out. + try { + String eventJson = gson.toJson(new EventEnvelope(event)); + int sentMessageCount = publisher.sendSync(EVENTS_PARTITION_KEY, eventJson); + if (sentMessageCount > 0) { + logger.info(SpikeMsgs.SPIKE_EVENT_PUBLISHED, "of type: " + event.getObjectType() + " with key: " + + event.getObjectKey() + " , transaction-id: " + event.getTransactionId()); + logger.debug(SpikeMsgs.SPIKE_EVENT_PUBLISHED, eventJson); + } else { + logger.warn(SpikeMsgs.SPIKE_PUBLISH_FAILED, "of type: " + event.getObjectType() + " with key: " + + event.getObjectKey() + " , transaction-id: " + event.getTransactionId()); + logger.debug(SpikeMsgs.SPIKE_PUBLISH_FAILED, eventJson); + } + + + // Inform the offset manager that this event has been published. It's offset + // can now, potentially, be safely committed to the event bus so that on a + // restart we won't reprocess it. + offsetManager.markAsPublished(event.getTransactionId()); + + } catch (ExecutionException e) { + + // Publish timed out, queue it up to retry again. Since this message was pulled from the + // top of the queue, it will go back to the top. + logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, "Retrying in 60 seconds. " + e.getMessage()); + eventQueue.offer(event); + + try { + Thread.sleep(60000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } catch (Exception e) { + logger.error(SpikeMsgs.SPIKE_EVENT_PUBLISH_FAILURE, e.getMessage()); + } + } + } + } + +} diff --git a/src/main/java/org/onap/aai/spike/service/SpikeService.java b/src/main/java/org/onap/aai/spike/service/SpikeService.java new file mode 100644 index 0000000..3aa6dfe --- /dev/null +++ b/src/main/java/org/onap/aai/spike/service/SpikeService.java @@ -0,0 +1,75 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.service; + +import java.util.Timer; +import javax.annotation.PreDestroy; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.EventPublisher; +import org.onap.aai.spike.logging.SpikeMsgs; +import org.onap.aai.spike.schema.EdgeRulesLoader; +import org.onap.aai.spike.schema.OXMModelLoader; +import org.onap.aai.spike.util.SpikeConstants; +import org.onap.aai.spike.util.SpikeProperties; + +public class SpikeService { + + private EventConsumer consumer; + private EventPublisher publisher; + private static Logger logger = LoggerFactory.getInstance().getLogger(SpikeService.class.getName()); + private Timer timer; + + public SpikeService(EventConsumer consumer, EventPublisher publisher) { + this.consumer = consumer; + this.publisher = publisher; + } + + + public void startup() throws Exception { + + // Load models + EdgeRulesLoader.loadModels(); + OXMModelLoader.loadModels(); + + Long interval = 30000L; + try { + interval = Long.parseLong(SpikeProperties.get(SpikeConstants.SPIKE_EVENT_POLL_INTERVAL)); + } catch (Exception ex) { + } + + SpikeEventProcessor processEvent = new SpikeEventProcessor(consumer, publisher); + timer = new Timer("spike-consumer"); + timer.schedule(processEvent, interval, interval); + + logger.info(SpikeMsgs.SPIKE_SERVICE_STARTED_SUCCESSFULLY, consumer.getClass().getName()); + } + + @PreDestroy + protected void preShutdown() { + logger.info(SpikeMsgs.SPIKE_SERVICE_STARTED_SUCCESSFULLY, consumer.getClass().getName()); + timer.cancel(); + + + } + +} diff --git a/src/main/java/org/onap/aai/spike/util/FileWatcher.java b/src/main/java/org/onap/aai/spike/util/FileWatcher.java new file mode 100644 index 0000000..6bbb228 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/util/FileWatcher.java @@ -0,0 +1,45 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.util; + +import java.io.File; +import java.util.TimerTask; + +public abstract class FileWatcher extends TimerTask { + private long timeStamp; + private File file; + + public FileWatcher(File file) { + this.file = file; + this.timeStamp = file.lastModified(); + } + + public final void run() { + long timeStamp = file.lastModified(); + + if ((timeStamp - this.timeStamp) > 500) { + this.timeStamp = timeStamp; + onChange(file); + } + } + + protected abstract void onChange(File file); +} diff --git a/src/main/java/org/onap/aai/spike/util/SchemaIngestPropertiesReader.java b/src/main/java/org/onap/aai/spike/util/SchemaIngestPropertiesReader.java new file mode 100644 index 0000000..48e26c8 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/util/SchemaIngestPropertiesReader.java @@ -0,0 +1,116 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.util; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URISyntaxException; +import java.nio.file.Files; +import java.nio.file.NoSuchFileException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Properties; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.aai.spike.exception.SpikeException; +import org.onap.aai.spike.logging.SpikeMsgs; + +public class SchemaIngestPropertiesReader { + + private static final String SCHEMA_INGEST_PROPERTIES_LOCATION = + System.getProperty("CONFIG_HOME") + "/schemaIngest.properties"; + + private static org.onap.aai.cl.api.Logger logger = + LoggerFactory.getInstance().getLogger(SchemaIngestPropertiesReader.class.getName()); + + /** + * Gets the Node directory location to ingest OXMs + * + * @return path to OXMs + * @throws SpikeException + */ + public String getNodeDir() throws SpikeException { + Properties prop = findSchemaIngestPropertyFile(); + return prop.getProperty("nodeDir"); + } + + /** + * Gets the Edge directory location to ingest OXM + * + * @return path to OXMs + * @throws SpikeException + */ + public String getEdgeDir() throws SpikeException { + Properties prop = findSchemaIngestPropertyFile(); + return prop.getProperty("edgeDir"); + } + + /** + * Gets the location of the Edge Properties + * + * @return + * @throws SpikeException + */ + public String getEdgePropsDir() throws SpikeException { + + Properties prop = findSchemaIngestPropertyFile(); + return prop.getProperty("edgePropsDir"); + } + + + private Properties findSchemaIngestPropertyFile() throws SpikeException { + Properties prop = new Properties(); + try { + prop = loadFromFile(SCHEMA_INGEST_PROPERTIES_LOCATION); + } catch (NoSuchFileException e) { + // if file not found, try via classpath + try { + prop = loadFromClasspath("schemaIngest.properties"); + } catch (URISyntaxException | IOException e1) { + logger.error(SpikeMsgs.SPIKE_SCHEMA_INGEST_LOAD_ERROR, e1.getMessage()); + throw new SpikeException("Failed to load schemaIngest.properties", e1); + } + } catch (IOException e) { + logger.error(SpikeMsgs.SPIKE_SCHEMA_INGEST_LOAD_ERROR, e.getMessage()); + throw new SpikeException("Failed to load schemaIngest.properties", e); + } + return prop; + } + + private Properties loadFromFile(String filename) throws IOException { + Path configLocation = Paths.get(filename); + try (InputStream stream = Files.newInputStream(configLocation)) { + return loadProperties(stream); + } + } + + private Properties loadFromClasspath(String resourceName) throws URISyntaxException, IOException { + Path path = Paths.get(ClassLoader.getSystemResource(resourceName).toURI()); + try (InputStream stream = Files.newInputStream(path)) { + return loadProperties(stream); + } + } + + private Properties loadProperties(InputStream stream) throws IOException { + Properties config = new Properties(); + config.load(stream); + return config; + } +} diff --git a/src/main/java/org/onap/aai/spike/util/SpikeConstants.java b/src/main/java/org/onap/aai/spike/util/SpikeConstants.java new file mode 100644 index 0000000..95087e8 --- /dev/null +++ b/src/main/java/org/onap/aai/spike/util/SpikeConstants.java @@ -0,0 +1,43 @@ +/** + * ============LICENSE_START======================================================= + * Spike + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP and OpenECOMP are trademarks + * and service marks of AT&T Intellectual Property. + */ +package org.onap.aai.spike.util; + +public class SpikeConstants { + // Logging related + public static final String SPIKE_SERVICE_NAME = "spike"; + + public static final String SPIKE_FILESEP = + (System.getProperty("file.separator") == null) ? "/" : System.getProperty("file.separator"); + + public static final String SPIKE_SPECIFIC_CONFIG = System.getProperty("CONFIG_HOME") + SPIKE_FILESEP; + + public static final String SPIKE_HOME_MODEL = SPIKE_SPECIFIC_CONFIG + "model" + SPIKE_FILESEP; + public static final String SPIKE_EVENT_POLL_INTERVAL = "spike.event.poll.interval"; + public static final String SPIKE_EVENT_QUEUE_CAPACITY = "spike.event.queue.capacity"; + public static final String SPIKE_EVENT_QUEUE_DELAY = "spike.event.queue.delay"; + public static final String SPIKE_EVENT_OFFSET_CHECK_PERIOD = "spike.event.offset.period"; + public static final String SPIKE_PROPS_RESERVED = "spike.props.reserved"; + public static final String SPIKE_CONFIG_FILE = SPIKE_SPECIFIC_CONFIG + "spike.properties"; +} diff --git a/src/main/java/org/onap/aai/spike/util/SpikeProperties.java b/src/main/java/org/onap/aai/spike/util/SpikeProperties.java new file mode 100644 index 0000000..ac95efb --- /dev/null +++ b/src/main/java/org/onap/aai/spike/util/SpikeProperties.java @@ -0,0 +1,52 @@ +/** + * ============LICENSE_START======================================================= + * org.onap.aai + * ================================================================================ + * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved. + * Copyright © 2017-2018 Amdocs + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.aai.spike.util; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.Properties; + +public class SpikeProperties { + + private static Properties properties; + + static { + properties = new Properties(); + File file = new File(SpikeConstants.SPIKE_CONFIG_FILE); + try { + properties.load(new FileInputStream(file)); + } catch (FileNotFoundException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public static String get(String key) { + return properties.getProperty(key); + } + + public static String get(String key, String defaultValue) { + return properties.getProperty(key, defaultValue); + } +} -- cgit 1.2.3-korg