diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/services/execution-service')
3 files changed, 91 insertions, 35 deletions
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt index 07848164e..adb1d67d2 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt @@ -20,15 +20,14 @@ import com.fasterxml.jackson.databind.JsonNode import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming import io.grpc.ManagedChannel -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.FlowPreview -import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.* import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.consumeAsFlow -import kotlinx.coroutines.launch import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc @@ -39,11 +38,13 @@ import org.springframework.stereotype.Service interface StreamingRemoteExecutionService<ReqT, ResT> { - suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT> + suspend fun openSubscription(selector: Any, txId: String): Flow<ResT> - suspend fun send(input: ReqT) + suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT - suspend fun cancelSubscription(requestId: String) + suspend fun send(txId: String, input: ReqT) + + suspend fun cancelSubscription(txId: String) suspend fun closeChannel(selector: Any) } @@ -63,60 +64,90 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe /** - * Open new channel to send and receive for grpc properties [selector] for [requestId], + * Open new channel to send and receive for grpc properties [selector] for [txId], * Create the only one GRPC channel per host port and reuse for further communication. * Create request communication channel to send and receive requests and responses. - * We can send multiple request with same requestId with unique subRequestId. + * We can send multiple request with same txId. * Consume the flow for responses, * Client should cancel the subscription for the request Id once no longer response is needed. * */ @FlowPreview - override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> { + override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> { - if (!commChannels.containsKey(requestId)) { + if (!commChannels.containsKey(txId)) { /** Get GRPC Channel*/ val grpcChannel = grpcChannel(selector) /** Get Send and Receive Channel for bidirectional process method*/ val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel) - commChannels[requestId] = channels + commChannels[txId] = channels } - val commChannel = commChannels[requestId] - ?: throw BluePrintException("failed to create response subscription for request($requestId) channel") + val commChannel = commChannels[txId] + ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel") - log.info("created subscription for request($requestId)") + log.info("created subscription for transactionId($txId)") return commChannel.responseChannel.consumeAsFlow() } /** - * Send the [input]request, by reusing same GRPC channel and Communication channel + * Send the [input] request, by reusing same GRPC channel and Communication channel * for the request Id. */ - override suspend fun send(input: ExecutionServiceInput) { - val requestId = input.commonHeader.requestId - val subRequestId = input.commonHeader.subRequestId - val sendChannel = commChannels[requestId]?.requestChannel - ?: throw BluePrintException("failed to get request($requestId) send channel") + override suspend fun send(txId: String, input: ExecutionServiceInput) { + val sendChannel = commChannels[txId]?.requestChannel + ?: throw BluePrintException("failed to get transactionId($txId) send channel") coroutineScope { launch { sendChannel.send(input) - log.trace("Message sent for request($requestId) : subRequest($subRequestId)") + log.trace("Message sent for transactionId($txId)") + } + } + } + + /** + * Simplified version of Streaming calls, Use this API only listing for actual response for [input] + * for the GRPC [selector] with execution [timeOutMill]. + * Other state of the request will be skipped. + * The assumption here is you can call this API with same request Id and unique subrequest Id, + * so the correlation is sub request id to receive the response. + */ + @ExperimentalCoroutinesApi + override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long) + : ExecutionServiceOutput { + + var output: ExecutionServiceOutput? = null + val flow = openSubscription(selector, txId) + + /** Send the request */ + val sendChannel = commChannels[txId]?.requestChannel + ?: throw BluePrintException("failed to get transactionId($txId) send channel") + sendChannel.send(input) + + /** Receive the response with timeout */ + withTimeout(timeOutMill) { + flow.collect { + log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}") + if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) { + output = it + cancelSubscription(txId) + } } } + return output!! } - /** Cancel the Subscription for the [requestId], This closes communication channel **/ + /** Cancel the Subscription for the [txId], This closes communication channel **/ @ExperimentalCoroutinesApi - override suspend fun cancelSubscription(requestId: String) { - commChannels[requestId]?.let { + override suspend fun cancelSubscription(txId: String) { + commChannels[txId]?.let { if (!it.requestChannel.isClosedForSend) it.requestChannel.close() /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */ //it.responseChannel.cancel(CancellationException("subscription cancelled")) - commChannels.remove(requestId) - log.info("closed subscription for request($requestId)") + commChannels.remove(txId) + log.info("closed subscription for transactionId($txId)") } } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt index c9ff23573..29d24c6ad 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt @@ -36,6 +36,8 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertNotNull class StreamingRemoteExecutionServiceTest { @@ -69,25 +71,48 @@ class StreamingRemoteExecutionServiceTest { val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService) /** To test with real server, uncomment below line */ - coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel + coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel + + /** Test Send and Receive non interactive transaction */ + val nonInteractiveDeferred = arrayListOf<Deferred<*>>() + repeat(2) { count -> + val requestId = "1234-$count" + val request = getRequest(requestId) + val invocationId = request.commonHeader.subRequestId + val deferred = async { + val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties, + invocationId, request, 1000L) + assertNotNull(response, "failed to get non interactive response") + assertEquals(response.commonHeader.requestId, requestId, + "failed to match non interactive response id") + assertEquals(response.status.eventType, EventType.EVENT_COMPONENT_EXECUTED, + "failed to match non interactive response type") + } + nonInteractiveDeferred.add(deferred) - val responseFlowsDeferred = arrayListOf<Deferred<*>>() + } + nonInteractiveDeferred.awaitAll() - repeat(1) { count -> + /** Test Send and Receive interactive transaction */ + val responseFlowsDeferred = arrayListOf<Deferred<*>>() + repeat(2) { count -> val requestId = "12345-$count" - val responseFlow = spyStreamingRemoteExecutionService.openSubscription(tokenAuthGrpcClientProperties, requestId) + val request = getRequest(requestId) + val invocationId = request.commonHeader.requestId + val responseFlow = spyStreamingRemoteExecutionService + .openSubscription(tokenAuthGrpcClientProperties, invocationId) val deferred = async { responseFlow.collect { - log.info("Received $count-response (${it.commonHeader.subRequestId}) : ${it.status.eventType}") + log.info("Received $count-response ($invocationId) : ${it.status.eventType}") if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) { - spyStreamingRemoteExecutionService.cancelSubscription(it.commonHeader.requestId) + spyStreamingRemoteExecutionService.cancelSubscription(invocationId) } } } responseFlowsDeferred.add(deferred) /** Sending Multiple messages with same requestId and different subRequestId */ - spyStreamingRemoteExecutionService.send(getRequest(requestId)) + spyStreamingRemoteExecutionService.send(invocationId, request) } responseFlowsDeferred.awaitAll() streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties) diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml index 703a52642..afe10b39d 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml @@ -19,7 +19,7 @@ <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender> |