summaryrefslogtreecommitdiffstats
path: root/src/main/java/org
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org')
-rw-r--r--src/main/java/org/onap/crud/event/envelope/GraphEventEnvelope.java130
-rw-r--r--src/main/java/org/onap/crud/event/envelope/GraphEventHeader.java197
-rw-r--r--src/main/java/org/onap/crud/event/response/GraphEventResponseHandler.java142
-rw-r--r--src/main/java/org/onap/crud/event/response/GraphEventResponseMessage.java56
-rw-r--r--src/main/java/org/onap/crud/exception/CrudException.java46
-rw-r--r--src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java654
-rw-r--r--src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java6
7 files changed, 827 insertions, 404 deletions
diff --git a/src/main/java/org/onap/crud/event/envelope/GraphEventEnvelope.java b/src/main/java/org/onap/crud/event/envelope/GraphEventEnvelope.java
new file mode 100644
index 0000000..13370a2
--- /dev/null
+++ b/src/main/java/org/onap/crud/event/envelope/GraphEventEnvelope.java
@@ -0,0 +1,130 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Gizmo
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.crud.event.envelope;
+
+import javax.ws.rs.core.Response;
+import org.onap.crud.event.GraphEvent;
+import org.onap.crud.event.GraphEventEdge;
+import org.onap.crud.event.GraphEventVertex;
+import org.onap.crud.exception.CrudException;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+
+/**
+ * Defines the top-level structure of the Gizmo events. Primarily serves as a wrapper for the graph details and includes
+ * an event header to describe and identify the event.
+ */
+public class GraphEventEnvelope {
+
+ private GraphEventHeader header;
+ private GraphEvent body;
+ private JsonElement policyViolations;
+
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ /**
+ * Construct the event.
+ *
+ * @param header the event header to describe the event
+ * @param body the body of the event with graph details
+ */
+ public GraphEventEnvelope(GraphEventHeader header, GraphEvent body) {
+ this.header = header;
+ this.body = body;
+ }
+
+ /**
+ * Construct the envelope header from the provided GraphEvent.
+ *
+ * @param event the graph event for a vertex or edge operation
+ */
+ public GraphEventEnvelope(GraphEvent event) {
+ this.header = new GraphEventHeader.Builder().requestId(event.getTransactionId())
+ .validationEntityType(getType(event)).build();
+ this.body = event;
+ }
+
+ public GraphEvent getBody() {
+ return body;
+ }
+
+ public GraphEventHeader getHeader() {
+ return header;
+ }
+
+ public JsonElement getPolicyViolations() {
+ return policyViolations != null ? policyViolations : new JsonArray();
+ }
+
+ /**
+ * Serializes this object into a JSON string representation.
+ *
+ * @return a JSON format string representation of this object.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ /**
+ * Deserializes a JSON string into a GraphEventEnvelope object.
+ *
+ * @param json the JSON string
+ * @return a GraphEventEnvelope object
+ * @throws CrudException
+ */
+ public static GraphEventEnvelope fromJson(String json) throws CrudException {
+ try {
+ if (json == null || json.isEmpty()) {
+ throw new CrudException("Empty or null JSON string.", Response.Status.BAD_REQUEST);
+ }
+ return gson.fromJson(json, GraphEventEnvelope.class);
+ } catch (Exception ex) {
+ throw new CrudException("Unable to parse JSON string: " + json, Response.Status.BAD_REQUEST);
+ }
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return this.toJson();
+ }
+
+ private String getType(GraphEvent event) {
+ GraphEventVertex vertex = event.getVertex();
+ GraphEventEdge edge = event.getEdge();
+ if (vertex != null) {
+ return vertex.getType();
+ } else if (edge != null) {
+ return edge.getType();
+ }
+ return null;
+ }
+
+}
diff --git a/src/main/java/org/onap/crud/event/envelope/GraphEventHeader.java b/src/main/java/org/onap/crud/event/envelope/GraphEventHeader.java
new file mode 100644
index 0000000..f70127d
--- /dev/null
+++ b/src/main/java/org/onap/crud/event/envelope/GraphEventHeader.java
@@ -0,0 +1,197 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Gizmo
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.crud.event.envelope;
+
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.annotations.SerializedName;
+
+public class GraphEventHeader {
+
+ @SerializedName("request-id")
+ private String requestId;
+
+ private String timestamp;
+
+ @SerializedName("source-name")
+ private String sourceName;
+
+ @SerializedName("event-type")
+ private String eventType;
+
+ @SerializedName("validation-entity-type")
+ private String validationEntityType;
+
+ @SerializedName("validation-top-entity-type")
+ private String validationTopEntityType;
+
+ @SerializedName("entity-link")
+ private String entityLink;
+
+ private static final String MICROSERVICE_NAME = "GIZMO";
+ private static final String EVENT_TYPE_PENDING_UPDATE = "pending-update";
+ private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create();
+
+ public static class Builder {
+
+ private String requestId;
+ private String validationEntityType;
+ private String validationTopEntityType;
+ private String entityLink;
+
+ public Builder requestId(String val) {
+ requestId = val;
+ return this;
+ }
+
+ public Builder validationEntityType(String val) {
+ validationEntityType = val;
+ return this;
+ }
+
+ public Builder validationTopEntityType(String val) {
+ validationTopEntityType = val;
+ return this;
+ }
+
+ public Builder entityLink(String val) {
+ entityLink = val;
+ return this;
+ }
+
+ public GraphEventHeader build() {
+ return new GraphEventHeader(this);
+ }
+ }
+
+ private GraphEventHeader(Builder builder) {
+ requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString();
+ timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now());
+ sourceName = MICROSERVICE_NAME;
+ eventType = EVENT_TYPE_PENDING_UPDATE;
+
+ validationEntityType = builder.validationEntityType;
+ validationTopEntityType = builder.validationTopEntityType;
+ entityLink = builder.entityLink;
+ }
+
+ /**
+ * Serializes this object into a JSON string representation.
+ *
+ * @return a JSON format string representation of this object.
+ */
+ public String toJson() {
+ return gson.toJson(this);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // GETTERS
+ ///////////////////////////////////////////////////////////////////////////
+
+ public String getRequestId() {
+ return requestId;
+ }
+
+ public String getTimestamp() {
+ return timestamp;
+ }
+
+ public String getSourceName() {
+ return sourceName;
+ }
+
+ public String getEventType() {
+ return eventType;
+ }
+
+ public String getValidationEntityType() {
+ return validationEntityType;
+ }
+
+ public String getValidationTopEntityType() {
+ return validationTopEntityType;
+ }
+
+ public String getEntityLink() {
+ return entityLink;
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // OVERRIDES
+ ///////////////////////////////////////////////////////////////////////////
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hash(this.requestId, this.timestamp, this.sourceName, this.eventType, this.validationEntityType,
+ this.validationTopEntityType, this.entityLink);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof GraphEventHeader)) {
+ return false;
+ } else if (obj == this) {
+ return true;
+ }
+ GraphEventHeader rhs = (GraphEventHeader) obj;
+ // @formatter:off
+ return new EqualsBuilder()
+ .append(requestId, rhs.requestId)
+ .append(timestamp, rhs.timestamp)
+ .append(sourceName, rhs.sourceName)
+ .append(eventType, rhs.sourceName)
+ .append(validationEntityType, rhs.validationEntityType)
+ .append(validationTopEntityType, rhs.validationTopEntityType)
+ .append(entityLink, rhs.entityLink)
+ .isEquals();
+ // @formatter:on
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return this.toJson();
+ }
+
+}
diff --git a/src/main/java/org/onap/crud/event/response/GraphEventResponseHandler.java b/src/main/java/org/onap/crud/event/response/GraphEventResponseHandler.java
new file mode 100644
index 0000000..1fa056e
--- /dev/null
+++ b/src/main/java/org/onap/crud/event/response/GraphEventResponseHandler.java
@@ -0,0 +1,142 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Gizmo
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.crud.event.response;
+
+import javax.ws.rs.core.Response.Status;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.crud.event.GraphEvent;
+import org.onap.crud.event.GraphEvent.GraphEventResult;
+import org.onap.crud.event.envelope.GraphEventEnvelope;
+import org.onap.crud.exception.CrudException;
+import org.onap.crud.logging.CrudServiceMsgs;
+import org.onap.crud.parser.CrudResponseBuilder;
+import org.onap.schema.OxmModelValidator;
+import org.onap.schema.RelationshipSchemaValidator;
+
+/**
+ * Reads event responses, logs and generates exceptions if errors are found.
+ *
+ */
+public class GraphEventResponseHandler {
+
+ private static Logger logger = LoggerFactory.getInstance().getLogger(GraphEventResponseHandler.class.getName());
+
+ public String handleVertexResponse(String version, GraphEvent event, GraphEventEnvelope response)
+ throws CrudException {
+ handlePolicyViolations(event, response);
+ logResponse(event, response.getBody());
+
+ if (isErrorResponse(response.getBody())) {
+ throwOperationException(response);
+ }
+
+ return CrudResponseBuilder.buildUpsertVertexResponse(
+ OxmModelValidator.validateOutgoingPayload(version, response.getBody().getVertex().toVertex()), version);
+ }
+
+ public String handleEdgeResponse(String version, GraphEvent event, GraphEventEnvelope response)
+ throws CrudException {
+ handlePolicyViolations(event, response);
+ logResponse(event, response.getBody());
+
+ if (isErrorResponse(response.getBody())) {
+ throwOperationException(response);
+ }
+
+ return CrudResponseBuilder.buildUpsertEdgeResponse(
+ RelationshipSchemaValidator.validateOutgoingPayload(version, response.getBody().getEdge().toEdge()),
+ version);
+ }
+
+ public String handleDeletionResponse(GraphEvent event, GraphEventEnvelope response) throws CrudException {
+ handlePolicyViolations(event, response);
+ logResponse(event, response.getBody());
+
+ if (isErrorResponse(response.getBody())) {
+ throwOperationException(response);
+ }
+
+ return "";
+ }
+
+ public void handleBulkEventResponse(GraphEvent event, GraphEventEnvelope response) throws CrudException {
+ handlePolicyViolations(event, response);
+ logResponse(event, response.getBody());
+
+ if (isErrorResponse(response.getBody())) {
+ throwOperationException(response);
+ }
+ }
+
+ public boolean hasPolicyViolations(GraphEventEnvelope event) {
+ return event.getPolicyViolations() != null && event.getPolicyViolations().isJsonArray()
+ && event.getPolicyViolations().getAsJsonArray().size() != 0;
+ }
+
+ private void handlePolicyViolations(GraphEvent event, GraphEventEnvelope response) throws CrudException {
+ if (hasPolicyViolations(response)) {
+ logPolicyViolation(event, response);
+ throw new CrudException(GraphEventResponseMessage.POLICY_VIOLATION_EXCEPTION_MESSAGE.getMessage(
+ response.getBody().getTransactionId(), response.getPolicyViolations()), Status.BAD_REQUEST);
+ }
+ }
+
+ private void logResponse(GraphEvent event, GraphEvent response) {
+ String message = GraphEventResponseMessage.BASE_OPERATION_LOG_MESSAGE.getMessage(response.getObjectType(),
+ response.getObjectKey(), response.getTransactionId(), event.getOperation().toString(),
+ response.getResult());
+ if (isErrorResponse(response)) {
+ message = GraphEventResponseMessage.OPERATION_ERROR_LOG_MESSAGE.getMessage(message,
+ response.getErrorMessage());
+ }
+
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, message);
+ }
+
+ private void logPolicyViolation(GraphEvent event, GraphEventEnvelope response) {
+ //@formatter:off
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ GraphEventResponseMessage.POLICY_VIOLATION_LOG_MESSAGE.getMessage(
+ response.getBody().getTransactionId(),
+ response.getHeader().getSourceName(),
+ response.getHeader().getEventType(),
+ response.getBody().getObjectKey(),
+ response.getBody().getObjectType(),
+ event.getOperation().toString(),
+ response.getPolicyViolations().toString()));
+ //@formatter:on
+ }
+
+ private void throwOperationException(GraphEventEnvelope response) throws CrudException {
+ throw new CrudException(
+ GraphEventResponseMessage.OPERATION_ERROR_EXCEPTION_MESSAGE
+ .getMessage(response.getBody().getTransactionId(), response.getBody().getErrorMessage()),
+ response.getBody().getHttpErrorStatus());
+ }
+
+ private boolean isErrorResponse(GraphEvent response) {
+ return GraphEventResult.FAILURE.equals(response.getResult());
+ }
+}
diff --git a/src/main/java/org/onap/crud/event/response/GraphEventResponseMessage.java b/src/main/java/org/onap/crud/event/response/GraphEventResponseMessage.java
new file mode 100644
index 0000000..5e4c6bd
--- /dev/null
+++ b/src/main/java/org/onap/crud/event/response/GraphEventResponseMessage.java
@@ -0,0 +1,56 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Gizmo
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.crud.event.response;
+
+import java.text.MessageFormat;
+
+/**
+ * Graph event handling messages, including logging and exceptions.
+ */
+public enum GraphEventResponseMessage {
+
+ //@formatter:off
+ BASE_OPERATION_LOG_MESSAGE("Event response received: {0} with key: {1}, transaction-id: {2}, operation: {3}, result: {4}"),
+ OPERATION_ERROR_LOG_MESSAGE("{0}, error: {1}"),
+ OPERATION_ERROR_EXCEPTION_MESSAGE("Operation Failed with transaction-id: {0}. Error: {1}"),
+ POLICY_VIOLATION_LOG_MESSAGE("Event response received: transaction-id: {0}, event source: {1}, event type: {2}, object key: {3}, object type: {4}, operation: {5}, result: policy violations detected in request, error: {6}"),
+ POLICY_VIOLATION_EXCEPTION_MESSAGE("Operation Failed with transaction-id {0}. Error: policy violations detected in request, {1}");
+ //@formatter:on
+
+ private String message;
+
+ private GraphEventResponseMessage(String message) {
+ this.message = message;
+ }
+
+ /**
+ * @param args to be formatted
+ * @return the formatted error message
+ */
+ public String getMessage(Object... args) {
+ MessageFormat formatter = new MessageFormat("");
+ formatter.applyPattern(this.message);
+ return formatter.format(args);
+ }
+}
diff --git a/src/main/java/org/onap/crud/exception/CrudException.java b/src/main/java/org/onap/crud/exception/CrudException.java
index 919af96..a4fb3d7 100644
--- a/src/main/java/org/onap/crud/exception/CrudException.java
+++ b/src/main/java/org/onap/crud/exception/CrudException.java
@@ -24,36 +24,34 @@ import javax.ws.rs.core.Response.Status;
public class CrudException extends Exception {
- private static final long serialVersionUID = 8162385108397238865L;
+ private static final long serialVersionUID = 8162385108397238865L;
- private Status httpStatus;
+ private Status httpStatus;
- public CrudException() {
- }
+ public CrudException() {}
- public CrudException(String message, Status httpStatus) {
- super(message);
- this.setHttpStatus(httpStatus);
- }
+ public CrudException(String message, Status httpStatus) {
+ super(message);
+ this.setHttpStatus(httpStatus);
+ }
- public CrudException(Throwable cause) {
- super(cause);
- }
+ public CrudException(Throwable cause) {
+ super(cause);
+ }
- public CrudException(String message, Throwable cause) {
- super(message, cause);
- }
+ public CrudException(String message, Throwable cause) {
+ super(message, cause);
+ }
- public CrudException(String message, Throwable cause, boolean enableSuppression,
- boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
+ public CrudException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
- public Status getHttpStatus() {
- return httpStatus;
- }
+ public Status getHttpStatus() {
+ return httpStatus;
+ }
- public void setHttpStatus(Status httpStatus) {
- this.httpStatus = httpStatus;
- }
+ public void setHttpStatus(Status httpStatus) {
+ this.httpStatus = httpStatus;
+ }
}
diff --git a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java
index 592d4b3..dc30a4e 100644
--- a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java
+++ b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java
@@ -20,434 +20,332 @@
*/
package org.onap.crud.service;
-import org.onap.aai.event.api.EventConsumer;
-import org.onap.aai.event.api.EventPublisher;
-
+import java.text.SimpleDateFormat;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.PreDestroy;
+import javax.ws.rs.core.Response.Status;
import org.onap.aai.cl.api.LogFields;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.cl.mdc.MdcOverride;
+import org.onap.aai.event.api.EventConsumer;
+import org.onap.aai.event.api.EventPublisher;
import org.onap.crud.dao.GraphDao;
import org.onap.crud.entity.Edge;
import org.onap.crud.entity.Vertex;
import org.onap.crud.event.GraphEvent;
import org.onap.crud.event.GraphEvent.GraphEventOperation;
-import org.onap.crud.event.GraphEvent.GraphEventResult;
import org.onap.crud.event.GraphEventEdge;
import org.onap.crud.event.GraphEventVertex;
+import org.onap.crud.event.envelope.GraphEventEnvelope;
+import org.onap.crud.event.response.GraphEventResponseHandler;
import org.onap.crud.exception.CrudException;
import org.onap.crud.logging.CrudServiceMsgs;
-import org.onap.crud.parser.CrudResponseBuilder;
import org.onap.crud.util.CrudProperties;
import org.onap.crud.util.CrudServiceConstants;
import org.onap.schema.OxmModelValidator;
import org.onap.schema.RelationshipSchemaValidator;
-import java.text.SimpleDateFormat;
-import java.util.HashMap;
-import java.util.Timer;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import javax.annotation.PreDestroy;
-import javax.ws.rs.core.Response.Status;
-
public class CrudAsyncGraphDataService extends AbstractGraphDataService {
- private static Integer requestTimeOut;
-
- private EventPublisher asyncRequestPublisher;
-
- private Timer timer;
-
- public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
- private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
-
- private static Logger logger = LoggerFactory.getInstance()
- .getLogger(CrudAsyncGraphDataService.class.getName());
- private static Logger metricsLogger = LoggerFactory.getInstance()
- .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
- private static LogFields OK_FIELDS = new LogFields();
-
- static {
- OK_FIELDS.setField(Status.OK, Status.OK.toString());
- }
-
- public static Integer getRequestTimeOut() {
- return requestTimeOut;
- }
-
- public CrudAsyncGraphDataService(GraphDao dao,
- EventPublisher asyncRequestPublisher,
- EventConsumer asyncResponseConsumer) throws CrudException {
- this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
- }
-
- public CrudAsyncGraphDataService(GraphDao dao,
- GraphDao daoForGet,
- EventPublisher asyncRequestPublisher,
- EventConsumer asyncResponseConsumer) throws CrudException {
-
- super();
- this.dao = dao;
- this.daoForGet = daoForGet;
-
- requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
- try {
- requestTimeOut
- = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
- } catch (NumberFormatException ex) {
- // Leave it as the default
+ private static Integer requestTimeOut;
+
+ private EventPublisher asyncRequestPublisher;
+
+ private Timer timer;
+
+ public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
+ private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
+
+ private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
+ private static Logger metricsLogger =
+ LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
+ private static LogFields okFields = new LogFields();
+
+ static {
+ okFields.setField(Status.OK, Status.OK.toString());
}
- Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
- try {
- responsePollInterval = Integer
- .parseInt(CrudProperties
- .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
- } catch (Exception ex) {
- logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
- + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
- + " error: " + ex.getMessage());
+ private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
+
+ public static Integer getRequestTimeOut() {
+ return requestTimeOut;
}
- // Start the Response Consumer timer
- CrudAsyncResponseConsumer crudAsyncResponseConsumer
- = new CrudAsyncResponseConsumer(asyncResponseConsumer);
- timer = new Timer("crudAsyncResponseConsumer-1");
- timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
+ public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException {
+ this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
+ }
- this.asyncRequestPublisher = asyncRequestPublisher;
-
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
- }
+ public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException {
- public class CollectGraphResponse implements Callable<GraphEvent> {
- private volatile GraphEvent graphEvent;
- private volatile CountDownLatch latch = new CountDownLatch(1);
+ super();
+ this.dao = dao;
+ this.daoForGet = daoForGet;
- @Override
- public GraphEvent call() throws TimeoutException {
- try {
- // Wait until graphEvent is available
- latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- latch.countDown();
- if (this.graphEvent != null) {
- return this.graphEvent;
- } else {
- throw new TimeoutException();
+ requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
+ try {
+ requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
+ } catch (NumberFormatException ex) {
+ // Leave it as the default
+ }
+
+ Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
+ try {
+ responsePollInterval =
+ Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
+ } catch (Exception ex) {
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
+ + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
}
- }
- return this.graphEvent;
+
+ // Start the Response Consumer timer
+ CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(asyncResponseConsumer);
+ timer = new Timer("crudAsyncResponseConsumer-1");
+ timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
+
+ this.asyncRequestPublisher = asyncRequestPublisher;
+
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
}
- public void populateGraphEvent(GraphEvent event) {
- this.graphEvent = event;
- latch.countDown();
+ public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
+ private volatile GraphEventEnvelope graphEventEnvelope;
+ private volatile CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public GraphEventEnvelope call() throws TimeoutException {
+ try {
+ // Wait until graphEvent is available
+ latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ latch.countDown();
+ if (this.graphEventEnvelope != null) {
+ return this.graphEventEnvelope;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+ return this.graphEventEnvelope;
+ }
+
+ public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
+ this.graphEventEnvelope = eventEnvelope;
+ latch.countDown();
+ }
}
- }
- private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
+ private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
- long startTimeInMs = System.currentTimeMillis();
- SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
- MdcOverride override = new MdcOverride();
- override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
+ long startTimeInMs = System.currentTimeMillis();
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+ MdcOverride override = new MdcOverride();
+ override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
- // publish to request queue
- try {
- asyncRequestPublisher.sendSync(event.toJson());
- } catch (Exception e) {
- throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
- }
-
- logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
+ String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ // publish to request queue
+ try {
+ asyncRequestPublisher.sendSync(eventEnvelopeJson);
+ } catch (Exception e) {
+ throw new CrudException(
+ "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(),
+ Status.INTERNAL_SERVER_ERROR);
+ }
+
+ logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
+
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
"Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
- + " , transaction-id: " + event.getTransactionId() + " , operation: "
- + event.getOperation().toString());
-
- ExecutorService executor = Executors
- .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
- CollectGraphResponse collector = new CollectGraphResponse();
- CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
- GraphEvent response;
- Future<GraphEvent> future = executor.submit(collector);
- try {
- response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
-
- } catch (InterruptedException | ExecutionException | TimeoutException e) {
- CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
- logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
- "Request timed out for transactionId: " + event.getTransactionId());
- future.cancel(true);
- throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
- + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
- } finally {
- //Kill the thread as the work is completed
- executor.shutdownNow();
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString());
+
+ ExecutorService executor =
+ Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
+ CollectGraphResponse collector = new CollectGraphResponse();
+ CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
+ GraphEventEnvelope response;
+ Future<GraphEventEnvelope> future = executor.submit(collector);
+ try {
+ response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
+ "Request timed out for transactionId: " + event.getTransactionId());
+ future.cancel(true);
+ throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
+ } finally {
+ // Kill the thread as the work is completed
+ executor.shutdownNow();
+ }
+ metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
+ "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
+ + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
+ + " ms");
+ return response;
}
- metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
- "Total elapsed time for operation: " + event.getOperation().toString()
- + " , transactionId: " + event.getTransactionId() + " is "
- + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
- return response;
- }
-
- public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
- // Validate the incoming payload
- Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
- type, payload.getProperties());
- // Create graph request event
- GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
- .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- return CrudResponseBuilder.buildUpsertVertexResponse(
- OxmModelValidator.validateOutgoingPayload(version,
- response.getVertex().toVertex()), version);
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+
+ @Override
+ public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
+ // Validate the incoming payload
+ Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
+ // Create graph request event
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleVertexResponse(version, event, response);
}
- }
-
- public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
- Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
- // Create graph request event
- GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
- .edge(GraphEventEdge.fromEdge(edge, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- return CrudResponseBuilder.buildUpsertEdgeResponse(
- RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
- version);
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
+ Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
+ // Create graph request event
+ GraphEvent event =
+ GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleEdgeResponse(version, event, response);
}
- }
-
- public String updateVertex(String version, String id, String type, VertexPayload payload)
- throws CrudException {
- Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
- type, payload.getProperties());
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- return CrudResponseBuilder.buildUpsertVertexResponse(
- OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
- version);
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+
+ @Override
+ public String updateVertex(String version, String id, String type, VertexPayload payload) throws CrudException {
+ Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleVertexResponse(version, event, response);
}
- }
-
- public String patchVertex(String version, String id, String type, VertexPayload payload)
- throws CrudException {
- Vertex existingVertex
- = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap<String, String>());
- Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
- type, payload.getProperties(),
- existingVertex);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- return CrudResponseBuilder.buildUpsertVertexResponse(
- OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
- version);
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ public String patchVertex(String version, String id, String type, VertexPayload payload) throws CrudException {
+ Vertex existingVertex = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
+ new HashMap<String, String>());
+ Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
+ payload.getProperties(), existingVertex);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleVertexResponse(version, event, response);
}
- }
-
- public String deleteVertex(String version, String id, String type) throws CrudException {
- type = OxmModelValidator.resolveCollectionType(version, type);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
- .vertex(new GraphEventVertex(id, version, type, null)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- return "";
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ public String deleteVertex(String version, String id, String type) throws CrudException {
+ type = OxmModelValidator.resolveCollectionType(version, type);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .vertex(new GraphEventVertex(id, version, type, null)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleDeletionResponse(event, response);
}
- }
-
- public String deleteEdge(String version, String id, String type) throws CrudException {
- RelationshipSchemaValidator.validateType(version, type);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
- .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- return "";
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ public String deleteEdge(String version, String id, String type) throws CrudException {
+ RelationshipSchemaValidator.validateType(version, type);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleDeletionResponse(event, response);
}
- }
-
- public String updateEdge(String version, String id, String type, EdgePayload payload)
- throws CrudException {
- Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
- Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
- payload);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- return CrudResponseBuilder.buildUpsertEdgeResponse(
- RelationshipSchemaValidator.validateOutgoingPayload(version,
- response.getEdge().toEdge()), version);
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ public String updateEdge(String version, String id, String type, EdgePayload payload) throws CrudException {
+ Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
+ Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleEdgeResponse(version, event, response);
}
- }
-
- public String patchEdge(String version, String id, String type, EdgePayload payload)
- throws CrudException {
- Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
- Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
- payload);
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
-
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- return CrudResponseBuilder.buildUpsertEdgeResponse(
- RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
- version);
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ @Override
+ public String patchEdge(String version, String id, String type, EdgePayload payload) throws CrudException {
+ Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
+ Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
+
+ GraphEventEnvelope response = sendAndWait(event);
+ return responseHandler.handleEdgeResponse(version, event, response);
}
- }
-
- @PreDestroy
- protected void preShutdown() {
- timer.cancel();
-
- }
-
- @Override
- protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
- GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
- .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
- event.setDbTransactionId(dbTransId);
- GraphEvent response = publishEvent(event);
- return response.getVertex().toVertex();
- }
-
- @Override
- protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
- .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
- event.setDbTransactionId(dbTransId);
- GraphEvent response = publishEvent(event);
- return response.getVertex().toVertex();
- }
-
- @Override
- protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
- GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
- event.setDbTransactionId(dbTransId);
- publishEvent(event);
- }
-
- @Override
- protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
- GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
- event.setDbTransactionId(dbTransId);
- GraphEvent response = publishEvent(event);
- return response.getEdge().toEdge();
- }
-
- @Override
- protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
- GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
- event.setDbTransactionId(dbTransId);
- GraphEvent response = publishEvent(event);
- return response.getEdge().toEdge();
- }
-
- @Override
- protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
- GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
- .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
- event.setDbTransactionId(dbTransId);
- publishEvent(event);
- }
-
- private GraphEvent publishEvent(GraphEvent event) throws CrudException {
- GraphEvent response = sendAndWait(event);
- if (response.getResult().equals(GraphEventResult.SUCCESS)) {
- logSuccessResponse(event, response);
- } else {
- logErrorResponse(event, response);
- throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
- + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
- }
-
- return response;
- }
-
- private void logSuccessResponse(GraphEvent event, GraphEvent response) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult());
- }
-
- private void logErrorResponse(GraphEvent event, GraphEvent response) {
- logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
- "Event response received: " + response.getObjectType() + " with key: "
- + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
- + " , operation: " + event.getOperation().toString() + " , result: "
- + response.getResult() + " , error: " + response.getErrorMessage());
- }
-} \ No newline at end of file
+ @PreDestroy
+ protected void preShutdown() {
+ timer.cancel();
+ }
+
+ @Override
+ protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getVertex().toVertex();
+ }
+
+ @Override
+ protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getVertex().toVertex();
+ }
+
+ @Override
+ protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .vertex(new GraphEventVertex(id, version, type, null)).build();
+ event.setDbTransactionId(dbTransId);
+ publishEvent(event);
+ }
+
+ @Override
+ protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+ GraphEvent event =
+ GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getEdge().toEdge();
+ }
+
+ @Override
+ protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
+ GraphEvent event =
+ GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
+ event.setDbTransactionId(dbTransId);
+ GraphEvent response = publishEvent(event);
+ return response.getEdge().toEdge();
+ }
+
+ @Override
+ protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+ event.setDbTransactionId(dbTransId);
+ publishEvent(event);
+ }
+
+ private GraphEvent publishEvent(GraphEvent event) throws CrudException {
+ GraphEventEnvelope response = sendAndWait(event);
+ responseHandler.handleBulkEventResponse(event, response);
+ return response.getBody();
+ }
+}
diff --git a/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java b/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java
index 01b4c2d..94c1e1b 100644
--- a/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java
+++ b/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java
@@ -27,6 +27,7 @@ import javax.naming.OperationNotSupportedException;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.crud.event.GraphEvent;
+import org.onap.crud.event.envelope.GraphEventEnvelope;
import org.onap.crud.logging.CrudServiceMsgs;
import org.onap.aai.event.api.EventConsumer;
@@ -76,7 +77,8 @@ public class CrudAsyncResponseConsumer extends TimerTask {
for (String event : events) {
try {
- GraphEvent graphEvent = GraphEvent.fromJson(event);
+ GraphEventEnvelope graphEventEnvelope = GraphEventEnvelope.fromJson(event);
+ GraphEvent graphEvent = graphEventEnvelope.getBody();
auditLogger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO,
"Event received of type: " + graphEvent.getObjectType() + " with key: "
+ graphEvent.getObjectKey() + " , transaction-id: "
@@ -92,7 +94,7 @@ public class CrudAsyncResponseConsumer extends TimerTask {
if (CrudAsyncGraphEventCache.get(graphEvent.getTransactionId()) != null) {
CrudAsyncGraphEventCache.get(graphEvent.getTransactionId())
- .populateGraphEvent(graphEvent);
+ .populateGraphEventEnvelope(graphEventEnvelope);
} else {
logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
"Request timed out. Not sending response for transaction-id: "