aboutsummaryrefslogtreecommitdiffstats
path: root/bpmn/MSOCommonBPMN
diff options
context:
space:
mode:
authorLukasz Rajewski <lukasz.rajewski@t-mobile.pl>2022-08-31 13:45:15 +0000
committerGerrit Code Review <gerrit@onap.org>2022-08-31 13:45:15 +0000
commit66b09af14e0a2cf774e9f917562f4ae54587eb19 (patch)
treeeaa03610d63a864351933d58b3cfe1a52feb8469 /bpmn/MSOCommonBPMN
parentea65e0397e030bbd0a685f473b1c8416dd0a82f3 (diff)
parent5baa1ed97c1d2b98952a025c3bc76f60587e9670 (diff)
Merge "Enable long-running processes in ControllerExecutionBB"
Diffstat (limited to 'bpmn/MSOCommonBPMN')
-rw-r--r--bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java122
-rw-r--r--bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java4
-rw-r--r--bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java12
3 files changed, 130 insertions, 8 deletions
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java
index e5d8a921a5..3ed1011ee9 100644
--- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java
+++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java
@@ -22,17 +22,24 @@
package org.onap.so.client.cds;
+import static org.onap.so.client.cds.PayloadConstants.CONTROLLER_ERROR_MESSAGE;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Struct;
import com.google.protobuf.Struct.Builder;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Status;
+import java.util.UUID;
+import org.apache.commons.lang3.StringUtils;
+import org.camunda.bpm.engine.MismatchingMessageCorrelationException;
+import org.camunda.bpm.engine.ProcessEngine;
import org.camunda.bpm.engine.delegate.DelegateExecution;
+import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput;
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput;
+import org.onap.logging.filter.base.ONAPComponents;
import org.onap.so.bpmn.common.BuildingBlockExecution;
import org.onap.so.client.PreconditionFailedException;
import org.onap.so.client.RestPropertiesLoader;
@@ -62,10 +69,17 @@ public class AbstractCDSProcessingBBUtils {
private static final String EXEC_INPUT = "executionServiceInput";
private static final String EXECUTION_OBJECT = "executionObject";
private static final String EXCEPTION = "Exception";
+ private static final String CDS_REQUEST_ID = "CDS_REQUEST_ID";
+ private static final String CONTROLLER_MESSAGE = "ControllerMessage";
+
+ private static final String REQ_ID = "requestId";
@Autowired
protected ExceptionBuilder exceptionUtil;
+ @Autowired
+ private ProcessEngine processEngine;
+
/**
* Extracting data from execution object and building the ExecutionServiceInput Object
*
@@ -132,23 +146,28 @@ public class AbstractCDSProcessingBBUtils {
}
/**
- * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
+ * get the executionServiceInput object from execution and send a request to CDS Client
*
* @param execution BuildingBlockExecution object
*/
public void sendRequestToCDSClientBB(BuildingBlockExecution execution) {
-
logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object.");
try {
ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT);
- CDSResponse cdsResponse = getCdsResponse(executionServiceInput);
- execution.setVariable(CDS_STATUS, cdsResponse.status);
- if (cdsResponse.payload != null) {
- String payload = JsonFormat.printer().print(cdsResponse.payload);
- execution.setVariable(RESPONSE_PAYLOAD, payload);
+ String messageCorrelationId = executionServiceInput.getCommonHeader().getSubRequestId();
+ if (StringUtils.isBlank(messageCorrelationId)) {
+ throw new IllegalArgumentException("subRequestId can not be blank");
}
-
+ execution.setVariable(CDS_REQUEST_ID, messageCorrelationId);
+
+ MessageCorrelationBuilder messageCorrelationBuilder =
+ processEngine.getRuntimeService().createMessageCorrelation(CONTROLLER_MESSAGE)
+ .processInstanceVariableEquals(CDS_REQUEST_ID, messageCorrelationId);
+ MessageSendingHandler handler = new MessageSendingHandler(messageCorrelationBuilder);
+ CDSProcessingClient client = new CDSProcessingClient(handler);
+ handler.setClient(client);
+ client.sendRequest(executionServiceInput);
} catch (Exception ex) {
exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
}
@@ -249,6 +268,93 @@ public class AbstractCDSProcessingBBUtils {
}
}
+ private class MessageSendingHandler implements CDSProcessingListener {
+
+ private MessageCorrelationBuilder messageCorrelationBuilder;
+ private AutoCloseable client;
+ private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class);
+
+ MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) {
+ this.messageCorrelationBuilder = messageCorrelationBuilder;
+ }
+
+ public void setClient(AutoCloseable client) {
+ this.client = client;
+ }
+
+ @Override
+ public void onMessage(ExecutionServiceOutput message) {
+ logger.info("Received payload from CDS: {}", message);
+ EventType eventType = message.getStatus().getEventType();
+
+ if (eventType == EventType.EVENT_COMPONENT_PROCESSING) {
+ return;
+ }
+
+ String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED;
+ messageCorrelationBuilder.setVariable(CDS_STATUS, status);
+ messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage());
+
+ if (message.hasPayload()) {
+ try {
+ String payload = JsonFormat.printer().print(message.getPayload());
+ messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload);
+ } catch (InvalidProtocolBufferException e) {
+ logger.error("Failed parsing cds response", e);
+ }
+ }
+ correlate();
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ logger.error("Failed sending CDS request", t);
+ messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage());
+ messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED);
+ correlate();
+ }
+
+ /**
+ * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This
+ * retry logic will allow camunda some time to finish transitioning the process.
+ */
+ private void correlate() {
+ try {
+ int remainingTries = 10;
+ while (!tryCorrelateMessage() && remainingTries > 0) {
+ logger.warn("Message correlation failed. Retries remaining: {}", remainingTries);
+ remainingTries--;
+ Thread.sleep(1000L);
+ }
+ } catch (InterruptedException e) {
+ logger.error("Thread interrupted during message correlation", e);
+ Thread.currentThread().interrupt();
+ } finally {
+ closeClient();
+ }
+ }
+
+ private boolean tryCorrelateMessage() {
+ try {
+ messageCorrelationBuilder.correlate();
+ logger.info("Message correlation successful");
+ return true;
+ } catch (MismatchingMessageCorrelationException e) {
+ return false;
+ }
+ }
+
+ private void closeClient() {
+ if (client == null)
+ throw new IllegalStateException("Client was not set and could not be closed");
+ try {
+ client.close();
+ } catch (Exception e) {
+ logger.error("Failed closing cds client", e);
+ }
+ }
+ }
+
private class CDSResponse {
String status;
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java
index 2812de799d..019e336325 100644
--- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java
+++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java
@@ -52,4 +52,8 @@ public final class PayloadConstants {
public final static String PNF_UUID = "pnfUuid";
public final static String SERVICE_INSTANCE_ID = "serviceInstanceId";
public final static String MODEL_UUID = "modelUuid";
+
+ public final static String TIMEOUT_CONTROLLER_MESSAGE = "timeoutControllerMessage";
+ public final static String CONTROLLER_ERROR_MESSAGE = "controllerErrorMessage";
+ public final static String CONTROLLER_MSG_TIMEOUT_REACHED = "controllerMessageTimeoutReached";
}
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java
index 2efd74ddc5..ec0eb37b95 100644
--- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java
+++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java
@@ -22,6 +22,7 @@ package org.onap.so.client.restproperties;
import java.net.URL;
import java.util.Objects;
+import org.apache.commons.lang3.StringUtils;
import org.onap.so.bpmn.core.UrnPropertiesReader;
import org.onap.so.client.cds.CDSProperties;
@@ -31,6 +32,8 @@ public class CDSPropertiesImpl implements CDSProperties {
private static final String PORT = "cds.port";
private static final String AUTH = "cds.auth";
private static final String TIMEOUT = "cds.timeout";
+ private static final String KEEP_ALIVE_PING_MINUTES = "keep-alive-ping-minutes";
+ private static final long GRPC_SERVER_DEFAULT_MIN_ALLOWED_PING_INTERVAL = 5;
public CDSPropertiesImpl() {
// Needed for service loader
@@ -90,4 +93,13 @@ public class CDSPropertiesImpl implements CDSProperties {
public boolean getUseBasicAuth() {
return true;
}
+
+ @Override
+ public long getKeepAlivePingMinutes() {
+ String value = UrnPropertiesReader.getVariable(KEEP_ALIVE_PING_MINUTES);
+ if (StringUtils.isBlank(value)) {
+ return GRPC_SERVER_DEFAULT_MIN_ALLOWED_PING_INTERVAL + 1L;
+ }
+ return Long.parseLong(Objects.requireNonNull(value));
+ }
}