summaryrefslogtreecommitdiffstats
path: root/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java')
-rw-r--r--src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java469
1 files changed, 469 insertions, 0 deletions
diff --git a/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java
new file mode 100644
index 0000000..9efc7df
--- /dev/null
+++ b/src/main/java/org/onap/crud/service/CrudAsyncGraphDataService.java
@@ -0,0 +1,469 @@
+/**
+ * ============LICENSE_START=======================================================
+ * Gizmo
+ * ================================================================================
+ * Copyright © 2017 AT&T Intellectual Property.
+ * Copyright © 2017 Amdocs
+ * All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ * ECOMP is a trademark and service mark of AT&T Intellectual Property.
+ */
+package org.onap.crud.service;
+
+import com.att.ecomp.event.api.EventConsumer;
+import com.att.ecomp.event.api.EventPublisher;
+
+import org.onap.aai.cl.api.LogFields;
+import org.onap.aai.cl.api.Logger;
+import org.onap.aai.cl.eelf.LoggerFactory;
+import org.onap.aai.cl.mdc.MdcContext;
+import org.onap.aai.cl.mdc.MdcOverride;
+import org.onap.crud.dao.GraphDao;
+import org.onap.crud.entity.Edge;
+import org.onap.crud.entity.Vertex;
+import org.onap.crud.event.GraphEvent;
+import org.onap.crud.event.GraphEvent.GraphEventOperation;
+import org.onap.crud.event.GraphEvent.GraphEventResult;
+import org.onap.crud.event.GraphEventEdge;
+import org.onap.crud.event.GraphEventVertex;
+import org.onap.crud.exception.CrudException;
+import org.onap.crud.logging.CrudServiceMsgs;
+import org.onap.crud.parser.CrudResponseBuilder;
+import org.onap.crud.util.CrudProperties;
+import org.onap.crud.util.CrudServiceConstants;
+import org.onap.crud.util.CrudServiceUtil;
+import org.onap.schema.OxmModelValidator;
+import org.onap.schema.RelationshipSchemaValidator;
+
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.PreDestroy;
+import javax.ws.rs.core.Response.Status;
+
+public class CrudAsyncGraphDataService {
+
+ private static Integer requestTimeOut;
+
+ private GraphDao dao;
+
+ private EventPublisher asyncRequestPublisher;
+
+ private Timer timer;
+
+ public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
+ private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
+
+ private static Logger logger = LoggerFactory.getInstance()
+ .getLogger(CrudAsyncGraphDataService.class.getName());
+ private static Logger metricsLogger = LoggerFactory.getInstance()
+ .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
+ private static LogFields OK_FIELDS = new LogFields();
+
+ static {
+ OK_FIELDS.setField(Status.OK, Status.OK.toString());
+ }
+
+ public static Integer getRequestTimeOut() {
+ return requestTimeOut;
+ }
+
+ public CrudAsyncGraphDataService(GraphDao dao,
+ EventPublisher asyncRequestPublisher,
+ EventConsumer asyncResponseConsumer) throws CrudException {
+
+ requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
+ try {
+ requestTimeOut
+ = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
+ } catch (NumberFormatException ex) {
+ // Leave it as the default
+ }
+
+ Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
+ try {
+ responsePollInterval = Integer
+ .parseInt(CrudProperties
+ .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
+ } catch (Exception ex) {
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
+ + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
+ + " error: " + ex.getMessage());
+ }
+
+ this.dao = dao;
+
+ // Start the Response Consumer timer
+ CrudAsyncResponseConsumer crudAsyncResponseConsumer
+ = new CrudAsyncResponseConsumer(asyncResponseConsumer);
+ timer = new Timer("crudAsyncResponseConsumer-1");
+ timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
+
+ this.asyncRequestPublisher = asyncRequestPublisher;
+
+ // load the schemas
+ CrudServiceUtil.loadModels();
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
+ }
+
+ public class CollectGraphResponse implements Callable<GraphEvent> {
+ private volatile GraphEvent graphEvent;
+ private volatile CountDownLatch latch = new CountDownLatch(1);
+
+ @Override
+ public GraphEvent call() throws TimeoutException {
+ try {
+ // Wait until graphEvent is available
+ latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ latch.countDown();
+ if (this.graphEvent != null) {
+ return this.graphEvent;
+ } else {
+ throw new TimeoutException();
+ }
+ }
+ return this.graphEvent;
+ }
+
+ public void populateGraphEvent(GraphEvent event) {
+ this.graphEvent = event;
+ latch.countDown();
+ }
+ }
+
+ private GraphEvent sendAndWait(GraphEvent event) throws Exception {
+
+ long startTimeInMs = System.currentTimeMillis();
+ SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
+ MdcOverride override = new MdcOverride();
+ override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
+
+ // publish to request queue
+ asyncRequestPublisher.sendSync(event.toJson());
+
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
+ + " , transaction-id: " + event.getTransactionId() + " , operation: "
+ + event.getOperation().toString());
+
+ ExecutorService executor = Executors
+ .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
+ CollectGraphResponse collector = new CollectGraphResponse();
+ CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
+ GraphEvent response;
+ Future<GraphEvent> future = executor.submit(collector);
+ try {
+ response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
+
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
+ logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
+ "Request timed out for transactionId: " + event.getTransactionId());
+ future.cancel(true);
+ throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
+ + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
+ } finally {
+ //Kill the thread as the work is completed
+ executor.shutdownNow();
+ }
+ metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
+ "Total elapsed time for operation: " + event.getOperation().toString()
+ + " , transactionId: " + event.getTransactionId() + " is "
+ + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
+ return response;
+ }
+
+ public String addVertex(String version, String type, VertexPayload payload) throws Exception {
+ // Validate the incoming payload
+ Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
+ type, payload.getProperties());
+ // Create graph request event
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ return CrudResponseBuilder.buildUpsertVertexResponse(
+ OxmModelValidator.validateOutgoingPayload(version,
+ response.getVertex().toVertex()), version);
+ } else {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ }
+
+ public String addEdge(String version, String type, EdgePayload payload) throws Exception {
+ Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
+ // Create graph request event
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
+ .edge(GraphEventEdge.fromEdge(edge, version)).build();
+
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ return CrudResponseBuilder.buildUpsertEdgeResponse(
+ RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
+ version);
+ } else {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+ }
+
+ public String updateVertex(String version, String id, String type, VertexPayload payload)
+ throws Exception {
+ Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
+ type, payload.getProperties());
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
+
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ return CrudResponseBuilder.buildUpsertVertexResponse(
+ OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
+ version);
+ } else {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ }
+
+ public String patchVertex(String version, String id, String type, VertexPayload payload)
+ throws Exception {
+ Vertex existingVertex
+ = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
+ Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
+ type, payload.getProperties(),
+ existingVertex);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
+
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ return CrudResponseBuilder.buildUpsertVertexResponse(
+ OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
+ version);
+ } else {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ }
+
+ public String deleteVertex(String version, String id, String type) throws Exception {
+ type = OxmModelValidator.resolveCollectionType(version, type);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .vertex(new GraphEventVertex(id, version, type, null)).build();
+
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ return "";
+ } else {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ }
+
+ public String deleteEdge(String version, String id, String type) throws Exception {
+ RelationshipSchemaValidator.validateType(version, type);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
+ .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
+
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ return "";
+ } else {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ }
+
+ public String updateEdge(String version, String id, String type, EdgePayload payload)
+ throws Exception {
+ Edge edge = dao.getEdge(id, type);
+ Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
+ payload);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
+
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ return CrudResponseBuilder.buildUpsertEdgeResponse(
+ RelationshipSchemaValidator.validateOutgoingPayload(version,
+ response.getEdge().toEdge()), version);
+ } else {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ }
+
+ public String patchEdge(String version, String id, String type, EdgePayload payload)
+ throws Exception {
+ Edge edge = dao.getEdge(id, type);
+ Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
+ payload);
+ GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
+ .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
+
+ GraphEvent response = sendAndWait(event);
+ if (response.getResult().equals(GraphEventResult.SUCCESS)) {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult());
+ return CrudResponseBuilder.buildUpsertEdgeResponse(
+ RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
+ version);
+ } else {
+ logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
+ "Event response received: " + response.getObjectType() + " with key: "
+ + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
+ + " , operation: " + event.getOperation().toString() + " , result: "
+ + response.getResult() + " , error: " + response.getErrorMessage());
+ throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
+ + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
+ }
+
+ }
+
+ public String getEdge(String version, String id, String type) throws CrudException {
+ RelationshipSchemaValidator.validateType(version, type);
+ Edge edge = dao.getEdge(id, type);
+
+ return CrudResponseBuilder.buildGetEdgeResponse(RelationshipSchemaValidator
+ .validateOutgoingPayload(version, edge),
+ version);
+ }
+
+ public String getEdges(String version, String type, Map<String, String> filter)
+ throws CrudException {
+ RelationshipSchemaValidator.validateType(version, type);
+ List<Edge> items = dao.getEdges(type,
+ RelationshipSchemaValidator.resolveCollectionfilter(version, type, filter));
+ return CrudResponseBuilder.buildGetEdgesResponse(items, version);
+ }
+
+ public String getVertex(String version, String id, String type) throws CrudException {
+ type = OxmModelValidator.resolveCollectionType(version, type);
+ Vertex vertex = dao.getVertex(id, type);
+ List<Edge> edges = dao.getVertexEdges(id);
+ return CrudResponseBuilder.buildGetVertexResponse(OxmModelValidator
+ .validateOutgoingPayload(version, vertex), edges,
+ version);
+ }
+
+ public String getVertices(String version, String type, Map<String, String> filter)
+ throws CrudException {
+ type = OxmModelValidator.resolveCollectionType(version, type);
+ List<Vertex> items = dao.getVertices(type,
+ OxmModelValidator.resolveCollectionfilter(version, type, filter));
+ return CrudResponseBuilder.buildGetVerticesResponse(items, version);
+ }
+
+ @PreDestroy
+ protected void preShutdown() {
+ timer.cancel();
+
+ }
+
+
+} \ No newline at end of file