diff options
Diffstat (limited to 'src/main/java/org/onap/crud/service')
-rw-r--r-- | src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java | 654 | ||||
-rw-r--r-- | src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java | 6 |
2 files changed, 280 insertions, 380 deletions
diff --git a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java index 592d4b3..dc30a4e 100644 --- a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java +++ b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java @@ -20,434 +20,332 @@ */ package org.onap.crud.service; -import org.onap.aai.event.api.EventConsumer; -import org.onap.aai.event.api.EventPublisher; - +import java.text.SimpleDateFormat; +import java.util.HashMap; +import java.util.Timer; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import javax.annotation.PreDestroy; +import javax.ws.rs.core.Response.Status; import org.onap.aai.cl.api.LogFields; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.cl.mdc.MdcContext; import org.onap.aai.cl.mdc.MdcOverride; +import org.onap.aai.event.api.EventConsumer; +import org.onap.aai.event.api.EventPublisher; import org.onap.crud.dao.GraphDao; import org.onap.crud.entity.Edge; import org.onap.crud.entity.Vertex; import org.onap.crud.event.GraphEvent; import org.onap.crud.event.GraphEvent.GraphEventOperation; -import org.onap.crud.event.GraphEvent.GraphEventResult; import org.onap.crud.event.GraphEventEdge; import org.onap.crud.event.GraphEventVertex; +import org.onap.crud.event.envelope.GraphEventEnvelope; +import org.onap.crud.event.response.GraphEventResponseHandler; import org.onap.crud.exception.CrudException; import org.onap.crud.logging.CrudServiceMsgs; -import org.onap.crud.parser.CrudResponseBuilder; import org.onap.crud.util.CrudProperties; import org.onap.crud.util.CrudServiceConstants; import org.onap.schema.OxmModelValidator; import org.onap.schema.RelationshipSchemaValidator; -import java.text.SimpleDateFormat; -import java.util.HashMap; -import java.util.Timer; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import javax.annotation.PreDestroy; -import javax.ws.rs.core.Response.Status; - public class CrudAsyncGraphDataService extends AbstractGraphDataService { - private static Integer requestTimeOut; - - private EventPublisher asyncRequestPublisher; - - private Timer timer; - - public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000; - private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000; - - private static Logger logger = LoggerFactory.getInstance() - .getLogger(CrudAsyncGraphDataService.class.getName()); - private static Logger metricsLogger = LoggerFactory.getInstance() - .getMetricsLogger(CrudAsyncGraphDataService.class.getName()); - private static LogFields OK_FIELDS = new LogFields(); - - static { - OK_FIELDS.setField(Status.OK, Status.OK.toString()); - } - - public static Integer getRequestTimeOut() { - return requestTimeOut; - } - - public CrudAsyncGraphDataService(GraphDao dao, - EventPublisher asyncRequestPublisher, - EventConsumer asyncResponseConsumer) throws CrudException { - this(dao,dao,asyncRequestPublisher,asyncResponseConsumer); - } - - public CrudAsyncGraphDataService(GraphDao dao, - GraphDao daoForGet, - EventPublisher asyncRequestPublisher, - EventConsumer asyncResponseConsumer) throws CrudException { - - super(); - this.dao = dao; - this.daoForGet = daoForGet; - - requestTimeOut = DEFAULT_REQUEST_TIMEOUT; - try { - requestTimeOut - = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT)); - } catch (NumberFormatException ex) { - // Leave it as the default + private static Integer requestTimeOut; + + private EventPublisher asyncRequestPublisher; + + private Timer timer; + + public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000; + private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000; + + private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName()); + private static Logger metricsLogger = + LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName()); + private static LogFields okFields = new LogFields(); + + static { + okFields.setField(Status.OK, Status.OK.toString()); } - Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL; - try { - responsePollInterval = Integer - .parseInt(CrudProperties - .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL)); - } catch (Exception ex) { - logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse " - + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL - + " error: " + ex.getMessage()); + private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler(); + + public static Integer getRequestTimeOut() { + return requestTimeOut; } - // Start the Response Consumer timer - CrudAsyncResponseConsumer crudAsyncResponseConsumer - = new CrudAsyncResponseConsumer(asyncResponseConsumer); - timer = new Timer("crudAsyncResponseConsumer-1"); - timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); + public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher, + EventConsumer asyncResponseConsumer) throws CrudException { + this(dao, dao, asyncRequestPublisher, asyncResponseConsumer); + } - this.asyncRequestPublisher = asyncRequestPublisher; - - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "CrudAsyncGraphDataService initialized SUCCESSFULLY!"); - } + public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher, + EventConsumer asyncResponseConsumer) throws CrudException { - public class CollectGraphResponse implements Callable<GraphEvent> { - private volatile GraphEvent graphEvent; - private volatile CountDownLatch latch = new CountDownLatch(1); + super(); + this.dao = dao; + this.daoForGet = daoForGet; - @Override - public GraphEvent call() throws TimeoutException { - try { - // Wait until graphEvent is available - latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - latch.countDown(); - if (this.graphEvent != null) { - return this.graphEvent; - } else { - throw new TimeoutException(); + requestTimeOut = DEFAULT_REQUEST_TIMEOUT; + try { + requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT)); + } catch (NumberFormatException ex) { + // Leave it as the default + } + + Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL; + try { + responsePollInterval = + Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL)); + } catch (Exception ex) { + logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse " + + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage()); } - } - return this.graphEvent; + + // Start the Response Consumer timer + CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(asyncResponseConsumer); + timer = new Timer("crudAsyncResponseConsumer-1"); + timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); + + this.asyncRequestPublisher = asyncRequestPublisher; + + logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!"); } - public void populateGraphEvent(GraphEvent event) { - this.graphEvent = event; - latch.countDown(); + public class CollectGraphResponse implements Callable<GraphEventEnvelope> { + private volatile GraphEventEnvelope graphEventEnvelope; + private volatile CountDownLatch latch = new CountDownLatch(1); + + @Override + public GraphEventEnvelope call() throws TimeoutException { + try { + // Wait until graphEvent is available + latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + latch.countDown(); + if (this.graphEventEnvelope != null) { + return this.graphEventEnvelope; + } else { + throw new TimeoutException(); + } + } + return this.graphEventEnvelope; + } + + public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) { + this.graphEventEnvelope = eventEnvelope; + latch.countDown(); + } } - } - private GraphEvent sendAndWait(GraphEvent event) throws CrudException { + private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException { - long startTimeInMs = System.currentTimeMillis(); - SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); - MdcOverride override = new MdcOverride(); - override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); + long startTimeInMs = System.currentTimeMillis(); + SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX"); + MdcOverride override = new MdcOverride(); + override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs)); - // publish to request queue - try { - asyncRequestPublisher.sendSync(event.toJson()); - } catch (Exception e) { - throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR); - } - - logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson()); + String eventEnvelopeJson = new GraphEventEnvelope(event).toJson(); - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, + // publish to request queue + try { + asyncRequestPublisher.sendSync(eventEnvelopeJson); + } catch (Exception e) { + throw new CrudException( + "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), + Status.INTERNAL_SERVER_ERROR); + } + + logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson); + + logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString()); - - ExecutorService executor = Executors - .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); - CollectGraphResponse collector = new CollectGraphResponse(); - CrudAsyncGraphEventCache.put(event.getTransactionId(), collector); - GraphEvent response; - Future<GraphEvent> future = executor.submit(collector); - try { - response = future.get(requestTimeOut, TimeUnit.MILLISECONDS); - - } catch (InterruptedException | ExecutionException | TimeoutException e) { - CrudAsyncGraphEventCache.invalidate(event.getTransactionId()); - logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, - "Request timed out for transactionId: " + event.getTransactionId()); - future.cancel(true); - throw new CrudException("Timed out , transactionId: " + event.getTransactionId() - + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR); - } finally { - //Kill the thread as the work is completed - executor.shutdownNow(); + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString()); + + ExecutorService executor = + Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); + CollectGraphResponse collector = new CollectGraphResponse(); + CrudAsyncGraphEventCache.put(event.getTransactionId(), collector); + GraphEventEnvelope response; + Future<GraphEventEnvelope> future = executor.submit(collector); + try { + response = future.get(requestTimeOut, TimeUnit.MILLISECONDS); + + } catch (InterruptedException | ExecutionException | TimeoutException e) { + CrudAsyncGraphEventCache.invalidate(event.getTransactionId()); + logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, + "Request timed out for transactionId: " + event.getTransactionId()); + future.cancel(true); + throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR); + } finally { + // Kill the thread as the work is completed + executor.shutdownNow(); + } + metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override, + "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: " + + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs) + + " ms"); + return response; } - metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override, - "Total elapsed time for operation: " + event.getOperation().toString() - + " , transactionId: " + event.getTransactionId() + " is " - + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms"); - return response; - } - - public String addVertex(String version, String type, VertexPayload payload) throws CrudException { - // Validate the incoming payload - Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, - type, payload.getProperties()); - // Create graph request event - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, - response.getVertex().toVertex()), version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + + @Override + public String addVertex(String version, String type, VertexPayload payload) throws CrudException { + // Validate the incoming payload + Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties()); + // Create graph request event + GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleVertexResponse(version, event, response); } - } - - public String addEdge(String version, String type, EdgePayload payload) throws CrudException { - Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload); - // Create graph request event - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .edge(GraphEventEdge.fromEdge(edge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String addEdge(String version, String type, EdgePayload payload) throws CrudException { + Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload); + // Create graph request event + GraphEvent event = + GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleEdgeResponse(version, event, response); } - } - - public String updateVertex(String version, String id, String type, VertexPayload payload) - throws CrudException { - Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, - type, payload.getProperties()); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + + @Override + public String updateVertex(String version, String id, String type, VertexPayload payload) throws CrudException { + Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties()); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleVertexResponse(version, event, response); } - } - - public String patchVertex(String version, String id, String type, VertexPayload payload) - throws CrudException { - Vertex existingVertex - = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap<String, String>()); - Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, - type, payload.getProperties(), - existingVertex); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertVertexResponse( - OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String patchVertex(String version, String id, String type, VertexPayload payload) throws CrudException { + Vertex existingVertex = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, + new HashMap<String, String>()); + Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type, + payload.getProperties(), existingVertex); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleVertexResponse(version, event, response); } - } - - public String deleteVertex(String version, String id, String type) throws CrudException { - type = OxmModelValidator.resolveCollectionType(version, type); - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .vertex(new GraphEventVertex(id, version, type, null)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return ""; - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String deleteVertex(String version, String id, String type) throws CrudException { + type = OxmModelValidator.resolveCollectionType(version, type); + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .vertex(new GraphEventVertex(id, version, type, null)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleDeletionResponse(event, response); } - } - - public String deleteEdge(String version, String id, String type) throws CrudException { - RelationshipSchemaValidator.validateType(version, type); - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return ""; - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String deleteEdge(String version, String id, String type) throws CrudException { + RelationshipSchemaValidator.validateType(version, type); + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleDeletionResponse(event, response); } - } - - public String updateEdge(String version, String id, String type, EdgePayload payload) - throws CrudException { - Edge edge = dao.getEdge(id, type, new HashMap<String, String>()); - Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, - payload); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, - response.getEdge().toEdge()), version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String updateEdge(String version, String id, String type, EdgePayload payload) throws CrudException { + Edge edge = dao.getEdge(id, type, new HashMap<String, String>()); + Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleEdgeResponse(version, event, response); } - } - - public String patchEdge(String version, String id, String type, EdgePayload payload) - throws CrudException { - Edge edge = dao.getEdge(id, type, new HashMap<String, String>()); - Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, - payload); - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build(); - - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - return CrudResponseBuilder.buildUpsertEdgeResponse( - RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()), - version); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); + @Override + public String patchEdge(String version, String id, String type, EdgePayload payload) throws CrudException { + Edge edge = dao.getEdge(id, type, new HashMap<String, String>()); + Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload); + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build(); + + GraphEventEnvelope response = sendAndWait(event); + return responseHandler.handleEdgeResponse(version, event, response); } - } - - @PreDestroy - protected void preShutdown() { - timer.cancel(); - - } - - @Override - protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - event.setDbTransactionId(dbTransId); - GraphEvent response = publishEvent(event); - return response.getVertex().toVertex(); - } - - @Override - protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) - .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); - event.setDbTransactionId(dbTransId); - GraphEvent response = publishEvent(event); - return response.getVertex().toVertex(); - } - - @Override - protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build(); - event.setDbTransactionId(dbTransId); - publishEvent(event); - } - - @Override - protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); - event.setDbTransactionId(dbTransId); - GraphEvent response = publishEvent(event); - return response.getEdge().toEdge(); - } - - @Override - protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); - event.setDbTransactionId(dbTransId); - GraphEvent response = publishEvent(event); - return response.getEdge().toEdge(); - } - - @Override - protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException { - GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) - .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); - event.setDbTransactionId(dbTransId); - publishEvent(event); - } - - private GraphEvent publishEvent(GraphEvent event) throws CrudException { - GraphEvent response = sendAndWait(event); - if (response.getResult().equals(GraphEventResult.SUCCESS)) { - logSuccessResponse(event, response); - } else { - logErrorResponse(event, response); - throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId() - + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus()); - } - - return response; - } - - private void logSuccessResponse(GraphEvent event, GraphEvent response) { - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "Event response received: " + response.getObjectType() + " with key: " - + response.getObjectKey() + " , transaction-id: " + response.getTransactionId() - + " , operation: " + event.getOperation().toString() + " , result: " - + response.getResult()); - } - - private void logErrorResponse(GraphEvent event, GraphEvent response) { - logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, - "Event response received: " + response.getObjectType() + " with key: " - + response.getObjectKey() + " , transaction-id: " + response.getTransactionId() - + " , operation: " + event.getOperation().toString() + " , result: " - + response.getResult() + " , error: " + response.getErrorMessage()); - } -}
\ No newline at end of file + @PreDestroy + protected void preShutdown() { + timer.cancel(); + } + + @Override + protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getVertex().toVertex(); + } + + @Override + protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) + .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getVertex().toVertex(); + } + + @Override + protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .vertex(new GraphEventVertex(id, version, type, null)).build(); + event.setDbTransactionId(dbTransId); + publishEvent(event); + } + + @Override + protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + GraphEvent event = + GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getEdge().toEdge(); + } + + @Override + protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + GraphEvent event = + GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); + event.setDbTransactionId(dbTransId); + GraphEvent response = publishEvent(event); + return response.getEdge().toEdge(); + } + + @Override + protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException { + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) + .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); + event.setDbTransactionId(dbTransId); + publishEvent(event); + } + + private GraphEvent publishEvent(GraphEvent event) throws CrudException { + GraphEventEnvelope response = sendAndWait(event); + responseHandler.handleBulkEventResponse(event, response); + return response.getBody(); + } +} diff --git a/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java b/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java index 01b4c2d..94c1e1b 100644 --- a/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java +++ b/src/main/java/org/onap/crud/service/CrudAsyncResponseConsumer.java @@ -27,6 +27,7 @@ import javax.naming.OperationNotSupportedException; import org.onap.aai.cl.api.Logger; import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.crud.event.GraphEvent; +import org.onap.crud.event.envelope.GraphEventEnvelope; import org.onap.crud.logging.CrudServiceMsgs; import org.onap.aai.event.api.EventConsumer; @@ -76,7 +77,8 @@ public class CrudAsyncResponseConsumer extends TimerTask { for (String event : events) { try { - GraphEvent graphEvent = GraphEvent.fromJson(event); + GraphEventEnvelope graphEventEnvelope = GraphEventEnvelope.fromJson(event); + GraphEvent graphEvent = graphEventEnvelope.getBody(); auditLogger.info(CrudServiceMsgs.ASYNC_RESPONSE_CONSUMER_INFO, "Event received of type: " + graphEvent.getObjectType() + " with key: " + graphEvent.getObjectKey() + " , transaction-id: " @@ -92,7 +94,7 @@ public class CrudAsyncResponseConsumer extends TimerTask { if (CrudAsyncGraphEventCache.get(graphEvent.getTransactionId()) != null) { CrudAsyncGraphEventCache.get(graphEvent.getTransactionId()) - .populateGraphEvent(graphEvent); + .populateGraphEventEnvelope(graphEventEnvelope); } else { logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Request timed out. Not sending response for transaction-id: " |