summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java132
1 files changed, 77 insertions, 55 deletions
diff --git a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java
index 5498b5be31..9741d4b6c2 100644
--- a/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java
+++ b/bpmn/MSOCommonBPMN/src/main/java/org/onap/so/client/cds/AbstractCDSProcessingBBUtils.java
@@ -24,7 +24,6 @@ package org.onap.so.client.cds;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers;
import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader;
@@ -51,28 +50,29 @@ import io.grpc.Status;
*
*/
@Component
-public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
+public class AbstractCDSProcessingBBUtils {
private static final Logger logger = LoggerFactory.getLogger(AbstractCDSProcessingBBUtils.class);
private static final String SUCCESS = "Success";
private static final String FAILED = "Failed";
private static final String PROCESSING = "Processing";
+ private static final String RESPONSE_PAYLOAD = "CDSResponsePayload";
+ private static final String CDS_STATUS = "CDSStatus";
+ private static final String EXEC_INPUT = "executionServiceInput";
+
/**
* indicate exception thrown.
*/
private static final String EXCEPTION = "Exception";
-
- private final AtomicReference<String> cdsResponse = new AtomicReference<>();
-
@Autowired
private ExceptionBuilder exceptionUtil;
/**
* Extracting data from execution object and building the ExecutionServiceInput Object
- *
+ *
* @param execution DelegateExecution object
*/
public void constructExecutionServiceInputObject(DelegateExecution execution) {
@@ -105,7 +105,7 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
ExecutionServiceInput.newBuilder().setCommonHeader(commonHeader)
.setActionIdentifiers(actionIdentifiers).setPayload(struct.build()).build();
- execution.setVariable("executionServiceInput", executionServiceInput);
+ execution.setVariable(EXEC_INPUT, executionServiceInput);
} catch (Exception ex) {
exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
@@ -114,7 +114,7 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
/**
* get the executionServiceInput object from execution and send a request to CDS Client and wait for TIMEOUT period
- *
+ *
* @param execution DelegateExecution object
*/
public void sendRequestToCDSClient(DelegateExecution execution) {
@@ -127,10 +127,11 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
"No RestProperty.CDSProperties implementation found on classpath, can't create client.");
}
- ExecutionServiceInput executionServiceInput =
- (ExecutionServiceInput) execution.getVariable("executionServiceInput");
+ ExecutionServiceInput executionServiceInput = (ExecutionServiceInput) execution.getVariable(EXEC_INPUT);
+
+ CDSResponse cdsResponse = new CDSResponse();
- try (CDSProcessingClient cdsClient = new CDSProcessingClient(this)) {
+ try (CDSProcessingClient cdsClient = new CDSProcessingClient(new ResponseHandler(cdsResponse))) {
CountDownLatch countDownLatch = cdsClient.sendRequest(executionServiceInput);
countDownLatch.await(props.getTimeout(), TimeUnit.SECONDS);
} catch (InterruptedException ex) {
@@ -138,61 +139,82 @@ public class AbstractCDSProcessingBBUtils implements CDSProcessingListener {
Thread.currentThread().interrupt();
}
- if (cdsResponse != null) {
- String cdsResponseStatus = cdsResponse.get();
- execution.setVariable("CDSStatus", cdsResponseStatus);
+ String cdsResponseStatus = cdsResponse.status;
+
+ /**
+ * throw CDS failed exception.
+ */
+ if (!cdsResponseStatus.equals(SUCCESS)) {
+ throw new BadResponseException("CDS call failed with status: " + cdsResponse.status
+ + " and errorMessage: " + cdsResponse.errorMessage);
+ }
+
+ execution.setVariable(CDS_STATUS, cdsResponseStatus);
- /**
- * throw CDS failed exception.
- */
- if (cdsResponseStatus != SUCCESS) {
- throw new BadResponseException("CDS call failed with status: " + cdsResponseStatus);
- }
+ if (cdsResponse.payload != null) {
+ String payload = JsonFormat.printer().print(cdsResponse.payload);
+ execution.setVariable(RESPONSE_PAYLOAD, payload);
}
+
+
} catch (Exception ex) {
exceptionUtil.buildAndThrowWorkflowException(execution, 7000, ex);
}
}
- /**
- * Get Response from CDS Client
- *
- */
- @Override
- public void onMessage(ExecutionServiceOutput message) {
- logger.info("Received notification from CDS: {}", message);
- EventType eventType = message.getStatus().getEventType();
-
- switch (eventType) {
-
- case EVENT_COMPONENT_FAILURE:
- // failed processing with failure
- cdsResponse.set(FAILED);
- break;
- case EVENT_COMPONENT_PROCESSING:
- // still processing
- cdsResponse.set(PROCESSING);
- break;
- case EVENT_COMPONENT_EXECUTED:
- // done with async processing
- cdsResponse.set(SUCCESS);
- break;
- default:
- cdsResponse.set(FAILED);
- break;
+ private class ResponseHandler implements CDSProcessingListener {
+
+ private CDSResponse cdsResponse;
+
+ ResponseHandler(CDSResponse cdsResponse) {
+ this.cdsResponse = cdsResponse;
}
- }
+ /**
+ * Get Response from CDS Client
+ */
+ @Override
+ public void onMessage(ExecutionServiceOutput message) {
+ logger.info("Received notification from CDS: {}", message);
+ EventType eventType = message.getStatus().getEventType();
+
+ switch (eventType) {
+ case EVENT_COMPONENT_PROCESSING:
+ cdsResponse.status = PROCESSING;
+ break;
+ case EVENT_COMPONENT_EXECUTED:
+ cdsResponse.status = SUCCESS;
+ break;
+ default:
+ cdsResponse.status = FAILED;
+ cdsResponse.errorMessage = message.getStatus().getErrorMessage();
+ break;
+ }
+ cdsResponse.payload = message.getPayload();
+ }
- /**
- * On error at CDS, log the error
- */
- @Override
- public void onError(Throwable t) {
- Status status = Status.fromThrowable(t);
- logger.error("Failed processing blueprint {}", status, t);
- cdsResponse.set(EXCEPTION);
+ /**
+ * On error at CDS, log the error
+ */
+ @Override
+ public void onError(Throwable t) {
+ Status status = Status.fromThrowable(t);
+ logger.error("Failed processing blueprint {}", status, t);
+ cdsResponse.status = EXCEPTION;
+ }
}
+ private class CDSResponse {
+
+ String status;
+ String errorMessage;
+ Struct payload;
+
+ @Override
+ public String toString() {
+ return "CDSResponse{" + "status='" + status + '\'' + ", errorMessage='" + errorMessage + '\'' + ", payload="
+ + payload + '}';
+ }
+ }
}