aboutsummaryrefslogtreecommitdiffstats
path: root/bpmn/MSOCommonBPMN/src/main/java/org/openecomp/mso/bpmn/common/workflow/service/AbstractCallbackService.java
blob: 214ae28b704757a9e14e7f03f9b3983746b1912a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
package org.openecomp.mso.bpmn.common.workflow.service;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.camunda.bpm.BpmPlatform;
import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
import org.camunda.bpm.engine.ProcessEngineServices;
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.MessageCorrelationResult;
import org.openecomp.mso.bpmn.core.PropertyConfiguration;
import org.openecomp.mso.logger.MessageEnum;
import org.openecomp.mso.logger.MsoLogger;

/**
 * Abstract base class for callback services.
 */
public abstract class AbstractCallbackService {
	public static final long DEFAULT_TIMEOUT_SECONDS = 60;
	public static final long FAST_POLL_DUR_SECONDS = 5;
	public static final long FAST_POLL_INT_MS = 100;
	public static final long SLOW_POLL_INT_MS = 1000;
	
	private static final MsoLogger LOGGER = MsoLogger.getMsoLogger(MsoLogger.Catalog.BPEL);

	protected volatile ProcessEngineServices pes4junit = null;
	
	/**
	 * Parameterized callback handler.
	 */
	protected CallbackResult handleCallback(String method, Object message,
			String messageEventName, String messageVariable,
			String correlationVariable, String correlationValue,
			String logMarker) {

		return handleCallback(method, message, messageEventName, messageVariable,
			correlationVariable, correlationValue, logMarker, null);
	}

	/**
	 * Parameterized callback handler.
	 */
	protected CallbackResult handleCallback(String method, Object message,
			String messageEventName, String messageVariable,
			String correlationVariable, String correlationValue,
			String logMarker, Map<String, Object> injectedVariables) {

		long startTime = System.currentTimeMillis();

		LOGGER.debug(logMarker + " " + method + " received message: "
			+ (message == null ? "" : System.lineSeparator()) + message);

		try {
			Map<String, Object> variables = new HashMap<String, Object>();

			if (injectedVariables != null) {
				variables.putAll(injectedVariables);
			}

			variables.put(correlationVariable, correlationValue);
			variables.put(messageVariable, message == null ? null : message.toString());

			boolean ok = correlate(messageEventName, correlationVariable,
				correlationValue, variables, logMarker);

			if (!ok) {
				String msg = "No process is waiting for " + messageEventName
					+ " with " + correlationVariable + " = '" + correlationValue + "'";
				logCallbackError(method, startTime, msg);
				return new CallbackError(msg);
			}

			logCallbackSuccess(method, startTime);
			return new CallbackSuccess();
		} catch (Exception e) {
			String msg = "Caught " + e.getClass().getSimpleName()
				+ " processing " + messageEventName + " with " + correlationVariable
				+ " = '" + correlationValue + "'";
			logCallbackError(method, startTime, msg);
			return new CallbackError(msg);
		}
	}
	
