aboutsummaryrefslogtreecommitdiffstats
path: root/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/bpmn/common/workflow/service/WorkflowContextHolder.java
blob: ca3b1f1390bcd4112f3acca1ce73ace69f00bb89 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/*-
 * ============LICENSE_START=======================================================
 * OPENECOMP - MSO
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */

package org.openecomp.mso.bpmn.common.workflow.service;

import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.core.Response;

import org.jboss.resteasy.spi.AsynchronousResponse;
import org.slf4j.MDC;

import org.openecomp.mso.logger.MessageEnum;
import org.openecomp.mso.logger.MsoLogger;

/**
 * Workflow Context Holder instance which can be accessed elsewhere either in groovy scripts or Java
 * @version 1.0
 *
 */
public class WorkflowContextHolder {

	private static MsoLogger msoLogger = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);
	private static final String logMarker = "[WORKFLOW-CONTEXT-HOLDER]";
	private static WorkflowContextHolder instance = null;

	/**
	 * Delay Queue which holds workflow context holder objects
	 */
	private final DelayQueue<WorkflowContext> responseQueue = new DelayQueue<WorkflowContext>();
	private final TimeoutThread timeoutThread = new TimeoutThread();

	private WorkflowContextHolder() {
		timeoutThread.start();
	}

	/**
	 * Singleton holder which eliminates hot lock
	 * Since the JVM synchronizes static method there is no synchronization needed for this method
	 * @return
	 */
	public static synchronized WorkflowContextHolder getInstance() {
		if (instance == null) {
			instance = new WorkflowContextHolder();
		}
		return instance;
	}
	
	public void put(WorkflowContext context) {
		msoLogger.debug(logMarker + " Adding context to the queue: "
			+ context.getRequestId());
		responseQueue.put(context);
	}
	
	public void remove(WorkflowContext context) {
		msoLogger.debug(logMarker + " Removing context from the queue: "
			+ context.getRequestId());
		responseQueue.remove(context);
	}
	
	public WorkflowContext getWorkflowContext(String requestId) {
		// Note: DelayQueue interator is threadsafe
		for (WorkflowContext context : responseQueue) {
			if (requestId.equals(context.getRequestId())) {
				msoLogger.debug("Found context for request id: " + requestId);
				return context;
			}
		}

		msoLogger.debug("Unable to find context for request id: " + requestId);
		return null;
	}
	
	/**
	 * Builds the callback response object to respond to client
	 * @param processKey
	 * @param processInstanceId
	 * @param requestId
	 * @param callbackResponse
	 * @return
	 */
	public Response processCallback(String processKey, String processInstanceId,
			String requestId, WorkflowCallbackResponse callbackResponse) {
		WorkflowResponse workflowResponse = new WorkflowResponse();
		WorkflowContext workflowContext = getWorkflowContext(requestId);

		if (workflowContext == null) {
			msoLogger.debug("Unable to correlate workflow context for request id: " + requestId
				+ ":processInstance Id:" + processInstanceId
				+ ":process key:" + processKey);
			workflowResponse.setMessage("Fail");
			workflowResponse.setMessageCode(400);
			workflowResponse.setResponse("Unable to correlate workflow context, bad request. Request Id: " + requestId);
			return Response.serverError().entity(workflowResponse).build();
		}

		responseQueue.remove(workflowContext);

		msoLogger.debug("Using callback response for request id: " + requestId);
		workflowResponse.setResponse(callbackResponse.getResponse());
		workflowResponse.setProcessInstanceID(processInstanceId);
		workflowResponse.setMessageCode(callbackResponse.getStatusCode());
		workflowResponse.setMessage(callbackResponse.getMessage());
		sendWorkflowResponseToClient(processKey, workflowContext, workflowResponse);
		return Response.ok().entity(workflowResponse).build();
	}
	
	/**
	 * Send the response to client asynchronously when invoked by the BPMN process
	 * @param processKey
	 * @param workflowContext
	 * @param workflowResponse
	 */
	private void sendWorkflowResponseToClient(String processKey, WorkflowContext workflowContext,
			WorkflowResponse workflowResponse) {
		msoLogger.debug(logMarker + "Sending the response for request id: " + workflowContext.getRequestId());
		recordEvents(processKey, workflowResponse, workflowContext.getStartTime());
		Response response = Response.status(workflowResponse.getMessageCode()).entity(workflowResponse).build();
		AsynchronousResponse asyncResp = workflowContext.getAsynchronousResponse();
		asyncResp.setResponse(response);
	}

	/**
	 * Timeout thread which monitors the delay queue for expired context and send timeout response
	 * to client
	 *
	 * */
	private class TimeoutThread extends Thread {
		public void run() {
			while (!isInterrupted()) {
				try {
					WorkflowContext requestObject = responseQueue.take();					
					msoLogger.debug("Time remaining for request id: " + requestObject.getRequestId() + ":" + requestObject.getDelay(TimeUnit.MILLISECONDS));
					msoLogger.debug("Preparing timeout response for " + requestObject.getProcessKey() + ":" + ":" + requestObject.getRequestId());
					WorkflowResponse response = new WorkflowResponse();
					response.setMessage("Fail");
					response.setResponse("Request timedout, request id:" + requestObject.getRequestId());
					//response.setProcessInstanceID(requestObject.getProcessInstance().getProcessInstanceId());
					recordEvents(requestObject.getProcessKey(), response, requestObject.getStartTime());
					response.setMessageCode(500);
					Response result = Response.status(500).entity(response).build();
					requestObject.getAsynchronousResponse().setResponse(result);
					msoLogger.debug("Sending timeout response for request id:" + requestObject.getRequestId() + ":response:" + response);
				} catch (InterruptedException e) {
					break;
				} catch (Exception e) {
					msoLogger.debug("WorkflowContextHolder timeout thread caught exception: " + e);
				msoLogger.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(), 
						MsoLogger.ErrorCode.UnknownError, "Error in WorkflowContextHolder timeout thread");
				
				}
			}

			msoLogger.debug("WorkflowContextHolder timeout thread interrupted, quitting");
		}
	}
	
	private static void recordEvents(String processKey, WorkflowResponse response,
			long startTime) {

		msoLogger.recordMetricEvent ( startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, 
				logMarker + response.getMessage() + " for processKey: "
				+ processKey + " with response: " + response.getResponse(), "BPMN", MDC.get(processKey), null);
		
		msoLogger.recordAuditEvent (startTime, MsoLogger.StatusCode.COMPLETE, MsoLogger.ResponseCode.Suc, logMarker 
				+ response.getMessage() + " for processKey: " 
				+ processKey + " with response: " + response.getResponse());
		
	}
}