From f60a17c6abb6deef1c24f917488745cbc6e6a566 Mon Sep 17 00:00:00 2001 From: "Popescu, Serban" Date: Wed, 13 Feb 2019 10:29:59 -0500 Subject: Performance Improvements for Gizmo bulk API Use bulk operations with Gizmo/Champ to improve performance. Also allows for HA by allowing Champ to operate in stateless mode Change-Id: I63bbbf8d6071cecb4b22110c477d7dc592026200 Issue-ID: AAI-2147 Signed-off-by: Serban Popescu --- .../crud/service/AbstractGraphDataService.java | 207 +------------------ .../crud/service/CrudAsyncGraphDataService.java | 227 +++++++++++++++++++-- .../onap/crud/service/CrudGraphDataService.java | 51 ++--- 3 files changed, 234 insertions(+), 251 deletions(-) (limited to 'src/main/java/org/onap/crud/service') diff --git a/src/main/java/org/onap/crud/service/AbstractGraphDataService.java b/src/main/java/org/onap/crud/service/AbstractGraphDataService.java index 8225adf..1fdd841 100644 --- a/src/main/java/org/onap/crud/service/AbstractGraphDataService.java +++ b/src/main/java/org/onap/crud/service/AbstractGraphDataService.java @@ -22,16 +22,12 @@ package org.onap.crud.service; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import com.google.gson.JsonElement; import com.google.gson.reflect.TypeToken; -import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; import javax.ws.rs.core.EntityTag; import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.Response.Status; import net.dongliu.gson.GsonJava8TypeAdapterFactory; import org.apache.commons.lang3.tuple.ImmutablePair; import org.onap.aai.restclient.client.OperationResult; @@ -92,199 +88,6 @@ public abstract class AbstractGraphDataService { return new ImmutablePair<>(entityTag, CrudResponseBuilder.buildGetVerticesResponse(vertices, version)); } - public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException { - HashMap vertices = new HashMap<>(); - HashMap edges = new HashMap<>(); - - String txId = dao.openTransaction(); - - try { - // Step 1. Handle edge deletes (must happen before vertex deletes) - for (JsonElement v : payload.getRelationships()) { - List> entries = new ArrayList>( - v.getAsJsonObject().entrySet()); - - if (entries.size() != 2) { - throw new CrudException("", Status.BAD_REQUEST); - } - Map.Entry opr = entries.get(0); - Map.Entry item = entries.get(1); - EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); - - if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { - deleteBulkEdge(edgePayload.getId(), version, txId); - } - } - - // Step 2: Handle vertex deletes - for (JsonElement v : payload.getObjects()) { - List> entries = new ArrayList>( - v.getAsJsonObject().entrySet()); - - if (entries.size() != 2) { - throw new CrudException("", Status.BAD_REQUEST); - } - - Map.Entry opr = entries.get(0); - Map.Entry item = entries.get(1); - VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); - - if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { - String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()); - deleteBulkVertex(vertexPayload.getId(), version, type, txId); - } - } - - // Step 3: Handle vertex add/modify (must happen before edge adds) - for (JsonElement v : payload.getObjects()) { - List> entries = new ArrayList>( - v.getAsJsonObject().entrySet()); - - if (entries.size() != 2) { - throw new CrudException("", Status.BAD_REQUEST); - } - Map.Entry opr = entries.get(0); - Map.Entry item = entries.get(1); - VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); - - // Add vertex - if (opr.getValue().getAsString().equalsIgnoreCase("add")) { - vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), - headers, true)); - Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(), - vertexPayload.getProperties()); - Vertex persistedVertex = addBulkVertex(validatedVertex, version, txId); - Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); - vertices.put(item.getKey(), outgoingVertex); - } - - // Update vertex - else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) { - vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), - headers, false)); - Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version, - vertexPayload.getType(), vertexPayload.getProperties()); - Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId); - Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); - vertices.put(item.getKey(), outgoingVertex); - } - - // Patch vertex - else if (opr.getValue().getAsString().equalsIgnoreCase("patch")) { - if ( (vertexPayload.getId() == null) || (vertexPayload.getType() == null) ) { - throw new CrudException("id and type must be specified for patch request", Status.BAD_REQUEST); - } - - vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), - headers, false)); - - OperationResult existingVertexOpResult = dao.getVertex(vertexPayload.getId(), OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()), version, new HashMap()); - Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version); - Vertex validatedVertex = OxmModelValidator.validateIncomingPatchPayload(vertexPayload.getId(), - version, vertexPayload.getType(), vertexPayload.getProperties(), existingVertex); - Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId); - Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); - vertices.put(item.getKey(), outgoingVertex); - } - } - - // Step 4: Handle edge add/modify - for (JsonElement v : payload.getRelationships()) { - List> entries = new ArrayList>( - v.getAsJsonObject().entrySet()); - - if (entries.size() != 2) { - throw new CrudException("", Status.BAD_REQUEST); - } - Map.Entry opr = entries.get(0); - Map.Entry item = entries.get(1); - EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); - - // Add/Update edge - if (opr.getValue().getAsString().equalsIgnoreCase("add") - || opr.getValue().getAsString().equalsIgnoreCase("modify") - || opr.getValue().getAsString().equalsIgnoreCase("patch")) { - Edge validatedEdge; - Edge persistedEdge; - if (opr.getValue().getAsString().equalsIgnoreCase("add")) { - // Fix the source/destination - if (edgePayload.getSource().startsWith("$")) { - Vertex source = vertices.get(edgePayload.getSource().substring(1)); - if (source == null) { - throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1), - Status.INTERNAL_SERVER_ERROR); - } - edgePayload - .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get()); - } - if (edgePayload.getTarget().startsWith("$")) { - Vertex target = vertices.get(edgePayload.getTarget().substring(1)); - if (target == null) { - throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1), - Status.INTERNAL_SERVER_ERROR); - } - edgePayload - .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get()); - } - - // If the type isn't set, resolve it based on on the sourece and target vertex types - if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { - edgePayload.setType(CrudServiceUtil.determineEdgeType(edgePayload, version)); - } - - validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),edgePayload); - - persistedEdge = addBulkEdge(validatedEdge, version, txId); - } else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) { - Edge edge = dao.getEdge(edgePayload.getId(), txId); - - // If the type isn't set, resolve it based on on the sourece and target vertex types - if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { - edgePayload.setType(edge.getType()); - } - - validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload); - - persistedEdge = updateBulkEdge(validatedEdge, version, txId); - } else { - if (edgePayload.getId() == null) { - throw new CrudException("id must be specified for patch request", Status.BAD_REQUEST); - } - Edge existingEdge = dao.getEdge(edgePayload.getId(), txId); - - // If the type isn't set, resolve it based on on the sourece and target vertex types - if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { - edgePayload.setType(existingEdge.getType()); - } - - Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(existingEdge, version, edgePayload); - persistedEdge = updateBulkEdge(patchedEdge, version, txId); - } - - - Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge); - edges.put(item.getKey(), outgoingEdge); - } - } - - // commit transaction - dao.commitTransaction(txId); - } catch (CrudException ex) { - dao.rollbackTransaction(txId); - throw ex; - } catch (Exception ex) { - dao.rollbackTransaction(txId); - throw ex; - } finally { - if (dao.transactionExists(txId)) { - dao.rollbackTransaction(txId); - } - } - - return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload); - } - - public abstract ImmutablePair addVertex(String version, String type, VertexPayload payload) throws CrudException; public abstract ImmutablePair updateVertex(String version, String id, String type, @@ -299,13 +102,7 @@ public abstract class AbstractGraphDataService { EdgePayload payload) throws CrudException; public abstract ImmutablePair patchEdge(String version, String id, String type, EdgePayload payload) throws CrudException; - - protected abstract Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException; - protected abstract Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException; - protected abstract void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException; - - protected abstract Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException; - protected abstract Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException; - protected abstract void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException; + + public abstract String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException; } diff --git a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java index 14ba7f2..7906de0 100644 --- a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java +++ b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java @@ -23,7 +23,10 @@ package org.onap.crud.service; import java.io.IOException; import java.security.NoSuchAlgorithmException; import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Timer; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -35,6 +38,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.annotation.PreDestroy; import javax.ws.rs.core.EntityTag; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response.Status; import org.apache.commons.lang3.tuple.ImmutablePair; import org.onap.aai.cl.api.LogFields; @@ -56,12 +60,16 @@ import org.onap.crud.event.envelope.GraphEventEnvelope; import org.onap.crud.event.response.GraphEventResponseHandler; import org.onap.crud.exception.CrudException; import org.onap.crud.logging.CrudServiceMsgs; +import org.onap.crud.parser.CrudResponseBuilder; import org.onap.crud.util.CrudProperties; import org.onap.crud.util.CrudServiceConstants; +import org.onap.crud.util.CrudServiceUtil; import org.onap.crud.util.etag.EtagGenerator; import org.onap.schema.OxmModelValidator; import org.onap.schema.RelationshipSchemaValidator; +import com.google.gson.JsonElement; + public class CrudAsyncGraphDataService extends AbstractGraphDataService { private static Integer requestTimeOut; @@ -119,8 +127,8 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { // Start the Response Consumer timer CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer( - asyncResponseConsumer, new GraphEventUpdater() - ); + asyncResponseConsumer, new GraphEventUpdater() + ); timer = new Timer("crudAsyncResponseConsumer-1"); timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval); @@ -178,8 +186,8 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey() - + " , transaction-id: " + event.getTransactionId() + " , operation: " - + event.getOperation().toString()); + + " , transaction-id: " + event.getTransactionId() + " , operation: " + + event.getOperation().toString()); ExecutorService executor = Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId())); @@ -367,8 +375,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { timer.cancel(); } - @Override - protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { + private Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE) .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); event.setDbTransactionId(dbTransId); @@ -376,8 +383,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getVertex().toVertex(); } - @Override - protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { + private Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE) .vertex(GraphEventVertex.fromVertex(vertex, version)).build(); event.setDbTransactionId(dbTransId); @@ -385,16 +391,14 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getVertex().toVertex(); } - @Override - protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { + private void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) .vertex(new GraphEventVertex(id, version, type, null)).build(); event.setDbTransactionId(dbTransId); publishEvent(event); } - @Override - protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + private Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); event.setDbTransactionId(dbTransId); @@ -402,8 +406,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getEdge().toEdge(); } - @Override - protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { + private Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build(); event.setDbTransactionId(dbTransId); @@ -411,8 +414,7 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { return response.getEdge().toEdge(); } - @Override - protected void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException { + private void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException { // Get the edge type String type = null; try { @@ -423,13 +425,204 @@ public class CrudAsyncGraphDataService extends AbstractGraphDataService { // Likely the client is trying to delete an edge which isn't present. Just swallow the exception // and let the bulk request fail via the normal path. } - + GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE) .edge(new GraphEventEdge(id, version, type, null, null, null)).build(); event.setDbTransactionId(dbTransId); publishEvent(event); } + @Override + public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException { + HashMap vertices = new HashMap<>(); + HashMap edges = new HashMap<>(); + + String txId = dao.openTransaction(); + + try { + // Step 1. Handle edge deletes (must happen before vertex deletes) + for (JsonElement v : payload.getRelationships()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); + + if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { + deleteBulkEdge(edgePayload.getId(), version, txId); + } + } + + // Step 2: Handle vertex deletes + for (JsonElement v : payload.getObjects()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); + + if (opr.getValue().getAsString().equalsIgnoreCase("delete")) { + String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()); + deleteBulkVertex(vertexPayload.getId(), version, type, txId); + } + } + + // Step 3: Handle vertex add/modify (must happen before edge adds) + for (JsonElement v : payload.getObjects()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString()); + + // Add vertex + if (opr.getValue().getAsString().equalsIgnoreCase("add")) { + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, true)); + Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(), + vertexPayload.getProperties()); + Vertex persistedVertex = addBulkVertex(validatedVertex, version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + + // Update vertex + else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) { + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, false)); + Vertex validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version, + vertexPayload.getType(), vertexPayload.getProperties()); + Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + + // Patch vertex + else if (opr.getValue().getAsString().equalsIgnoreCase("patch")) { + if ( (vertexPayload.getId() == null) || (vertexPayload.getType() == null) ) { + throw new CrudException("id and type must be specified for patch request", Status.BAD_REQUEST); + } + + vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(), + headers, false)); + + OperationResult existingVertexOpResult = dao.getVertex(vertexPayload.getId(), OxmModelValidator.resolveCollectionType(version, vertexPayload.getType()), version, new HashMap()); + Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version); + Vertex validatedVertex = OxmModelValidator.validateIncomingPatchPayload(vertexPayload.getId(), + version, vertexPayload.getType(), vertexPayload.getProperties(), existingVertex); + Vertex persistedVertex = updateBulkVertex(validatedVertex, vertexPayload.getId(), version, txId); + Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex); + vertices.put(item.getKey(), outgoingVertex); + } + } + + // Step 4: Handle edge add/modify + for (JsonElement v : payload.getRelationships()) { + List> entries = new ArrayList>( + v.getAsJsonObject().entrySet()); + + if (entries.size() != 2) { + throw new CrudException("", Status.BAD_REQUEST); + } + Map.Entry opr = entries.get(0); + Map.Entry item = entries.get(1); + EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString()); + + // Add/Update edge + if (opr.getValue().getAsString().equalsIgnoreCase("add") + || opr.getValue().getAsString().equalsIgnoreCase("modify") + || opr.getValue().getAsString().equalsIgnoreCase("patch")) { + Edge validatedEdge; + Edge persistedEdge; + if (opr.getValue().getAsString().equalsIgnoreCase("add")) { + // Fix the source/destination + if (edgePayload.getSource().startsWith("$")) { + Vertex source = vertices.get(edgePayload.getSource().substring(1)); + if (source == null) { + throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1), + Status.INTERNAL_SERVER_ERROR); + } + edgePayload + .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get()); + } + if (edgePayload.getTarget().startsWith("$")) { + Vertex target = vertices.get(edgePayload.getTarget().substring(1)); + if (target == null) { + throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1), + Status.INTERNAL_SERVER_ERROR); + } + edgePayload + .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get()); + } + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(CrudServiceUtil.determineEdgeType(edgePayload, version)); + } + + validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),edgePayload); + persistedEdge = addBulkEdge(validatedEdge, version, txId); + } else if (opr.getValue().getAsString().equalsIgnoreCase("modify")) { + Edge edge = dao.getEdge(edgePayload.getId(), txId); + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(edge.getType()); + } + + validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload); + persistedEdge = updateBulkEdge(validatedEdge, version, txId); + } else { + if (edgePayload.getId() == null) { + throw new CrudException("id must be specified for patch request", Status.BAD_REQUEST); + } + Edge existingEdge = dao.getEdge(edgePayload.getId(), txId); + + // If the type isn't set, resolve it based on on the sourece and target vertex types + if (edgePayload.getType() == null || edgePayload.getType().isEmpty()) { + edgePayload.setType(existingEdge.getType()); + } + + Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(existingEdge, version, edgePayload); + persistedEdge = updateBulkEdge(patchedEdge, version, txId); + } + + + Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge); + edges.put(item.getKey(), outgoingEdge); + } + } + + // commit transaction + dao.commitTransaction(txId); + } catch (CrudException ex) { + dao.rollbackTransaction(txId); + throw ex; + } catch (Exception ex) { + dao.rollbackTransaction(txId); + throw ex; + } finally { + if (dao.transactionExists(txId)) { + dao.rollbackTransaction(txId); + } + } + + return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload); + } + private GraphEvent publishEvent(GraphEvent event) throws CrudException { GraphEventEnvelope response = sendAndWait(event); responseHandler.handleBulkEventResponse(event, response); diff --git a/src/main/java/org/onap/crud/service/CrudGraphDataService.java b/src/main/java/org/onap/crud/service/CrudGraphDataService.java index 034b0bf..3916bc6 100644 --- a/src/main/java/org/onap/crud/service/CrudGraphDataService.java +++ b/src/main/java/org/onap/crud/service/CrudGraphDataService.java @@ -23,21 +23,32 @@ package org.onap.crud.service; import java.util.HashMap; import java.util.List; + import javax.ws.rs.core.EntityTag; +import javax.ws.rs.core.HttpHeaders; + import org.apache.commons.lang3.tuple.ImmutablePair; +import org.onap.aai.cl.api.Logger; +import org.onap.aai.cl.eelf.LoggerFactory; import org.onap.aai.restclient.client.OperationResult; import org.onap.crud.dao.GraphDao; +import org.onap.crud.dao.champ.ChampBulkPayload; +import org.onap.crud.dao.champ.ChampBulkPayloadResponse; import org.onap.crud.entity.Edge; import org.onap.crud.entity.Vertex; import org.onap.crud.exception.CrudException; +import org.onap.crud.logging.CrudServiceMsgs; +import org.onap.crud.service.BulkPayload; import org.onap.crud.parser.CrudResponseBuilder; import org.onap.crud.util.CrudServiceUtil; import org.onap.schema.OxmModelValidator; import org.onap.schema.RelationshipSchemaValidator; +import com.google.gson.GsonBuilder; -public class CrudGraphDataService extends AbstractGraphDataService { +public class CrudGraphDataService extends AbstractGraphDataService { + Logger logger = LoggerFactory.getInstance().getLogger(CrudGraphDataService.class.getName()); public CrudGraphDataService(GraphDao dao) throws CrudException { super(); @@ -168,34 +179,16 @@ public class CrudGraphDataService extends AbstractGraphDataService { Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(Edge.fromJson(operationResult.getResult()), version, payload); return updateEdge(version, patchedEdge); } - - @Override - protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException { - return dao.addVertex(vertex.getType(), vertex.getProperties(), version, dbTransId); - } - - @Override - protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException { - return dao.updateVertex(id, vertex.getType(), vertex.getProperties(), version, dbTransId); - } - - @Override - protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException { - dao.deleteVertex(id, type, dbTransId); - } - - @Override - protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { - return dao.addEdge(edge.getType(), edge.getSource(), edge.getTarget(), edge.getProperties(), version, dbTransId); - } - - @Override - protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException { - return dao.updateEdge(edge, dbTransId); - } - + @Override - protected void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException { - dao.deleteEdge(id, dbTransId); + public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException { + ChampBulkPayload champPayload = new ChampBulkPayload(); + champPayload.fromGizmoPayload(payload, version, headers, dao); + logger.info(CrudServiceMsgs.CHAMP_BULK_OP_INFO, "ChampBulkPayload-> "+new GsonBuilder().disableHtmlEscaping().setPrettyPrinting().create().toJson(champPayload)); + OperationResult bulkResult = dao.bulkOperation(champPayload); + + ChampBulkPayloadResponse response = ChampBulkPayloadResponse.fromJson(bulkResult.getResult()); + response.populateChampData(version); + return CrudResponseBuilder.buildUpsertBulkResponse(response.getVertices(), response.getEdges(), version, payload); } } -- cgit 1.2.3-korg