	/**
	 * Performs message correlation.  Waits a limited amount of time for
	 * a process to become ready for correlation.  The return value indicates
	 * whether or not a process was found to receive the message.  Due to the
	 * synchronous nature of message injection in Camunda, by the time this
	 * method returns, one of 3 things will have happened: (1) the process
	 * received the message and ended, (2) the process received the message
	 * and reached an activity that suspended, or (3) an exception occurred
	 * during correlation or while the process was executing.  Correlation
	 * exceptions are handled differently from process execution exceptions.
	 * Correlation exceptions are thrown so the client knows something went
	 * wrong with the delivery of the message.  Process execution exceptions
	 * are logged but not thrown.
	 * @param messageEventName the message event name
	 * @param correlationVariable the process variable used as the correlator
	 * @param correlationValue the correlation value
	 * @param variables variables to inject into the process
	 * @param logMarker a marker for debug logging
	 * @return true if a process could be found, false if not
	 * @throws Exception for correlation errors
	 */
	protected boolean correlate(String messageEventName, String correlationVariable,
			String correlationValue, Map<String, Object> variables, String logMarker)
			throws Exception {

		LOGGER.debug(logMarker + " Attempting to find process waiting"
			+ " for " + messageEventName + " with " + correlationVariable
			+ " = '" + correlationValue + "'");

		RuntimeService runtimeService =
			getProcessEngineServices().getRuntimeService();

		Map<String, String> properties =
			PropertyConfiguration.getInstance().getProperties("mso.bpmn.urn.properties");

		long timeout = DEFAULT_TIMEOUT_SECONDS;

		// The code is here in case we ever need to change the default.
		String s = properties.get("mso.correlation.timeout");
		if (s != null) {
			try {
				timeout = Long.parseLong(s);
			} catch (NumberFormatException e) {
				// Ignore
			}
		}

		long now = System.currentTimeMillis();
		long fastPollEndTime = now + (FAST_POLL_DUR_SECONDS * 1000);
		long endTime = now + (timeout * 1000);
		long sleep = FAST_POLL_INT_MS;

		List<Execution> waitingProcesses = null;
		Exception queryException = null;
		int queryCount = 0;
		int queryFailCount = 0;

		while (true) {
			try {
				++queryCount;
				waitingProcesses = runtimeService.createExecutionQuery()
					.messageEventSubscriptionName(messageEventName)
					.processVariableValueEquals(correlationVariable, correlationValue)
					.list();
			} catch (Exception e) {
				++queryFailCount;
				queryException = e;
			}

			if (waitingProcesses != null && waitingProcesses.size() > 0) {
				break;
			}

			if (now > endTime - sleep) {
				break;
			}

			Thread.sleep(sleep);
			now = System.currentTimeMillis();

			if (now > fastPollEndTime) {
				sleep = SLOW_POLL_INT_MS;
			}
		}

		if (waitingProcesses == null) {
			waitingProcesses = new ArrayList<Execution>(0);
		}

		int count = waitingProcesses.size();

		List<ExecInfo> execInfoList = new ArrayList<ExecInfo>(count);
		for (Execution execution : waitingProcesses) {
			execInfoList.add(new ExecInfo(execution));
		}

		LOGGER.debug(logMarker + " Found " + count + " process(es) waiting"
			+ " for " + messageEventName + " with " + correlationVariable
			+ " = '" + correlationValue + "': " + execInfoList);

		if (count == 0) {
			if (queryFailCount > 0) {
				String msg = queryFailCount + "/" + queryCount
					+ " execution queries failed attempting to correlate "
					+ messageEventName + " with " + correlationVariable
					+ " = '" + correlationValue + "'; last exception was:"
					+ queryException;
				LOGGER.debug(msg);
				LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
					MsoLogger.ErrorCode.UnknownError, msg, queryException);
			}

			return false;
		}

		if (count > 1) {
			// Only one process should be waiting. Throw an exception back to the client.
			throw new MismatchingMessageCorrelationException(messageEventName,
				"more than 1 process is waiting with " + correlationVariable
				+ " = '" + correlationValue + "'");
		}
		
		// We prototyped an asynchronous solution, i.e. resuming the process
		// flow in a separate thread, but this affected too many existing tests,
		// and we went back to the synchronous solution. The synchronous solution
		// has some troublesome characteristics though.  For example, the
		// resumed flow may send request #2 to a remote system before MSO has
		// acknowledged the notification associated with request #1.  

