diff options
author | Jozsef Csongvai <jozsef.csongvai@bell.ca> | 2022-06-13 08:53:19 -0400 |
---|---|---|
committer | Jozsef Csongvai <jozsef.csongvai@bell.ca> | 2022-06-29 17:40:31 -0400 |
commit | 5baa1ed97c1d2b98952a025c3bc76f60587e9670 (patch) | |
tree | baa4fbf040c19c7ade2cb9feb602dff4906bbe9b | |
parent | 366a173f798422b956625aa83d81fc863e0914a5 (diff) |
Enable long-running processes in ControllerExecutionBB
Instead of blocking a thread while waiting for controller response,
ControllerExecutionBB is now using camunda receive task to support
long running processes without increasing the camunda job timeout.
A new property was added to configure the gRPC client's keep alive
ping mechanism, which will identify connection issues and prevent
the process getting stuck when the controller crashes.
Issue-ID: SO-3953
Signed-off-by: Jozsef Csongvai <jozsef.csongvai@bell.ca>
Change-Id: Iaf6438dba76e715dba846bf45ef47b6a91239c4a
10 files changed, 260 insertions, 39 deletions
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java index e5d8a921a5..3ed1011ee9 100644 --- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java +++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java @@ -22,17 +22,24 @@ package org.onap.so.client.cds; +import static org.onap.so.client.cds.PayloadConstants.CONTROLLER_ERROR_MESSAGE; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.Struct; import com.google.protobuf.Struct.Builder; import com.google.protobuf.util.JsonFormat; import io.grpc.Status; +import java.util.UUID; +import org.apache.commons.lang3.StringUtils; +import org.camunda.bpm.engine.MismatchingMessageCorrelationException; +import org.camunda.bpm.engine.ProcessEngine; import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.runtime.MessageCorrelationBuilder; import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader; import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput; +import org.onap.logging.filter.base.ONAPComponents; import org.onap.so.bpmn.common.BuildingBlockExecution; import org.onap.so.client.PreconditionFailedException; import org.onap.so.client.RestPropertiesLoader; @@ -62,10 +69,17 @@ public class AbstractCDSProcessingBBUtils { private static final String EXEC_INPUT = "executionServiceInput"; private static final String EXECUTION_OBJECT = "executionObject"; private static final String EXCEPTION = "Exception"; + private static final String CDS_REQUEST_ID = "CDS_REQUEST_ID"; + private static final String CONTROLLER_MESSAGE = "ControllerMessage"; + + private static final String REQ_ID = "requestId"; @Autowired protected ExceptionBuilder exceptionUtil; + @Autowired + private ProcessEngine processEngine; + /** * Extracting data from execution object and building the ExecutionServiceInput Object * @@ -132,23 +146,28 @@ public class AbstractCDSProcessingBBUtils { } /** - * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period + * get the executionServiceInput object from execution and send a request to CDS Client * * @param execution BuildingBlockExecution object */ public void sendRequestToCDSClientBB(BuildingBlockExecution execution) { - logger.trace("Start AbstractCDSProcessingBBUtils.sendRequestToCDSClient for BuildingBlockExecution object."); try { ExecutionServiceInput executionServiceInput = execution.getVariable(EXEC_INPUT); - CDSResponse cdsResponse = getCdsResponse(executionServiceInput); - execution.setVariable(CDS_STATUS, cdsResponse.status); - if (cdsResponse.payload != null) { - String payload = JsonFormat.printer().print(cdsResponse.payload); - execution.setVariable(RESPONSE_PAYLOAD, payload); + String messageCorrelationId = executionServiceInput.getCommonHeader().getSubRequestId(); + if (StringUtils.isBlank(messageCorrelationId)) { + throw new IllegalArgumentException("subRequestId can not be blank"); } - + execution.setVariable(CDS_REQUEST_ID, messageCorrelationId); + + MessageCorrelationBuilder messageCorrelationBuilder = + processEngine.getRuntimeService().createMessageCorrelation(CONTROLLER_MESSAGE) + .processInstanceVariableEquals(CDS_REQUEST_ID, messageCorrelationId); + MessageSendingHandler handler = new MessageSendingHandler(messageCorrelationBuilder); + CDSProcessingClient client = new CDSProcessingClient(handler); + handler.setClient(client); + client.sendRequest(executionServiceInput); } catch (Exception ex) { exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex); } @@ -249,6 +268,93 @@ public class AbstractCDSProcessingBBUtils { } } + private class MessageSendingHandler implements CDSProcessingListener { + + private MessageCorrelationBuilder messageCorrelationBuilder; + private AutoCloseable client; + private Logger logger = LoggerFactory.getLogger(MessageSendingHandler.class); + + MessageSendingHandler(MessageCorrelationBuilder messageCorrelationBuilder) { + this.messageCorrelationBuilder = messageCorrelationBuilder; + } + + public void setClient(AutoCloseable client) { + this.client = client; + } + + @Override + public void onMessage(ExecutionServiceOutput message) { + logger.info("Received payload from CDS: {}", message); + EventType eventType = message.getStatus().getEventType(); + + if (eventType == EventType.EVENT_COMPONENT_PROCESSING) { + return; + } + + String status = eventType == EventType.EVENT_COMPONENT_EXECUTED ? SUCCESS : FAILED; + messageCorrelationBuilder.setVariable(CDS_STATUS, status); + messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, message.getStatus().getErrorMessage()); + + if (message.hasPayload()) { + try { + String payload = JsonFormat.printer().print(message.getPayload()); + messageCorrelationBuilder.setVariable(RESPONSE_PAYLOAD, payload); + } catch (InvalidProtocolBufferException e) { + logger.error("Failed parsing cds response", e); + } + } + correlate(); + } + + @Override + public void onError(Throwable t) { + logger.error("Failed sending CDS request", t); + messageCorrelationBuilder.setVariable(CONTROLLER_ERROR_MESSAGE, t.getMessage()); + messageCorrelationBuilder.setVariable(CDS_STATUS, FAILED); + correlate(); + } + + /** + * When a CDS call returns before the bpmn process is in a waiting state, message correlation will fail. This + * retry logic will allow camunda some time to finish transitioning the process. + */ + private void correlate() { + try { + int remainingTries = 10; + while (!tryCorrelateMessage() && remainingTries > 0) { + logger.warn("Message correlation failed. Retries remaining: {}", remainingTries); + remainingTries--; + Thread.sleep(1000L); + } + } catch (InterruptedException e) { + logger.error("Thread interrupted during message correlation", e); + Thread.currentThread().interrupt(); + } finally { + closeClient(); + } + } + + private boolean tryCorrelateMessage() { + try { + messageCorrelationBuilder.correlate(); + logger.info("Message correlation successful"); + return true; + } catch (MismatchingMessageCorrelationException e) { + return false; + } + } + + private void closeClient() { + if (client == null) + throw new IllegalStateException("Client was not set and could not be closed"); + try { + client.close(); + } catch (Exception e) { + logger.error("Failed closing cds client", e); + } + } + } + private class CDSResponse { String status; diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java index 2812de799d..019e336325 100644 --- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java +++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/PayloadConstants.java @@ -52,4 +52,8 @@ public final class PayloadConstants { public final static String PNF_UUID = "pnfUuid"; public final static String SERVICE_INSTANCE_ID = "serviceInstanceId"; public final static String MODEL_UUID = "modelUuid"; + + public final static String TIMEOUT_CONTROLLER_MESSAGE = "timeoutControllerMessage"; + public final static String CONTROLLER_ERROR_MESSAGE = "controllerErrorMessage"; + public final static String CONTROLLER_MSG_TIMEOUT_REACHED = "controllerMessageTimeoutReached"; } diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java index 2efd74ddc5..ec0eb37b95 100644 --- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java +++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/restproperties/CDSPropertiesImpl.java @@ -22,6 +22,7 @@ package org.onap.so.client.restproperties; import java.net.URL; import java.util.Objects; +import org.apache.commons.lang3.StringUtils; import org.onap.so.bpmn.core.UrnPropertiesReader; import org.onap.so.client.cds.CDSProperties; @@ -31,6 +32,8 @@ public class CDSPropertiesImpl implements CDSProperties { private static final String PORT = "cds.port"; private static final String AUTH = "cds.auth"; private static final String TIMEOUT = "cds.timeout"; + private static final String KEEP_ALIVE_PING_MINUTES = "keep-alive-ping-minutes"; + private static final long GRPC_SERVER_DEFAULT_MIN_ALLOWED_PING_INTERVAL = 5; public CDSPropertiesImpl() { // Needed for service loader @@ -90,4 +93,13 @@ public class CDSPropertiesImpl implements CDSProperties { public boolean getUseBasicAuth() { return true; } + + @Override + public long getKeepAlivePingMinutes() { + String value = UrnPropertiesReader.getVariable(KEEP_ALIVE_PING_MINUTES); + if (StringUtils.isBlank(value)) { + return GRPC_SERVER_DEFAULT_MIN_ALLOWED_PING_INTERVAL + 1L; + } + return Long.parseLong(Objects.requireNonNull(value)); + } } diff --git a/bpmn/so-bpmn-building-blocks/src/main/resources/subprocess/BuildingBlock/ControllerExecutionBB.bpmn b/bpmn/so-bpmn-building-blocks/src/main/resources/subprocess/BuildingBlock/ControllerExecutionBB.bpmn index 065d7e0c4b..e04d281c8b 100644 --- a/bpmn/so-bpmn-building-blocks/src/main/resources/subprocess/BuildingBlock/ControllerExecutionBB.bpmn +++ b/bpmn/so-bpmn-building-blocks/src/main/resources/subprocess/BuildingBlock/ControllerExecutionBB.bpmn @@ -1,5 +1,5 @@ <?xml version="1.0" encoding="UTF-8"?> -<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" id="Definitions_1ahlzqg" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.5.0"> +<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:camunda="http://camunda.org/schema/1.0/bpmn" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" id="Definitions_1ahlzqg" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.6.0"> <bpmn:process id="ControllerExecutionBB" name="ControllerExecutionBB" isExecutable="true"> <bpmn:startEvent id="StartEvent_1"> <bpmn:outgoing>SequenceFlow_0gmfit3</bpmn:outgoing> @@ -14,24 +14,19 @@ <bpmn:incoming>SequenceFlow_07tqu82</bpmn:incoming> <bpmn:outgoing>SequenceFlow_1mkhog2</bpmn:outgoing> </bpmn:serviceTask> - <bpmn:sequenceFlow id="SequenceFlow_05qembo" sourceRef="Task_0bhf6tp" targetRef="ExclusiveGateway_13q340y" /> + <bpmn:sequenceFlow id="SequenceFlow_05qembo" sourceRef="Task_0bhf6tp" targetRef="ReceiveTask_0gwz54h" /> <bpmn:serviceTask id="Task_0bhf6tp" name="Call ControllerExecutionBB" camunda:expression="${ControllerExecutionBB.execute(InjectExecution.execute(execution, execution.getVariable("gBuildingBlockExecution")))}"> <bpmn:incoming>SequenceFlow_0vzx2yr</bpmn:incoming> <bpmn:outgoing>SequenceFlow_05qembo</bpmn:outgoing> </bpmn:serviceTask> - <bpmn:exclusiveGateway id="ExclusiveGateway_13q340y" default="SequenceFlow_15gxql1"> - <bpmn:incoming>SequenceFlow_05qembo</bpmn:incoming> + <bpmn:exclusiveGateway id="ExclusiveGateway_13q340y" default="SequenceFlow_1szkurj"> + <bpmn:incoming>SequenceFlow_01kp408</bpmn:incoming> <bpmn:outgoing>SequenceFlow_07tqu82</bpmn:outgoing> - <bpmn:outgoing>SequenceFlow_15gxql1</bpmn:outgoing> + <bpmn:outgoing>SequenceFlow_1szkurj</bpmn:outgoing> </bpmn:exclusiveGateway> <bpmn:sequenceFlow id="SequenceFlow_07tqu82" name="successCDS" sourceRef="ExclusiveGateway_13q340y" targetRef="Task_1hs1mn0"> <bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">#{execution.getVariable("ControllerStatus").equals("Success")}</bpmn:conditionExpression> </bpmn:sequenceFlow> - <bpmn:endEvent id="EndEvent_0mnaj50"> - <bpmn:incoming>SequenceFlow_15gxql1</bpmn:incoming> - <bpmn:errorEventDefinition id="ErrorEventDefinition_1s1hqgm" errorRef="Error_0aovtfv" /> - </bpmn:endEvent> - <bpmn:sequenceFlow id="SequenceFlow_15gxql1" sourceRef="ExclusiveGateway_13q340y" targetRef="EndEvent_0mnaj50" /> <bpmn:serviceTask id="ServiceTask_0inxg9l" name="Set Actor, Scope and Action Params" camunda:expression="${ControllerExecution.setControllerActorScopeAction(InjectExecution.execute(execution, execution.getVariable("gBuildingBlockExecution")))}"> <bpmn:incoming>SequenceFlow_0gmfit3</bpmn:incoming> <bpmn:outgoing>SequenceFlow_1lspfyy</bpmn:outgoing> @@ -79,8 +74,29 @@ <bpmn:sequenceFlow id="Flow_0qmjpxv" name="success" sourceRef="Gateway_065nxpu" targetRef="EndEvent_0lgvk82"> <bpmn:conditionExpression xsi:type="bpmn:tFormalExpression">#{execution.getVariable("ControllerStatus").equals("Success")}</bpmn:conditionExpression> </bpmn:sequenceFlow> + <bpmn:receiveTask id="ReceiveTask_0gwz54h" name="Wait for Controller Message" camunda:asyncAfter="true" messageRef="Message_01gofle"> + <bpmn:incoming>SequenceFlow_05qembo</bpmn:incoming> + <bpmn:outgoing>SequenceFlow_01kp408</bpmn:outgoing> + </bpmn:receiveTask> + <bpmn:boundaryEvent id="BoundaryEvent_0nkal4w" name="Timeout" attachedToRef="ReceiveTask_0gwz54h"> + <bpmn:extensionElements> + <camunda:executionListener expression="#{execution.setVariable("controllerMessageTimeoutReached", true)}" event="start" /> + </bpmn:extensionElements> + <bpmn:outgoing>SequenceFlow_13ddk47</bpmn:outgoing> + <bpmn:timerEventDefinition id="TimerEventDefinition_14lr1mm"> + <bpmn:timeDuration xsi:type="bpmn:tFormalExpression">#{execution.getVariable("timeoutControllerMessage")}</bpmn:timeDuration> + </bpmn:timerEventDefinition> + </bpmn:boundaryEvent> + <bpmn:serviceTask id="ServiceTask_0t0bo1j" name="Handle Failure" camunda:expression="${ControllerExecutionBB.handleFailure(InjectExecution.execute(execution, execution.getVariable("gBuildingBlockExecution")))}"> + <bpmn:incoming>SequenceFlow_1szkurj</bpmn:incoming> + <bpmn:incoming>SequenceFlow_13ddk47</bpmn:incoming> + </bpmn:serviceTask> + <bpmn:sequenceFlow id="SequenceFlow_1szkurj" sourceRef="ExclusiveGateway_13q340y" targetRef="ServiceTask_0t0bo1j" /> + <bpmn:sequenceFlow id="SequenceFlow_13ddk47" sourceRef="BoundaryEvent_0nkal4w" targetRef="ServiceTask_0t0bo1j" /> + <bpmn:sequenceFlow id="SequenceFlow_01kp408" sourceRef="ReceiveTask_0gwz54h" targetRef="ExclusiveGateway_13q340y" /> </bpmn:process> <bpmn:error id="Error_0aovtfv" name="MSOWorkflowException" errorCode="MSOWorkflowException" /> + <bpmn:message id="Message_01gofle" name="ControllerMessage" /> <bpmndi:BPMNDiagram id="BPMNDiagram_1"> <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="ControllerExecutionBB"> <bpmndi:BPMNEdge id="Flow_0qmjpxv_di" bpmnElement="Flow_0qmjpxv"> @@ -93,9 +109,9 @@ <bpmndi:BPMNEdge id="SequenceFlow_0vzx2yr_di" bpmnElement="SequenceFlow_0vzx2yr"> <di:waypoint x="424" y="366" /> <di:waypoint x="424" y="462" /> - <di:waypoint x="572" y="462" /> + <di:waypoint x="520" y="462" /> <bpmndi:BPMNLabel> - <dc:Bounds x="455" y="436" width="60" height="12" /> + <dc:Bounds x="434" y="436" width="59" height="14" /> </bpmndi:BPMNLabel> </bpmndi:BPMNEdge> <bpmndi:BPMNEdge id="SequenceFlow_0op5irz_di" bpmnElement="SequenceFlow_0op5irz"> @@ -134,13 +150,6 @@ <dc:Bounds x="229" y="240" width="0" height="12" /> </bpmndi:BPMNLabel> </bpmndi:BPMNEdge> - <bpmndi:BPMNEdge id="SequenceFlow_15gxql1_di" bpmnElement="SequenceFlow_15gxql1"> - <di:waypoint x="893" y="487" /> - <di:waypoint x="893" y="565" /> - <bpmndi:BPMNLabel> - <dc:Bounds x="713" y="436" width="90" height="20" /> - </bpmndi:BPMNLabel> - </bpmndi:BPMNEdge> <bpmndi:BPMNEdge id="SequenceFlow_07tqu82_di" bpmnElement="SequenceFlow_07tqu82"> <di:waypoint x="918" y="462" /> <di:waypoint x="979" y="462" /> @@ -149,8 +158,8 @@ </bpmndi:BPMNLabel> </bpmndi:BPMNEdge> <bpmndi:BPMNEdge id="SequenceFlow_05qembo_di" bpmnElement="SequenceFlow_05qembo"> - <di:waypoint x="672" y="462" /> - <di:waypoint x="868" y="462" /> + <di:waypoint x="620" y="462" /> + <di:waypoint x="690" y="462" /> <bpmndi:BPMNLabel> <dc:Bounds x="725" y="437" width="90" height="20" /> </bpmndi:BPMNLabel> @@ -186,7 +195,7 @@ <dc:Bounds x="979" y="422" width="100" height="80" /> </bpmndi:BPMNShape> <bpmndi:BPMNShape id="ServiceTask_01mv1si_di" bpmnElement="Task_0bhf6tp"> - <dc:Bounds x="572" y="422" width="100" height="80" /> + <dc:Bounds x="520" y="422" width="100" height="80" /> </bpmndi:BPMNShape> <bpmndi:BPMNShape id="ExclusiveGateway_13q340y_di" bpmnElement="ExclusiveGateway_13q340y" isMarkerVisible="true"> <dc:Bounds x="868" y="437" width="50" height="50" /> @@ -194,12 +203,6 @@ <dc:Bounds x="698" y="327" width="90" height="20" /> </bpmndi:BPMNLabel> </bpmndi:BPMNShape> - <bpmndi:BPMNShape id="EndEvent_0mnaj50_di" bpmnElement="EndEvent_0mnaj50"> - <dc:Bounds x="875" y="565" width="36" height="36" /> - <bpmndi:BPMNLabel> - <dc:Bounds x="698" y="531" width="90" height="20" /> - </bpmndi:BPMNLabel> - </bpmndi:BPMNShape> <bpmndi:BPMNShape id="ServiceTask_0inxg9l_di" bpmnElement="ServiceTask_0inxg9l"> <dc:Bounds x="259" y="301" width="100" height="80" /> </bpmndi:BPMNShape> @@ -224,6 +227,31 @@ <bpmndi:BPMNShape id="Gateway_065nxpu_di" bpmnElement="Gateway_065nxpu" isMarkerVisible="true"> <dc:Bounds x="1122" y="183" width="50" height="50" /> </bpmndi:BPMNShape> + <bpmndi:BPMNShape id="ReceiveTask_0gwz54h_di" bpmnElement="ReceiveTask_0gwz54h"> + <dc:Bounds x="690" y="422" width="100" height="80" /> + </bpmndi:BPMNShape> + <bpmndi:BPMNShape id="BoundaryEvent_0nkal4w_di" bpmnElement="BoundaryEvent_0nkal4w"> + <dc:Bounds x="722" y="484" width="36" height="36" /> + <bpmndi:BPMNLabel> + <dc:Bounds x="750" y="523" width="40" height="14" /> + </bpmndi:BPMNLabel> + </bpmndi:BPMNShape> + <bpmndi:BPMNShape id="ServiceTask_0t0bo1j_di" bpmnElement="ServiceTask_0t0bo1j"> + <dc:Bounds x="843" y="580" width="100" height="80" /> + </bpmndi:BPMNShape> + <bpmndi:BPMNEdge id="SequenceFlow_1szkurj_di" bpmnElement="SequenceFlow_1szkurj"> + <di:waypoint x="893" y="487" /> + <di:waypoint x="893" y="580" /> + </bpmndi:BPMNEdge> + <bpmndi:BPMNEdge id="SequenceFlow_13ddk47_di" bpmnElement="SequenceFlow_13ddk47"> + <di:waypoint x="740" y="520" /> + <di:waypoint x="740" y="620" /> + <di:waypoint x="843" y="620" /> + </bpmndi:BPMNEdge> + <bpmndi:BPMNEdge id="SequenceFlow_01kp408_di" bpmnElement="SequenceFlow_01kp408"> + <di:waypoint x="790" y="462" /> + <di:waypoint x="868" y="462" /> + </bpmndi:BPMNEdge> </bpmndi:BPMNPlane> </bpmndi:BPMNDiagram> </bpmn:definitions> diff --git a/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/buildingblock/ControllerExecutionBB.java b/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/buildingblock/ControllerExecutionBB.java index 92be824691..c5536106fe 100644 --- a/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/buildingblock/ControllerExecutionBB.java +++ b/bpmn/so-bpmn-tasks/src/main/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/buildingblock/ControllerExecutionBB.java @@ -26,11 +26,13 @@ import org.onap.so.bpmn.common.BuildingBlockExecution; import org.onap.so.bpmn.infrastructure.decisionpoint.api.ControllerContext; import org.onap.so.bpmn.infrastructure.decisionpoint.api.ControllerRunnable; import org.onap.so.bpmn.infrastructure.decisionpoint.impl.AbstractControllerExecution; +import org.onap.so.client.cds.PayloadConstants; import org.onap.so.db.catalog.beans.ControllerSelectionReference; import org.onap.so.db.catalog.beans.PnfResourceCustomization; import org.onap.so.db.catalog.beans.VnfResourceCustomization; import org.onap.so.db.catalog.client.CatalogDbClient; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; /** @@ -52,8 +54,11 @@ public class ControllerExecutionBB extends AbstractControllerExecution<BuildingB @Autowired protected CatalogDbClient catalogDbClient; + @Value("${controller-execution.timeout-for-controller-message:P1D}") + private String timeoutForControllerMessage; public void execute(final BuildingBlockExecution execution) { + execution.setVariable(PayloadConstants.TIMEOUT_CONTROLLER_MESSAGE, timeoutForControllerMessage); ControllerContext<BuildingBlockExecution> controllerContext = buildControllerContext(execution); controllerExecute(controllerContext); } @@ -151,4 +156,18 @@ public class ControllerExecutionBB extends AbstractControllerExecution<BuildingB "Unable to find the controller implementation", ONAPComponents.SO); } } + + public void handleFailure(final BuildingBlockExecution execution) { + String errorMessage = execution.getVariable(PayloadConstants.CONTROLLER_ERROR_MESSAGE); + + if (Boolean.TRUE.equals(execution.getVariable(PayloadConstants.CONTROLLER_MSG_TIMEOUT_REACHED))) { + logger.error( + "timeout-for-controller-message was reached. If the controller is still processing, this property should be reconfigured"); + errorMessage = "Controller response was not received within configured timeout"; + } else if (errorMessage == null) { + errorMessage = "Controller call failed. No errormessage was captured."; + } + + exceptionBuilder.buildAndThrowWorkflowException(execution, 9003, errorMessage, ONAPComponents.SO); + } } diff --git a/bpmn/so-bpmn-tasks/src/test/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/buildingblock/ControllerExecutionBBTest.java b/bpmn/so-bpmn-tasks/src/test/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/buildingblock/ControllerExecutionBBTest.java index 0f9b4d9012..abc2cc4f76 100644 --- a/bpmn/so-bpmn-tasks/src/test/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/buildingblock/ControllerExecutionBBTest.java +++ b/bpmn/so-bpmn-tasks/src/test/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/buildingblock/ControllerExecutionBBTest.java @@ -20,6 +20,10 @@ package org.onap.so.bpmn.infrastructure.decisionpoint.impl.buildingblock; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.onap.so.bpmn.infrastructure.decisionpoint.impl.buildingblock.MockControllerBB.TEST_ACTION; @@ -27,7 +31,10 @@ import static org.onap.so.bpmn.infrastructure.decisionpoint.impl.buildingblock.M import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.onap.logging.filter.base.ONAPComponents; import org.onap.so.bpmn.common.BuildingBlockExecution; +import org.onap.so.client.cds.PayloadConstants; import org.onap.so.client.exception.ExceptionBuilder; import org.onap.so.db.catalog.beans.ControllerSelectionReference; import org.onap.so.db.catalog.beans.PnfResourceCustomization; @@ -70,6 +77,9 @@ public class ControllerExecutionBBTest { @MockBean private ControllerSelectionReference controllerSelectionReference; + @MockBean + ExceptionBuilder exceptionBuilder; + @Before public void setUp() { when(execution.getVariable(ACTOR_PARAM)).thenReturn(TEST_ACTOR); @@ -157,4 +167,26 @@ public class ControllerExecutionBBTest { expectedVnfControllerActor, controllerActor); } } + + @Test + public void testHandleFailure() { + when(execution.getVariable(PayloadConstants.CONTROLLER_ERROR_MESSAGE)).thenReturn("ERROR MESSAGE"); + + controllerExecutionBB.handleFailure(execution); + + verify(exceptionBuilder).buildAndThrowWorkflowException(execution, 9003, "ERROR MESSAGE", ONAPComponents.SO); + } + + @Test + public void testHandleTimeoutFailure() { + when(execution.getVariable(PayloadConstants.CONTROLLER_MSG_TIMEOUT_REACHED)).thenReturn(true); + + controllerExecutionBB.handleFailure(execution); + + ArgumentCaptor<String> errMsgCaptor = ArgumentCaptor.forClass(String.class); + verify(exceptionBuilder, times(1)).buildAndThrowWorkflowException(any(BuildingBlockExecution.class), anyInt(), + errMsgCaptor.capture(), any()); + + assertTrue(errMsgCaptor.getValue().contains("timeout")); + } } diff --git a/bpmn/so-bpmn-tasks/src/test/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/camunda/controller/cds/PnfConfigCdsControllerDETest.java b/bpmn/so-bpmn-tasks/src/test/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/camunda/controller/cds/PnfConfigCdsControllerDETest.java index d8f607f6d9..3c3dc839c8 100644 --- a/bpmn/so-bpmn-tasks/src/test/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/camunda/controller/cds/PnfConfigCdsControllerDETest.java +++ b/bpmn/so-bpmn-tasks/src/test/java/org/onap/so/bpmn/infrastructure/decisionpoint/impl/camunda/controller/cds/PnfConfigCdsControllerDETest.java @@ -50,7 +50,7 @@ public class PnfConfigCdsControllerDETest { @MockBean private ControllerPreparable<DelegateExecution> preparable; - @Mock + @MockBean private AbstractCDSProcessingBBUtils abstractCDSProcessingBBUtils; @Test diff --git a/common/src/main/java/org/onap/so/client/cds/CDSProcessingClient.java b/common/src/main/java/org/onap/so/client/cds/CDSProcessingClient.java index e40b936daa..6e27b85863 100644 --- a/common/src/main/java/org/onap/so/client/cds/CDSProcessingClient.java +++ b/common/src/main/java/org/onap/so/client/cds/CDSProcessingClient.java @@ -28,6 +28,7 @@ import java.security.KeyStore; import java.security.KeyStoreException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import javax.net.ssl.SSLException; import javax.net.ssl.TrustManagerFactory; import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput; @@ -107,6 +108,7 @@ public class CDSProcessingClient implements AutoCloseable { log.info("Configure Basic authentication"); builder.intercept(new BasicAuthClientInterceptor(props)).usePlaintext(); } + builder.keepAliveTime(props.getKeepAlivePingMinutes(), TimeUnit.MINUTES); this.channel = builder.build(); this.handler = new CDSProcessingHandler(listener); log.info("CDSProcessingClient started"); diff --git a/common/src/main/java/org/onap/so/client/cds/CDSProperties.java b/common/src/main/java/org/onap/so/client/cds/CDSProperties.java index db566fa3de..f47a70976b 100644 --- a/common/src/main/java/org/onap/so/client/cds/CDSProperties.java +++ b/common/src/main/java/org/onap/so/client/cds/CDSProperties.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,7 @@ package org.onap.so.client.cds; +import java.util.concurrent.TimeUnit; import org.onap.so.client.RestProperties; public interface CDSProperties extends RestProperties { @@ -35,4 +36,16 @@ public interface CDSProperties extends RestProperties { boolean getUseSSL(); boolean getUseBasicAuth(); + + /** + * Gets grpc keep alive ping interval, which is useful for detecting connection issues when the server dies + * abruptly. If the value is set lower than what is allowed by the server (default 5 min), the connection will be + * closed after a few pings. + * + * If no value is set this method will default to 6 min (server default minimum + 1) + * + * @see io.grpc.netty.NettyChannelBuilder#keepAliveTime(long, TimeUnit) + * @return + */ + long getKeepAlivePingMinutes(); } diff --git a/common/src/test/java/org/onap/so/client/cds/TestCDSPropertiesImpl.java b/common/src/test/java/org/onap/so/client/cds/TestCDSPropertiesImpl.java index 41238e539e..76dc6ad4ea 100644 --- a/common/src/test/java/org/onap/so/client/cds/TestCDSPropertiesImpl.java +++ b/common/src/test/java/org/onap/so/client/cds/TestCDSPropertiesImpl.java @@ -82,4 +82,9 @@ public class TestCDSPropertiesImpl implements CDSProperties { public boolean getUseBasicAuth() { return true; } + + @Override + public long getKeepAlivePingMinutes() { + return 6L; + } } |