aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap')
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt85
1 files changed, 58 insertions, 27 deletions
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
index 07848164e..adb1d67d2 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
@@ -20,15 +20,14 @@ import com.fasterxml.jackson.databind.JsonNode
import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
import io.grpc.ManagedChannel
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
-import kotlinx.coroutines.launch
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
@@ -39,11 +38,13 @@ import org.springframework.stereotype.Service
interface StreamingRemoteExecutionService<ReqT, ResT> {
- suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT>
+ suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
- suspend fun send(input: ReqT)
+ suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
- suspend fun cancelSubscription(requestId: String)
+ suspend fun send(txId: String, input: ReqT)
+
+ suspend fun cancelSubscription(txId: String)
suspend fun closeChannel(selector: Any)
}
@@ -63,60 +64,90 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
/**
- * Open new channel to send and receive for grpc properties [selector] for [requestId],
+ * Open new channel to send and receive for grpc properties [selector] for [txId],
* Create the only one GRPC channel per host port and reuse for further communication.
* Create request communication channel to send and receive requests and responses.
- * We can send multiple request with same requestId with unique subRequestId.
+ * We can send multiple request with same txId.
* Consume the flow for responses,
* Client should cancel the subscription for the request Id once no longer response is needed.
* */
@FlowPreview
- override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> {
+ override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
- if (!commChannels.containsKey(requestId)) {
+ if (!commChannels.containsKey(txId)) {
/** Get GRPC Channel*/
val grpcChannel = grpcChannel(selector)
/** Get Send and Receive Channel for bidirectional process method*/
val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
- commChannels[requestId] = channels
+ commChannels[txId] = channels
}
- val commChannel = commChannels[requestId]
- ?: throw BluePrintException("failed to create response subscription for request($requestId) channel")
+ val commChannel = commChannels[txId]
+ ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
- log.info("created subscription for request($requestId)")
+ log.info("created subscription for transactionId($txId)")
return commChannel.responseChannel.consumeAsFlow()
}
/**
- * Send the [input]request, by reusing same GRPC channel and Communication channel
+ * Send the [input] request, by reusing same GRPC channel and Communication channel
* for the request Id.
*/
- override suspend fun send(input: ExecutionServiceInput) {
- val requestId = input.commonHeader.requestId
- val subRequestId = input.commonHeader.subRequestId
- val sendChannel = commChannels[requestId]?.requestChannel
- ?: throw BluePrintException("failed to get request($requestId) send channel")
+ override suspend fun send(txId: String, input: ExecutionServiceInput) {
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
coroutineScope {
launch {
sendChannel.send(input)
- log.trace("Message sent for request($requestId) : subRequest($subRequestId)")
+ log.trace("Message sent for transactionId($txId)")
+ }
+ }
+ }
+
+ /**
+ * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
+ * for the GRPC [selector] with execution [timeOutMill].
+ * Other state of the request will be skipped.
+ * The assumption here is you can call this API with same request Id and unique subrequest Id,
+ * so the correlation is sub request id to receive the response.
+ */
+ @ExperimentalCoroutinesApi
+ override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
+ : ExecutionServiceOutput {
+
+ var output: ExecutionServiceOutput? = null
+ val flow = openSubscription(selector, txId)
+
+ /** Send the request */
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ sendChannel.send(input)
+
+ /** Receive the response with timeout */
+ withTimeout(timeOutMill) {
+ flow.collect {
+ log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ output = it
+ cancelSubscription(txId)
+ }
}
}
+ return output!!
}
- /** Cancel the Subscription for the [requestId], This closes communication channel **/
+ /** Cancel the Subscription for the [txId], This closes communication channel **/
@ExperimentalCoroutinesApi
- override suspend fun cancelSubscription(requestId: String) {
- commChannels[requestId]?.let {
+ override suspend fun cancelSubscription(txId: String) {
+ commChannels[txId]?.let {
if (!it.requestChannel.isClosedForSend)
it.requestChannel.close()
/** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
//it.responseChannel.cancel(CancellationException("subscription cancelled"))
- commChannels.remove(requestId)
- log.info("closed subscription for request($requestId)")
+ commChannels.remove(txId)
+ log.info("closed subscription for transactionId($txId)")
}
}