diff options
author | Michael Arrastia <MArrasti@amdocs.com> | 2018-03-28 17:22:56 +0100 |
---|---|---|
committer | Michael Arrastia <MArrasti@amdocs.com> | 2018-03-29 18:38:17 +0100 |
commit | 837cbcdc2562c0cd041ed558d05bb7dbba4be603 (patch) | |
tree | 370e72f52e0420e0fa633de73a6fd47d03d17422 /src/main/java | |
parent | 0c19b1386259b7144bc6c95954f965c469835522 (diff) |
Update published event to include header and body
Originally, the published event only contained the raw graph
request payload.
This has now been updated to include the following changes:
- encapsulate the graph request in a body property
- add new event header with details such as timestamp, request-id,
event-type
Issue-ID: AAI-954
Change-Id: I780b6f52a01aafdcd7d09156e9d3a99c25be90a3
Signed-off-by: Michael Arrastia <MArrasti@amdocs.com>
Diffstat (limited to 'src/main/java')
7 files changed, 827 insertions, 404 deletions
diff --git a/src/main/java/org/onap/crud/event/envelope/GraphEventEnvelope.java b/src/main/java/org/onap/crud/event/envelope/GraphEventEnvelope.java new file mode 100644 index 0000000..13370a2 --- /dev/null +++ b/src/main/java/org/onap/crud/event/envelope/GraphEventEnvelope.java @@ -0,0 +1,130 @@ +/** + * ============LICENSE_START======================================================= + * Gizmo + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.crud.event.envelope; + +import javax.ws.rs.core.Response; +import org.onap.crud.event.GraphEvent; +import org.onap.crud.event.GraphEventEdge; +import org.onap.crud.event.GraphEventVertex; +import org.onap.crud.exception.CrudException; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; + +/** + * Defines the top-level structure of the Gizmo events. Primarily serves as a wrapper for the graph details and includes + * an event header to describe and identify the event. + */ +public class GraphEventEnvelope { + + private GraphEventHeader header; + private GraphEvent body; + private JsonElement policyViolations; + + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); + + /** + * Construct the event. + * + * @param header the event header to describe the event + * @param body the body of the event with graph details + */ + public GraphEventEnvelope(GraphEventHeader header, GraphEvent body) { + this.header = header; + this.body = body; + } + + /** + * Construct the envelope header from the provided GraphEvent. + * + * @param event the graph event for a vertex or edge operation + */ + public GraphEventEnvelope(GraphEvent event) { + this.header = new GraphEventHeader.Builder().requestId(event.getTransactionId()) + .validationEntityType(getType(event)).build(); + this.body = event; + } + + public GraphEvent getBody() { + return body; + } + + public GraphEventHeader getHeader() { + return header; + } + + public JsonElement getPolicyViolations() { + return policyViolations != null ? policyViolations : new JsonArray(); + } + + /** + * Serializes this object into a JSON string representation. + * + * @return a JSON format string representation of this object. + */ + public String toJson() { + return gson.toJson(this); + } + + /** + * Deserializes a JSON string into a GraphEventEnvelope object. + * + * @param json the JSON string + * @return a GraphEventEnvelope object + * @throws CrudException + */ + public static GraphEventEnvelope fromJson(String json) throws CrudException { + try { + if (json == null || json.isEmpty()) { + throw new CrudException("Empty or null JSON string.", Response.Status.BAD_REQUEST); + } + return gson.fromJson(json, GraphEventEnvelope.class); + } catch (Exception ex) { + throw new CrudException("Unable to parse JSON string: " + json, Response.Status.BAD_REQUEST); + } + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return this.toJson(); + } + + private String getType(GraphEvent event) { + GraphEventVertex vertex = event.getVertex(); + GraphEventEdge edge = event.getEdge(); + if (vertex != null) { + return vertex.getType(); + } else if (edge != null) { + return edge.getType(); + } + return null; + } + +} diff --git a/src/main/java/org/onap/crud/event/envelope/GraphEventHeader.java b/src/main/java/org/onap/crud/event/envelope/GraphEventHeader.java new file mode 100644 index 0000000..f70127d --- /dev/null +++ b/src/main/java/org/onap/crud/event/envelope/GraphEventHeader.java @@ -0,0 +1,197 @@ +/** + * ============LICENSE_START======================================================= + * Gizmo + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.crud.event.envelope; + +import java.time.Instant; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; +import java.util.Objects; +import java.util.UUID; +import org.apache.commons.lang3.builder.EqualsBuilder; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.annotations.SerializedName; + +public class GraphEventHeader { + + @SerializedName("request-id") + private String requestId; + + private String timestamp; + + @SerializedName("source-name") + private String sourceName; + + @SerializedName("event-type") + private String eventType; + + @SerializedName("validation-entity-type") + private String validationEntityType; + + @SerializedName("validation-top-entity-type") + private String validationTopEntityType; + + @SerializedName("entity-link") + private String entityLink; + + private static final String MICROSERVICE_NAME = "GIZMO"; + private static final String EVENT_TYPE_PENDING_UPDATE = "pending-update"; + private static final Gson gson = new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create(); + + public static class Builder { + + private String requestId; + private String validationEntityType; + private String validationTopEntityType; + private String entityLink; + + public Builder requestId(String val) { + requestId = val; + return this; + } + + public Builder validationEntityType(String val) { + validationEntityType = val; + return this; + } + + public Builder validationTopEntityType(String val) { + validationTopEntityType = val; + return this; + } + + public Builder entityLink(String val) { + entityLink = val; + return this; + } + + public GraphEventHeader build() { + return new GraphEventHeader(this); + } + } + + private GraphEventHeader(Builder builder) { + requestId = builder.requestId != null ? builder.requestId : UUID.randomUUID().toString(); + timestamp = DateTimeFormatter.ofPattern("yyyyMMdd'T'HHmmssX").withZone(ZoneOffset.UTC).format(Instant.now()); + sourceName = MICROSERVICE_NAME; + eventType = EVENT_TYPE_PENDING_UPDATE; + + validationEntityType = builder.validationEntityType; + validationTopEntityType = builder.validationTopEntityType; + entityLink = builder.entityLink; + } + + /** + * Serializes this object into a JSON string representation. + * + * @return a JSON format string representation of this object. + */ + public String toJson() { + return gson.toJson(this); + } + + /////////////////////////////////////////////////////////////////////////// + // GETTERS + /////////////////////////////////////////////////////////////////////////// + + public String getRequestId() { + return requestId; + } + + public String getTimestamp() { + return timestamp; + } + + public String getSourceName() { + return sourceName; + } + + public String getEventType() { + return eventType; + } + + public String getValidationEntityType() { + return validationEntityType; + } + + public String getValidationTopEntityType() { + return validationTopEntityType; + } + + public String getEntityLink() { + return entityLink; + } + + /////////////////////////////////////////////////////////////////////////// + // OVERRIDES + /////////////////////////////////////////////////////////////////////////// + + /* + * (non-Javadoc) + * + * @see java.lang.Object#hashCode() + */ + @Override + public int hashCode() { + return Objects.hash(this.requestId, this.timestamp, this.sourceName, this.eventType, this.validationEntityType, + this.validationTopEntityType, this.entityLink); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#equals(java.lang.Object) + */ + @Override + public boolean equals(Object obj) { + if (!(obj instanceof GraphEventHeader)) { + return false; + } else if (obj == this) { + return true; + } + GraphEventHeader rhs = (GraphEventHeader) obj; + // @formatter:off + return new EqualsBuilder() + .append(requestId, rhs.requestId) + .append(timestamp, rhs.timestamp) + .append(sourceName, rhs.sourceName) + .append(eventType, rhs.sourceName) + .append(validationEntityType, rhs.validationEntityType) + .append(validationTopEntityType, rhs.validationTopEntityType) + .append(entityLink, rhs.entityLink) + .isEquals(); + // @formatter:on + } + + /* + * (non-Javadoc) + * + * @see java.lang.Object#toString() + */ + @Override + public String toString() { + return this.toJson(); + } + +} diff --git a/src/main/java/org/onap/crud/event/response/GraphEventResponseHandler.java b/src/main/java/org/onap/crud/event/response/GraphEventResponseHandler.java new file mode 100644 index 0000000..1fa056e --- /dev/null +++ b/src/main/java/org/onap/crud/event/response/GraphEventResponseHandler.java @@ -0,0 +1,142 @@ +/** + * ============LICENSE_START======================================================= + * Gizmo + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.crud.event.response; + +import javax.ws.rs.core.Response.Status; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; +import org.onap.crud.event.GraphEvent; +import org.onap.crud.event.GraphEvent.GraphEventResult; +import org.onap.crud.event.envelope.GraphEventEnvelope; +import org.onap.crud.exception.CrudException; +import org.onap.crud.logging.CrudServiceMsgs; +import org.onap.crud.parser.CrudResponseBuilder; +import org.onap.schema.OxmModelValidator; +import org.onap.schema.RelationshipSchemaValidator; + +/** + * Reads event responses, logs and generates exceptions if errors are found. + * + */ +public class GraphEventResponseHandler { + + private static Logger logger = LoggerFactory.getInstance().getLogger(GraphEventResponseHandler.class.getName()); + + public String handleVertexResponse(String version, GraphEvent event, GraphEventEnvelope response) + throws CrudException { + handlePolicyViolations(event, response); + logResponse(event, response.getBody()); + + if (isErrorResponse(response.getBody())) { + throwOperationException(response); + } + + return CrudResponseBuilder.buildUpsertVertexResponse( + OxmModelValidator.validateOutgoingPayload(version, response.getBody().getVertex().toVertex()), version); + } + + public String handleEdgeResponse(String version, GraphEvent event, GraphEventEnvelope response) + throws CrudException { + handlePolicyViolations(event, response); + logResponse(event, response.getBody()); + + if (isErrorResponse(response.getBody())) { + throwOperationException(response); + } + + return CrudResponseBuilder.buildUpsertEdgeResponse( + RelationshipSchemaValidator.validateOutgoingPayload(version, response.getBody().getEdge().toEdge()), + version); + } + + public String handleDeletionResponse(GraphEvent event, GraphEventEnvelope response) throws CrudException { + handlePolicyViolations(event, response); + logResponse(event, response.getBody()); + + if (isErrorResponse(response.getBody())) { + throwOperationException(response); + } + + return ""; + } + + public void handleBulkEventResponse(GraphEvent event, GraphEventEnvelope response) throws CrudException { + handlePolicyViolations(event, response); + logResponse(event, response.getBody()); + + if (isErrorResponse(response.getBody())) { + throwOperationException(response); + } + } + + public boolean hasPolicyViolations(GraphEventEnvelope event) { + return event.getPolicyViolations() != null && event.getPolicyViolations().isJsonArray() + && event.getPolicyViolations().getAsJsonArray().size() != 0; + } + + private void handlePolicyViolations(GraphEvent event, GraphEventEnvelope response) throws CrudException { + if (hasPolicyViolations(response)) { + logPolicyViolation(event, response); + throw new CrudException(GraphEventResponseMessage.POLICY_VIOLATION_EXCEPTION_MESSAGE.getMessage( + response.getBody().getTransactionId(), response.getPolicyViolations()), Status.BAD_REQUEST); + } + } + + private void logResponse(GraphEvent event, GraphEvent response) { + String message = GraphEventResponseMessage.BASE_OPERATION_LOG_MESSAGE.getMessage(response.getObjectType(), + response.getObjectKey(), response.getTransactionId(), event.getOperation().toString(), + response.getResult()); + if (isErrorResponse(response)) { + message = GraphEventResponseMessage.OPERATION_ERROR_LOG_MESSAGE.getMessage(message, + response.getErrorMessage()); + } + + logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, message); + } + + private void logPolicyViolation(GraphEvent event, GraphEventEnvelope response) { + //@formatter:off + logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, + GraphEventResponseMessage.POLICY_VIOLATION_LOG_MESSAGE.getMessage( + response.getBody().getTransactionId(), + response.getHeader().getSourceName(), + response.getHeader().getEventType(), + response.getBody().getObjectKey(), + response.getBody().getObjectType(), + event.getOperation().toString(), + response.getPolicyViolations().toString())); + //@formatter:on + } + + private void throwOperationException(GraphEventEnvelope response) throws CrudException { + throw new CrudException( + GraphEventResponseMessage.OPERATION_ERROR_EXCEPTION_MESSAGE + .getMessage(response.getBody().getTransactionId(), response.getBody().getErrorMessage()), + response.getBody().getHttpErrorStatus()); + } + + private boolean isErrorResponse(GraphEvent response) { + return GraphEventResult.FAILURE.equals(response.getResult()); + } +} diff --git a/src/main/java/org/onap/crud/event/response/GraphEventResponseMessage.java b/src/main/java/org/onap/crud/event/response/GraphEventResponseMessage.java new file mode 100644 index 0000000..5e4c6bd --- /dev/null +++ b/src/main/java/org/onap/crud/event/response/GraphEventResponseMessage.java @@ -0,0 +1,56 @@ +/** + * ============LICENSE_START======================================================= + * Gizmo + * ================================================================================ + * Copyright © 2017 AT&T Intellectual Property. + * Copyright © 2017 Amdocs + * All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + */ +package org.onap.crud.event.response; + +import java.text.MessageFormat; + +/** + * Graph event handling messages, including logging and exceptions. + */ +public enum GraphEventResponseMessage { + + //@formatter:off + BASE_OPERATION_LOG_MESSAGE("Event response received: {0} with key: {1}, transaction-id: {2}, operation: {3}, result: {4}"), + OPERATION_ERROR_LOG_MESSAGE("{0}, error: {1}"), + OPERATION_ERROR_EXCEPTION_MESSAGE("Operation Failed with transaction-id: {0}. Error: {1}"), + POLICY_VIOLATION_LOG_MESSAGE("Event response received: transaction-id: {0}, event source: {1}, event type: {2}, object key: {3}, object type: {4}, operation: {5}, result: policy violations detected in request, error: {6}"), + POLICY_VIOLATION_EXCEPTION_MESSAGE("Operation Failed with transaction-id {0}. Error: policy violations detected in request, {1}"); + //@formatter:on + + private String message; + + private GraphEventResponseMessage(String message) { + this.message = message; + } + + /** + * @param args to be formatted + * @return the formatted error message + */ + public String getMessage(Object... args) { + MessageFormat formatter = new MessageFormat(""); + formatter.applyPattern(this.message); + return formatter.format(args); + } +} diff --git a/src/main/java/org/onap/crud/exception/CrudException.java b/src/main/java/org/onap/crud/exception/CrudException.java index 919af96..a4fb3d7 100644 --- a/src/main/java/org/onap/crud/exception/CrudException.java +++ b/src/main/java/org/onap/crud/exception/CrudException.java @@ -24,36 +24,34 @@ import javax.ws.rs.core.Response.Status; public class CrudException extends Exception { - private static final long serialVersionUID = 8162385108397238865L; + private static final long serialVersionUID = 8162385108397238865L; - private Status httpStatus; + private Status httpStatus; - public CrudException() { - } + public CrudException() {} - public CrudException(String message, Status httpStatus) { - super(message); - this.setHttpStatus(httpStatus); - } + public CrudException(String message, Status httpStatus) { + super(message); + this.setHttpStatus(httpStatus); + } - public CrudException(Throwable cause) { - super(cause); - } + public CrudException(Throwable cause) { + super(cause); + } - public CrudException(String message, Throwable cause) { - super(message, cause); - } + public CrudException(String message, Throwable cause) { + super(message, cause); + } - public CrudException(String message, Throwable cause, boolean enableSuppression, - boolean writableStackTrace) { - super(message, cause, enableSuppression, writableStackTrace); - } + public CrudException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } - public Status getHttpStatus() { - return httpStatus; - } + public Status getHttpStatus() { + return httpStatus; + } - public void setHttpStatus(Status httpStatus) { - this.httpStatus = httpStatus; - } + public void setHttpStatus(Status httpStatus) { + this.httpStatus = httpStatus; + } } diff --git a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java index 592d4b3..dc30a4e 100644 --- a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java +++ b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java @@ -20,434 +20,332 @@ */ package org.onap.crud.service; -import org.onap.aai.event.api.EventConsumer; -import org.onap.aai.event.api.EventPublisher; - +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Timer; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.PreDestroy; +import javax.ws.rs.core.Response.Status; import org.onap.aai.cl.api.LogFields; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; import org.onap.aai.cl.mdc.MdcOverride; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.EventPublisher; import org.onap.crud.dao.GraphDao; import org.onap.crud.entity.Edge; import org.onap.crud.entity.Vertex; import org.onap.crud.event.GraphEvent; import org.onap.crud.event.GraphEvent.GraphEventOperation; -import org.onap.crud.event.GraphEvent.GraphEventResult; import org.onap.crud.event.GraphEventEdge; import org.onap.crud.event.GraphEventVertex; +import org.onap.crud.event.envelope.GraphEventEnvelope; +import org.onap.crud.event.response.GraphEventResponseHandler; import org.onap.crud.exception.CrudException; import org.onap.crud.logging.CrudServiceMsgs; -import org.onap.crud.parser.CrudResponseBuilder; import org.onap.crud.util.CrudProperties; import org.onap.crud.util.CrudServiceConstants; import org.onap.schema.OxmModelValidator; import org.onap.schema.RelationshipSchemaValidator; -import java.text.SimpleDateFormat; -import java.util.HashMap; -import java.util.Timer; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import javax.annotation.PreDestroy; -import javax.ws.rs.core.Response.Status; - public class CrudAsyncGraphDataService extends AbstractGraphDataService { - private static Integer requestTimeOut; - - private EventPublisher asyncRequestPublisher; - - private Timer timer; - - public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000; - private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000; - - private static Logger logger = LoggerFactory.getInstance() - .getLogger(CrudAsyncGraphDataService.class.getName()); - private static Logger metricsLogger = LoggerFactory.getInstance() - .getMetricsLogger(CrudAsyncGraphDataService.class.getName()); - private static LogFields OK_FIELDS = new LogFields(); - - static { - OK_FIELDS.setField(Status.OK, Status.OK.toString()); - } - - public static Integer getRequestTimeOut() { - return requestTimeOut; - } - - public CrudAsyncGraphDataService(GraphDao dao, - EventPublisher asyncRequestPublisher, - EventConsumer asyncResponseConsumer) throws CrudException { - this(dao,dao,asyncRequestPublisher,asyncResponseConsumer); - } - - public CrudAsyncGraphDataService(GraphDao dao, - GraphDao daoForGet, - EventPublisher asyncRequestPublisher, - EventConsumer asyncResponseConsumer) throws CrudException { - - super(); - this.dao = dao; - this.daoForGet = daoForGet; - - requestTimeOut = DEFAULT_REQUEST_TIMEOUT; - try { - requestTimeOut - = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT)); - } catch (NumberFormatException ex) { - // Leave it as the default + private static Integer requestTimeOut; + + private EventPublisher asyncRequestPublisher; + + private Timer timer; + + public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000; + private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000; + + private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName()); + private static Logger metricsLogger = + LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName()); + private static LogFields okFields = new LogFields(); + + static { + okFields.setField(Status.OK, Status.OK.toString()); } - Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL; - try { - responsePollInterval = Integer - .parseInt(CrudProperties - .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL)); - } catch (Exception ex) { - logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse " - + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL - + " error: " + ex.getMessage()); + private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler(); + + public static Integer getRequestTimeOut() { + return requestTimeOut; } - // Start the Response Consumer timer - CrudAsyncResponseConsumer crudAsyncResponseConsumer - = new CrudAsyncResponseConsumer(asyncResponseConsumer); - timer = new Timer("crudAsyncResponseConsumer-1"); - timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); + public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher, + EventConsumer asyncResponseConsumer) throws CrudException { + this(dao, dao, asyncRequestPublisher, asyncResponseConsumer); + } - this.asyncRequestPublisher = asyncRequestPublisher; - - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "CrudAsyncGraphDataService initialized SUCCESSFULLY!"); - } + public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher, + EventConsumer asyncResponseConsumer) throws CrudException { - public class CollectGraphResponse implements Callable<GraphEvent> { - private volatile GraphEvent graphEvent; - private volatile CountDownLatch latch = new CountDownLatch(1); + super(); + this.dao = dao; + this.daoForGet = daoForGet; - @Override - public GraphEvent call() throws TimeoutException { - try { - // Wait until graphEvent is available - latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - latch.countDown(); - if (this.graphEvent != null) { - return this.graphEvent; - } else { - throw new TimeoutException(); + requestTimeOut = DEFAULT_REQUEST_TIMEOUT; + try { + requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT)); + } catch (NumberFormatException ex) { + // Leave it as the default + } + + Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL; + try { + responsePollInterval = + Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL)); + } catch (Exception ex) { + logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse " + + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage()); } - } - return this.graphEvent; + + // Start the Response Consumer timer + CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(asyncResponseConsumer); + timer = new Timer("crudAsyncResponseConsumer-1"); + timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); + + this.asyncRequestPublisher = asyncRequestPublisher; + + logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!"); } - public void populateGraphEvent(GraphEvent event) { - this.graphEvent = event; - latch.countDown(); + public class CollectGraphResponse implements Callable<GraphEventEnvelope> { + private volatile GraphEventEnvelope graphEventEnvelope; + private volatile CountDownLatch latch = new CountDownLatch(1); + + @Override + public GraphEventEnvelope call() throws TimeoutException { + try { + // Wait until graphEvent is available + latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + latch.countDown(); + if (this.graphEventEnvelope != null) { + return this.graphEventEnvelope; + } else { + throw new TimeoutException(); + } + } + return this.graphEventEnvelope; + } + + public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) { + this.graphEventEnvelope = eventEnvelope; + latch.countDown(); + } } - } - private GraphEvent sendAndWait(GraphEvent event) throws CrudException { + private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException { - long startTimeInMs = System.currentTimeMillis(); - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - MdcOverride override = new MdcOverride(); - override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); + long startTimeInMs = System.currentTimeMillis(); + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + MdcOverride override = new MdcOverride(); + override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); - // publish to request queue - try { - asyncRequestPublisher.sendSync(event.toJson()); - } catch (Exception e) { - throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR); - } - - logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson()); + String eventEnvelopeJson = new GraphEventEnvelope(event).toJson(); - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, + // publish to request queue + try { + asyncRequestPublisher.sendSync(eventEnvelopeJson); + } catch (Exception e) { + throw new CrudException( + "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), + Status.INTERNAL_SERVER_ERROR); + } + + logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson); + + logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString()); - - ExecutorService executor = Executors - .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); - CollectGraphResponse collector = new CollectGraphResponse(); - CrudAsyncGraphEventCache.put(event.getTransactionId(), collector); - GraphEvent response; - Future<GraphEvent> future = executor.submit(collector); - try { - response = future.get(requestTimeOut, TimeUnit.MILLISECONDS); - - } catch (InterruptedException | ExecutionException | TimeoutException e) { - CrudAsyncGraphEventCache.invalidate(event.getTransactionId()); - logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, - "Request timed out for transactionId: " + event.getTransactionId()); - future.cancel(true); - throw new CrudException("Timed out , transactionId: " + event.getTransactionId() - + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR); - } finally { - //Kill the thread as the work is completed - executor.shutdownNow(); + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString()); + + ExecutorService executor = + Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); + CollectGraphResponse collector = new CollectGraphResponse(); + CrudAsyncGraphEventCache.put(event.getTransactionId(), collector); + GraphEventEnvelope response; + Future<GraphEventEnvelope> future = executor.submit(collector); + try { + response = future.get(requestTimeOut, TimeUnit.MILLISECONDS); + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + CrudAsyncGraphEventCache.invalidate(event.getTransactionId()); + logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, + "Request timed out for transactionId: " + event.getTransactionId()); + future.cancel(true); + throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR); + } finally { + // Kill the thread as the work is completed + executor.shutdownNow(); + } + metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override, + "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: " + + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs) + + " ms"); + return response; } - metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override, - "Total elapsed time for operation: " + event.getOperation().toString() - + " , transactionId: " + event.getTransactionId() + " is " - + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms"); - return response; - } - - public String addVertex(String version, String type, VertexPayload payload) throws CrudException { - // Validate the incoming payload - Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, - type, payload.getProperties()); - // Create graph request event - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, - response.getVertex().toVertex()), version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + + @Override + public String addVertex(String version, String type, VertexPayload payload) throws CrudException { + // Validate the incoming payload + Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties()); + // Create graph request event + GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleVertexResponse(version, event, response); } - } - - public String addEdge(String version, String type, EdgePayload payload) throws CrudException { - Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload); - // Create graph request event - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .edge(GraphEventEdge.fromEdge(edge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String addEdge(String version, String type, EdgePayload payload) throws CrudException { + Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload); + // Create graph request event + GraphEvent event = + GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleEdgeResponse(version, event, response); } - } - - public String updateVertex(String version, String id, String type, VertexPayload payload) - throws CrudException { - Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, - type, payload.getProperties()); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + + @Override + public String updateVertex(String version, String id, String type, VertexPayload payload) throws CrudException { + Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties()); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleVertexResponse(version, event, response); } - } - - public String patchVertex(String version, String id, String type, VertexPayload payload) - throws CrudException { - Vertex existingVertex - = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap<String, String>()); - Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, - type, payload.getProperties(), - existingVertex); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String patchVertex(String version, String id, String type, VertexPayload payload) throws CrudException { + Vertex existingVertex = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, + new HashMap<String, String>()); + Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type, + payload.getProperties(), existingVertex); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleVertexResponse(version, event, response); } - } - - public String deleteVertex(String version, String id, String type) throws CrudException { - type = OxmModelValidator.resolveCollectionType(version, type); - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .vertex(new GraphEventVertex(id, version, type, null)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return ""; - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String deleteVertex(String version, String id, String type) throws CrudException { + type = OxmModelValidator.resolveCollectionType(version, type); + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .vertex(new GraphEventVertex(id, version, type, null)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleDeletionResponse(event, response); } - } - - public String deleteEdge(String version, String id, String type) throws CrudException { - RelationshipSchemaValidator.validateType(version, type); - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return ""; - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String deleteEdge(String version, String id, String type) throws CrudException { + RelationshipSchemaValidator.validateType(version, type); + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleDeletionResponse(event, response); } - } - - public String updateEdge(String version, String id, String type, EdgePayload payload) - throws CrudException { - Edge edge = dao.getEdge(id, type, new HashMap<String, String>()); - Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, - payload); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, - response.getEdge().toEdge()), version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String updateEdge(String version, String id, String type, EdgePayload payload) throws CrudException { + Edge edge = dao.getEdge(id, type, new HashMap<String, String>()); + Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleEdgeResponse(version, event, response); } - } - - public String patchEdge(String version, String id, String type, EdgePayload payload) - throws CrudException { - Edge edge = dao.getEdge(id, type, new HashMap<String, String>()); - Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, - payload); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String patchEdge(String version, String id, String type, EdgePayload payload) throws CrudException { + Edge edge = dao.getEdge(id, type, new HashMap<String, String>()); + Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleEdgeResponse(version, event, response); } - } - - @PreDestroy - protected void preShutdown() { - timer.cancel(); - - } - - @Override - protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - event.setDbTransactionId(dbTransId); - GraphEvent response = publishEvent(event); - return response.getVertex().toVertex(); - } - - @Override - protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - event.setDbTransactionId(dbTransId); - GraphEvent response = publishEvent(event); - return response.getVertex().toVertex(); - } - - @Override - protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build(); - event.setDbTransactionId(dbTransId); - publishEvent(event); - } - - @Override - protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); - event.setDbTransactionId(dbTransId); - GraphEvent response = publishEvent(event); - return response.getEdge().toEdge(); - } - - @Override - protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); - event.setDbTransactionId(dbTransId); - GraphEvent response = publishEvent(event); - return response.getEdge().toEdge(); - } - - @Override - protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); - event.setDbTransactionId(dbTransId); - publishEvent(event); - } - - private GraphEvent publishEvent(GraphEvent event) throws CrudException { - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); - } - - return response; - } - - private void logSuccessResponse(GraphEvent event, GraphEvent response) { - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "Event response received: " + response.getObjectType() + " with key: " - + response.getObjectKey() + " , transaction-id: " + response.getTransactionId() - + " , operation: " + event.getOperation().toString() + " , result: " - + response.getResult()); - } - - private void logErrorResponse(GraphEvent event, GraphEvent response) { - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "Event response received: " + response.getObjectType() + " with key: " - + response.getObjectKey() + " , transaction-id: " + response.getTransactionId() - + " , operation: " + event.getOperation().toString() + " , result: " - + response.getResult() + " , error: " + response.getErrorMessage()); - } -}
\ No newline at end of file + @PreDestroy + protected void preShutdown() { + timer.cancel(); + } + + @Override + protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getVertex().toVertex(); + } + + @Override + protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getVertex().toVertex(); + } + + @Override + protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .vertex(new GraphEventVertex(id, version, type, null)).build(); + event.setDbTransactionId(dbTransId); + publishEvent(event); + } + + @Override + protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + GraphEvent event = + GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getEdge().toEdge(); + } + + @Override + protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + GraphEvent event = + GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getEdge().toEdge(); + } + + @Override + protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); + event.setDbTransactionId(dbTransId); + publishEvent(event); + } + + private GraphEvent publishEvent(GraphEvent event) throws CrudException { + GraphEventEnvelope response = sendAndWait(event); + responseHandler.handleBulkEventResponse(event, response); + return response.getBody(); + } +} diff --git a/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java b/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java index 01b4c2d..94c1e1b 100644 --- a/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java +++ b/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java @@ -27,6 +27,7 @@ import javax.naming.OperationNotSupportedException; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.crud.event.GraphEvent; +import org.onap.crud.event.envelope.GraphEventEnvelope; import org.onap.crud.logging.CrudServiceMsgs; import org.onap.aai.event.api.EventConsumer; @@ -76,7 +77,8 @@ public class CrudAsyncResponseConsumer extends TimerTask { for (String event : events) { try { - GraphEvent graphEvent = GraphEvent.fromJson(event); + GraphEventEnvelope graphEventEnvelope = GraphEventEnvelope.fromJson(event); + GraphEvent graphEvent = graphEventEnvelope.getBody(); auditLogger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO, "Event received of type: " + graphEvent.getObjectType() + " with key: " + graphEvent.getObjectKey() + " , transaction-id: " @@ -92,7 +94,7 @@ public class CrudAsyncResponseConsumer extends TimerTask { if (CrudAsyncGraphEventCache.get(graphEvent.getTransactionId()) != null) { CrudAsyncGraphEventCache.get(graphEvent.getTransactionId()) - .populateGraphEvent(graphEvent); + .populateGraphEventEnvelope(graphEventEnvelope); } else { logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Request timed out. Not sending response for transaction-id: " |