diff options
author | Brinda Santh <bs2796@att.com> | 2019-10-04 17:24:16 -0400 |
---|---|---|
committer | Brinda Santh <bs2796@att.com> | 2019-10-04 17:24:16 -0400 |
commit | 9b3a9785ae434263371712b3d8e4bffd53f9a510 (patch) | |
tree | c45c34979c7bd73e521cefa19563a011a28cc8ef /ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin | |
parent | edf18b106492a9199e65d96df98232974446c756 (diff) |
Bi-directional GRPC non interactive implementation
Issue-ID: CCSDK-1747
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I236bf6d4edaf983ca4162b5ce064736ac690b504
Diffstat (limited to 'ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin')
-rw-r--r-- | ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt | 85 |
1 files changed, 58 insertions, 27 deletions
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt index 07848164e..adb1d67d2 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt @@ -20,15 +20,14 @@ import com.fasterxml.jackson.databind.JsonNode import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming import io.grpc.ManagedChannel -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.launch import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc @@ -39,11 +38,13 @@ import org.springframework.stereotype.Service interface StreamingRemoteExecutionService<ReqT, ResT> { - suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT> + suspend fun openSubscription(selector: Any, txId: String): Flow<ResT> - suspend fun send(input: ReqT) + suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT - suspend fun cancelSubscription(requestId: String) + suspend fun send(txId: String, input: ReqT) + + suspend fun cancelSubscription(txId: String) suspend fun closeChannel(selector: Any) } @@ -63,60 +64,90 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe /** - * Open new channel to send and receive for grpc properties [selector] for [requestId], + * Open new channel to send and receive for grpc properties [selector] for [txId], * Create the only one GRPC channel per host port and reuse for further communication. * Create request communication channel to send and receive requests and responses. - * We can send multiple request with same requestId with unique subRequestId. + * We can send multiple request with same txId. * Consume the flow for responses, * Client should cancel the subscription for the request Id once no longer response is needed. * */ @FlowPreview - override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> { + override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> { - if (!commChannels.containsKey(requestId)) { + if (!commChannels.containsKey(txId)) { /** Get GRPC Channel*/ val grpcChannel = grpcChannel(selector) /** Get Send and Receive Channel for bidirectional process method*/ val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel) - commChannels[requestId] = channels + commChannels[txId] = channels } - val commChannel = commChannels[requestId] - ?: throw BluePrintException("failed to create response subscription for request($requestId) channel") + val commChannel = commChannels[txId] + ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel") - log.info("created subscription for request($requestId)") + log.info("created subscription for transactionId($txId)") return commChannel.responseChannel.consumeAsFlow() } /** - * Send the [input]request, by reusing same GRPC channel and Communication channel + * Send the [input] request, by reusing same GRPC channel and Communication channel * for the request Id. */ - override suspend fun send(input: ExecutionServiceInput) { - val requestId = input.commonHeader.requestId - val subRequestId = input.commonHeader.subRequestId - val sendChannel = commChannels[requestId]?.requestChannel - ?: throw BluePrintException("failed to get request($requestId) send channel") + override suspend fun send(txId: String, input: ExecutionServiceInput) { + val sendChannel = commChannels[txId]?.requestChannel + ?: throw BluePrintException("failed to get transactionId($txId) send channel") coroutineScope { launch { sendChannel.send(input) - log.trace("Message sent for request($requestId) : subRequest($subRequestId)") + log.trace("Message sent for transactionId($txId)") + } + } + } + + /** + * Simplified version of Streaming calls, Use this API only listing for actual response for [input] + * for the GRPC [selector] with execution [timeOutMill]. + * Other state of the request will be skipped. + * The assumption here is you can call this API with same request Id and unique subrequest Id, + * so the correlation is sub request id to receive the response. + */ + @ExperimentalCoroutinesApi + override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long) + : ExecutionServiceOutput { + + var output: ExecutionServiceOutput? = null + val flow = openSubscription(selector, txId) + + /** Send the request */ + val sendChannel = commChannels[txId]?.requestChannel + ?: throw BluePrintException("failed to get transactionId($txId) send channel") + sendChannel.send(input) + + /** Receive the response with timeout */ + withTimeout(timeOutMill) { + flow.collect { + log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}") + if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) { + output = it + cancelSubscription(txId) + } } } + return output!! } - /** Cancel the Subscription for the [requestId], This closes communication channel **/ + /** Cancel the Subscription for the [txId], This closes communication channel **/ @ExperimentalCoroutinesApi - override suspend fun cancelSubscription(requestId: String) { - commChannels[requestId]?.let { + override suspend fun cancelSubscription(txId: String) { + commChannels[txId]?.let { if (!it.requestChannel.isClosedForSend) it.requestChannel.close() /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */ //it.responseChannel.cancel(CancellationException("subscription cancelled")) - commChannels.remove(requestId) - log.info("closed subscription for request($requestId)") + commChannels.remove(txId) + log.info("closed subscription for transactionId($txId)") } } |