		try {
			LOGGER.debug(logMarker + " Running " + execInfoList.get(0) + " to receive "
				+ messageEventName + " with " + correlationVariable + " = '"
				+ correlationValue + "'");

			@SuppressWarnings("unused")
			MessageCorrelationResult result = runtimeService
				.createMessageCorrelation(messageEventName)
				.setVariables(variables)
				.processInstanceVariableEquals(correlationVariable, correlationValue)
				.correlateWithResult();

		} catch (MismatchingMessageCorrelationException e) {
			// A correlation exception occurred even after we identified
			// one waiting process.  Throw it back to the client.
			throw e;
		} catch (Exception e) {
			// This must be an exception from the flow itself.  Log it, but don't
			// report it back to the client.
			String msg = "Caught " + e.getClass().getSimpleName() + " running "
				+ execInfoList.get(0) + " after receiving " + messageEventName
				+ " with " + correlationVariable + " = '" + correlationValue
				+ "': " + e;
			LOGGER.debug(msg);
			LOGGER.error(MessageEnum.BPMN_GENERAL_EXCEPTION, "BPMN", MsoLogger.getServiceName(),
				MsoLogger.ErrorCode.UnknownError, msg, e);
		}

		return true;
	}
	
	/**
	 * Records audit and metric events in the log for a callback success.
	 * @param method the method name
	 * @param startTime the request start time
	 */
	protected void logCallbackSuccess(String method, long startTime) {
		LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
			MsoLogger.ResponseCode.Suc, "Completed " + method);

		LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
			MsoLogger.ResponseCode.Suc, "Completed " + method,
			"BPMN", MsoLogger.getServiceName(), null);
	}

	/**
	 * Records error, audit and metric events in the log for a callback
	 * internal error.
	 * @param method the method name
	 * @param startTime the request start time
	 * @param msg the error message
	 */
	protected void logCallbackError(String method, long startTime, String msg) {
		logCallbackError(method, startTime, msg, null);
	}

	/**
	 * Records error, audit and metric events in the log for a callback
	 * internal error.
	 * @param method the method name
	 * @param startTime the request start time
	 * @param msg the error message
	 * @param e the exception
	 */
	protected void logCallbackError(String method, long startTime, String msg, Exception e) {
		if (e == null) {
			LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), 
				MsoLogger.ErrorCode.UnknownError, msg);
		} else {
			LOGGER.error(MessageEnum.BPMN_CALLBACK_EXCEPTION, "BPMN", MsoLogger.getServiceName(), 
				MsoLogger.ErrorCode.UnknownError, msg, e);
		}

		LOGGER.recordAuditEvent(startTime, MsoLogger.StatusCode.COMPLETE,
			MsoLogger.ResponseCode.InternalError, "Completed " + method);

		LOGGER.recordMetricEvent(startTime, MsoLogger.StatusCode.COMPLETE,
			MsoLogger.ResponseCode.InternalError, "Completed " + method,
			"BPMN", MsoLogger.getServiceName(), null);
	}
	
	/**
	 * Abstract callback result object.
	 */
	protected abstract class CallbackResult {
	}

	/**
	 * Indicates that callback handling was successful.
	 */
	protected class CallbackSuccess extends CallbackResult {
	}

	/**
	 * Indicates that callback handling failed.
	 */
	protected class CallbackError extends CallbackResult {
		private final String errorMessage;

		public CallbackError(String errorMessage) {
			this.errorMessage = errorMessage;
		}

		/**
		 * Gets the error message.
		 */
		public String getErrorMessage() {
			return errorMessage;
		}
	}

	private static class ExecInfo {
		private final Execution execution;

		public ExecInfo(Execution execution) {
			this.execution = execution;
		}
	
		@Override
		public String toString() {
			return "Process[" + execution.getProcessInstanceId()
				+ ":" + execution.getId() + "]";
		}
	}
	
	protected ProcessEngineServices getProcessEngineServices() {
		if (pes4junit == null) {
			return BpmPlatform.getDefaultProcessEngine();
		} else {
			return pes4junit;
		}
	}

	public void setProcessEngineServices4junit(ProcessEngineServices pes) {
		pes4junit = pes;
	}
}