aboutsummaryrefslogtreecommitdiffstats
path: root/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/bpmn/common/workflow/service/WorkflowAsyncResource.java
diff options
context:
space:
mode:
Diffstat (limited to 'bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/bpmn/common/workflow/service/WorkflowAsyncResource.java')
-rw-r--r--bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/bpmn/common/workflow/service/WorkflowAsyncResource.java293
1 files changed, 293 insertions, 0 deletions
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/bpmn/common/workflow/service/WorkflowAsyncResource.java b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/bpmn/common/workflow/service/WorkflowAsyncResource.java
new file mode 100644
index 0000000..2854193
--- /dev/null
+++ b/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/bpmn/common/workflow/service/WorkflowAsyncResource.java
@@ -0,0 +1,293 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * OPENECOMP - MSO
+ * ================================================================================
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.mso.bpmn.common.workflow.service;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.Response;
+
+import org.camunda.bpm.engine.ProcessEngineServices;
+import org.camunda.bpm.engine.ProcessEngines;
+import org.camunda.bpm.engine.RuntimeService;
+import org.camunda.bpm.engine.runtime.ProcessInstance;
+import org.camunda.bpm.engine.variable.impl.VariableMapImpl;
+import org.jboss.resteasy.annotations.Suspend;
+import org.jboss.resteasy.spi.AsynchronousResponse;
+import org.openecomp.mso.logger.MessageEnum;
+import org.openecomp.mso.logger.MsoLogger;
+import org.slf4j.MDC;
+
+/**
+ *
+ * @version 1.0
+ * Asynchronous Workflow processing using JAX RS RESTeasy implementation
+ * Both Synchronous and Asynchronous BPMN process can benefit from this implementation since the workflow gets executed in the background
+ * and the server thread is freed up, server scales better to process more incoming requests
+ *
+ * Usage: For synchronous process, when you are ready to send the response invoke the callback to write the response
+ * For asynchronous process - the activity may send a acknowledgement response and then proceed further on executing the process
+ */
+@Path("/async")
+public abstract class WorkflowAsyncResource {
+
+ private WorkflowContextHolder contextHolder = WorkflowContextHolder.getInstance();
+ protected ProcessEngineServices pes4junit = null;
+
+ private MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
+
+ private static final String logMarker = "[WRKFLOW-RESOURCE]";
+ private static final int DEFAULT_WAIT_TIME = 30000; //default wait time
+
+ /**
+ * Asynchronous JAX-RS method that starts a process instance.
+ * @param asyncResponse an object that will receive the asynchronous response
+ * @param processKey the process key
+ * @param variableMap input variables to the process
+ */
+ @POST
+ @Path("/services/{processKey}")
+ @Produces("application/json")
+ @Consumes("application/json")
+ public void startProcessInstanceByKey(final @Suspend(180000) AsynchronousResponse asyncResponse,
+ @PathParam("processKey") String processKey, VariableMapImpl variableMap) {
+
+ WorkflowResponse response = new WorkflowResponse();
+ long startTime = System.currentTimeMillis();
+ Map<String, Object> inputVariables = null;
+ WorkflowContext workflowContext = null;
+
+ try {
+ inputVariables = getInputVariables(variableMap);
+ setLogContext(processKey, inputVariables);
+
+ // This variable indicates that the flow was invoked asynchronously
+ inputVariables.put("isAsyncProcess", "true");
+
+ workflowContext = new WorkflowContext(processKey, getRequestId(inputVariables),
+ asyncResponse, getWaitTime(inputVariables));
+
+ msoLogger.debug("Adding the workflow context into holder: "
+ + workflowContext.getProcessKey() + ":"
+ + workflowContext.getRequestId() + ":"
+ + workflowContext.getTimeout());
+
+ contextHolder.put(workflowContext);
+
+ ProcessThread processThread = new ProcessThread(processKey, inputVariables);
+ processThread.start();
+ } catch (Exception e) {
+ setLogContext(processKey, inputVariables);
+
+ if (workflowContext != null) {
+ contextHolder.remove(workflowContext);
+ }
+
+ msoLogger.debug(logMarker + "Exception in startProcessInstance by key");
+ response.setMessage("Fail" );
+ response.setResponse("Error occurred while executing the process: " + e);
+ response.setMessageCode(500);
+ recordEvents(processKey, response, startTime);
+
+ msoLogger.error (MessageEnum.BPMN_GENERAL_EXCEPTION_ARG, "BPMN", MsoLogger.getServiceName(), MsoLogger.ErrorCode.UnknownError, logMarker
+ + response.getMessage() + " for processKey: "
+ + processKey + " with response: " + response.getResponse());
+
+ Response errorResponse = Response.serverError().entity(response).build();
+ asyncResponse.setResponse(errorResponse);
+ }
+ }
+
+ /**
+ *
+ * @version 1.0
+ *
+ */
+ class ProcessThread extends Thread {
+ private final String processKey;
+ private final Map<String,Object> inputVariables;
+
+ public ProcessThread(String processKey, Map<String, Object> inputVariables) {
+ this.processKey = processKey;
+ this.inputVariables = inputVariables;
+ }
+
+ public void run() {
+
+ String processInstanceId = null;
+ long startTime = System.currentTimeMillis();
+
+ try {
+ setLogContext(processKey, inputVariables);
+
+ // Note: this creates a random businessKey if it wasn't specified.
+ String businessKey = getBusinessKey(inputVariables);
+
+ msoLogger.debug(logMarker + "***Received MSO startProcessInstanceByKey with processKey: "
+ + processKey + " and variables: " + inputVariables);
+
+ msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker
+ + "Call to MSO workflow/services in Camunda. Received MSO startProcessInstanceByKey with processKey:"
+ + processKey + " and variables: " + inputVariables);
+
+ RuntimeService runtimeService = getProcessEngineServices().getRuntimeService();
+ ProcessInstance processInstance = runtimeService.startProcessInstanceByKey(
+ processKey, businessKey, inputVariables);
+ processInstanceId = processInstance.getId();
+
+ msoLogger.debug(logMarker + "Process " + processKey + ":" + processInstanceId + " " +
+ (processInstance.isEnded() ? "ENDED" : "RUNNING"));
+ } catch (Exception e) {
+
+ msoLogger.recordAuditEvent(startTime, MsoLogger.StatusCode.ERROR, MsoLogger.ResponseCode.InternalError,
+ logMarker + "Error in starting the process: "+ e.getMessage());
+
+ WorkflowCallbackResponse callbackResponse = new WorkflowCallbackResponse();
+ callbackResponse.setStatusCode(500);
+ callbackResponse.setMessage("Fail");
+ callbackResponse.setResponse("Error occurred while executing the process: " + e);
+
+ // TODO: is the processInstanceId used by the API handler? I don't think so.
+ // It may be null here.
+ WorkflowContextHolder.getInstance().processCallback(
+ processKey, processInstanceId,
+ getRequestId(inputVariables),
+ callbackResponse);
+ }
+ }
+ }
+
+
+ /**
+ * Callback resource which is invoked from BPMN to process to send the workflow response
+ *
+ * @param processKey
+ * @param processInstanceId
+ * @param requestId
+ * @param callbackResponse
+ * @return
+ */
+ @POST
+ @Path("/services/callback/{processKey}/{processInstanceId}/{requestId}")
+ @Produces("application/json")
+ @Consumes("application/json")
+ public Response processWorkflowCallback(
+ @PathParam("processKey") String processKey,
+ @PathParam("processInstanceId") String processInstanceId,
+ @PathParam("requestId")String requestId,
+ WorkflowCallbackResponse callbackResponse) {
+
+ msoLogger.debug(logMarker + "Process instance ID:" + processInstanceId + ":" + requestId + ":" + processKey + ":" + isProcessEnded(processInstanceId));
+ msoLogger.debug(logMarker + "About to process the callback request:" + callbackResponse.getResponse() + ":" + callbackResponse.getMessage() + ":" + callbackResponse.getStatusCode());
+ return contextHolder.processCallback(processKey, processInstanceId, requestId, callbackResponse);
+ }
+
+ // Note: the business key is used to identify the process in unit tests
+ private String getBusinessKey(Map<String, Object> inputVariables) {
+ Object businessKey = inputVariables.get("mso-business-key");
+ if (businessKey == null ) {
+ businessKey = UUID.randomUUID().toString();
+ inputVariables.put("mso-business-key", businessKey);
+ }
+ return businessKey.toString();
+ }
+
+ private String getRequestId(Map<String, Object> inputVariables) {
+ Object requestId = inputVariables.get("mso-request-id");
+ if (requestId == null ) {
+ requestId = UUID.randomUUID().toString();
+ inputVariables.put("mso-request-id", requestId);
+ }
+ return requestId.toString();
+ }
+
+ private long getWaitTime(Map<String, Object> inputVariables)
+ {
+ String timeout = inputVariables.get("mso-service-request-timeout") == null
+ ? null : inputVariables.get("mso-service-request-timeout").toString();
+
+ if (timeout != null) {
+ try {
+ return Long.parseLong(timeout)*1000;
+ } catch (NumberFormatException nex) {
+ msoLogger.debug("Invalid input for mso-service-request-timeout");
+ }
+ }
+
+ return DEFAULT_WAIT_TIME;
+ }
+
+ private void recordEvents(String processKey, WorkflowResponse response,
+ long startTime) {
+
+ msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
+ logMarker + response.getMessage() + " for processKey: "
+ + processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
+
+ msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc,
+ logMarker + response.getMessage() + "for processKey: " + processKey + " with response: " + response.getResponse());
+
+ }
+
+ private void setLogContext(String processKey,
+ Map<String, Object> inputVariables) {
+ MsoLogger.setServiceName("MSO." + processKey);
+ if (inputVariables != null) {
+ MsoLogger.setLogContext(getKeyValueFromInputVariables(inputVariables,"mso-request-id"), getKeyValueFromInputVariables(inputVariables,"mso-service-instance-id"));
+ }
+ }
+
+ private String getKeyValueFromInputVariables(Map<String,Object> inputVariables, String key) {
+ if (inputVariables == null) return "";
+ Object requestId = inputVariables.get(key);
+ if (requestId != null) return requestId.toString();
+ return "N/A";
+ }
+
+ private boolean isProcessEnded(String processInstanceId) {
+ ProcessEngineServices pes = getProcessEngineServices();
+ return pes.getRuntimeService().createProcessInstanceQuery().processInstanceId(processInstanceId).singleResult() == null ? true : false ;
+ }
+
+
+ protected abstract ProcessEngineServices getProcessEngineServices();
+
+ public void setProcessEngineServices4junit(ProcessEngineServices pes) {
+ pes4junit = pes;
+ }
+
+ private Map<String, Object> getInputVariables(VariableMapImpl variableMap) {
+ Map<String, Object> inputVariables = new HashMap<String,Object>();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> vMap = (Map<String, Object>) variableMap.get("variables");
+ for (String vName : vMap.keySet()) {
+ @SuppressWarnings("unchecked")
+ Map<String, Object> valueMap = (Map<String,Object>)vMap.get(vName); // value, type
+ inputVariables.put(vName, valueMap.get("value"));
+ }
+ return inputVariables;
+ }
+}