diff options
Diffstat (limited to 'bpmn/MSOCommonBPMN/src/main')
-rw-r--r-- | bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java | 132 |
1 files changed, 77 insertions, 55 deletions
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java index 5498b5be31..9741d4b6c2 100644 --- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java +++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java @@ -24,7 +24,6 @@ package org.onap.so.client.cds; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.camunda.bpm.engine.delegate.DelegateExecution; import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers; import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader; @@ -51,28 +50,29 @@ import io.grpc.Status; * */ @Component -public class AbstractCDSProcessingBBUtils implements CDSProcessingListener { +public class AbstractCDSProcessingBBUtils { private static final Logger logger = LoggerFactory.getLogger(AbstractCDSProcessingBBUtils.class); private static final String SUCCESS = "Success"; private static final String FAILED = "Failed"; private static final String PROCESSING = "Processing"; + private static final String RESPONSE_PAYLOAD = "CDSResponsePayload"; + private static final String CDS_STATUS = "CDSStatus"; + private static final String EXEC_INPUT = "executionServiceInput"; + /** * indicate exception thrown. */ private static final String EXCEPTION = "Exception"; - - private final AtomicReference<String> cdsResponse = new AtomicReference<>(); - @Autowired private ExceptionBuilder exceptionUtil; /** * Extracting data from execution object and building the ExecutionServiceInput Object - * + * * @param execution DelegateExecution object */ public void constructExecutionServiceInputObject(DelegateExecution execution) { @@ -105,7 +105,7 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener { ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader) .setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build(); - execution.setVariable("executionServiceInput", executionServiceInput); + execution.setVariable(EXEC_INPUT, executionServiceInput); } catch (Exception ex) { exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex); @@ -114,7 +114,7 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener { /** * get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period - * + * * @param execution DelegateExecution object */ public void sendRequestToCDSClient(DelegateExecution execution) { @@ -127,10 +127,11 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener { "No RestProperty.CDSProperties implementation found on classpath, can't create client."); } - ExecutionServiceInput executionServiceInput = - (ExecutionServiceInput) execution.getVariable("executionServiceInput"); + ExecutionServiceInput executionServiceInput = (ExecutionServiceInput) execution.getVariable(EXEC_INPUT); + + CDSResponse cdsResponse = new CDSResponse(); - try (CDSProcessingClient cdsClient = new CDSProcessingClient(this)) { + try (CDSProcessingClient cdsClient = new CDSProcessingClient(new ResponseHandler(cdsResponse))) { CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput); countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS); } catch (InterruptedException ex) { @@ -138,61 +139,82 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener { Thread.currentThread().interrupt(); } - if (cdsResponse != null) { - String cdsResponseStatus = cdsResponse.get(); - execution.setVariable("CDSStatus", cdsResponseStatus); + String cdsResponseStatus = cdsResponse.status; + + /** + * throw CDS failed exception. + */ + if (!cdsResponseStatus.equals(SUCCESS)) { + throw new BadResponseException("CDS call failed with status: " + cdsResponse.status + + " and errorMessage: " + cdsResponse.errorMessage); + } + + execution.setVariable(CDS_STATUS, cdsResponseStatus); - /** - * throw CDS failed exception. - */ - if (cdsResponseStatus != SUCCESS) { - throw new BadResponseException("CDS call failed with status: " + cdsResponseStatus); - } + if (cdsResponse.payload != null) { + String payload = JsonFormat.printer().print(cdsResponse.payload); + execution.setVariable(RESPONSE_PAYLOAD, payload); } + + } catch (Exception ex) { exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex); } } - /** - * Get Response from CDS Client - * - */ - @Override - public void onMessage(ExecutionServiceOutput message) { - logger.info("Received notification from CDS: {}", message); - EventType eventType = message.getStatus().getEventType(); - - switch (eventType) { - - case EVENT_COMPONENT_FAILURE: - // failed processing with failure - cdsResponse.set(FAILED); - break; - case EVENT_COMPONENT_PROCESSING: - // still processing - cdsResponse.set(PROCESSING); - break; - case EVENT_COMPONENT_EXECUTED: - // done with async processing - cdsResponse.set(SUCCESS); - break; - default: - cdsResponse.set(FAILED); - break; + private class ResponseHandler implements CDSProcessingListener { + + private CDSResponse cdsResponse; + + ResponseHandler(CDSResponse cdsResponse) { + this.cdsResponse = cdsResponse; } - } + /** + * Get Response from CDS Client + */ + @Override + public void onMessage(ExecutionServiceOutput message) { + logger.info("Received notification from CDS: {}", message); + EventType eventType = message.getStatus().getEventType(); + + switch (eventType) { + case EVENT_COMPONENT_PROCESSING: + cdsResponse.status = PROCESSING; + break; + case EVENT_COMPONENT_EXECUTED: + cdsResponse.status = SUCCESS; + break; + default: + cdsResponse.status = FAILED; + cdsResponse.errorMessage = message.getStatus().getErrorMessage(); + break; + } + cdsResponse.payload = message.getPayload(); + } - /** - * On error at CDS, log the error - */ - @Override - public void onError(Throwable t) { - Status status = Status.fromThrowable(t); - logger.error("Failed processing blueprint {}", status, t); - cdsResponse.set(EXCEPTION); + /** + * On error at CDS, log the error + */ + @Override + public void onError(Throwable t) { + Status status = Status.fromThrowable(t); + logger.error("Failed processing blueprint {}", status, t); + cdsResponse.status = EXCEPTION; + } } + private class CDSResponse { + + String status; + String errorMessage; + Struct payload; + + @Override + public String toString() { + return "CDSResponse{" + "status='" + status + '\'' + ", errorMessage='" + errorMessage + '\'' + ", payload=" + + payload + '}'; + } + } } |