summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt85
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt39
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml2
3 files changed, 91 insertions, 35 deletions
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
index 07848164e..adb1d67d2 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
@@ -20,15 +20,14 @@ import com.fasterxml.jackson.databind.JsonNode
import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
import io.grpc.ManagedChannel
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
-import kotlinx.coroutines.launch
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
@@ -39,11 +38,13 @@ import org.springframework.stereotype.Service
interface StreamingRemoteExecutionService<ReqT, ResT> {
- suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT>
+ suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
- suspend fun send(input: ReqT)
+ suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
- suspend fun cancelSubscription(requestId: String)
+ suspend fun send(txId: String, input: ReqT)
+
+ suspend fun cancelSubscription(txId: String)
suspend fun closeChannel(selector: Any)
}
@@ -63,60 +64,90 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
/**
- * Open new channel to send and receive for grpc properties [selector] for [requestId],
+ * Open new channel to send and receive for grpc properties [selector] for [txId],
* Create the only one GRPC channel per host port and reuse for further communication.
* Create request communication channel to send and receive requests and responses.
- * We can send multiple request with same requestId with unique subRequestId.
+ * We can send multiple request with same txId.
* Consume the flow for responses,
* Client should cancel the subscription for the request Id once no longer response is needed.
* */
@FlowPreview
- override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> {
+ override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
- if (!commChannels.containsKey(requestId)) {
+ if (!commChannels.containsKey(txId)) {
/** Get GRPC Channel*/
val grpcChannel = grpcChannel(selector)
/** Get Send and Receive Channel for bidirectional process method*/
val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
- commChannels[requestId] = channels
+ commChannels[txId] = channels
}
- val commChannel = commChannels[requestId]
- ?: throw BluePrintException("failed to create response subscription for request($requestId) channel")
+ val commChannel = commChannels[txId]
+ ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
- log.info("created subscription for request($requestId)")
+ log.info("created subscription for transactionId($txId)")
return commChannel.responseChannel.consumeAsFlow()
}
/**
- * Send the [input]request, by reusing same GRPC channel and Communication channel
+ * Send the [input] request, by reusing same GRPC channel and Communication channel
* for the request Id.
*/
- override suspend fun send(input: ExecutionServiceInput) {
- val requestId = input.commonHeader.requestId
- val subRequestId = input.commonHeader.subRequestId
- val sendChannel = commChannels[requestId]?.requestChannel
- ?: throw BluePrintException("failed to get request($requestId) send channel")
+ override suspend fun send(txId: String, input: ExecutionServiceInput) {
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
coroutineScope {
launch {
sendChannel.send(input)
- log.trace("Message sent for request($requestId) : subRequest($subRequestId)")
+ log.trace("Message sent for transactionId($txId)")
+ }
+ }
+ }
+
+ /**
+ * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
+ * for the GRPC [selector] with execution [timeOutMill].
+ * Other state of the request will be skipped.
+ * The assumption here is you can call this API with same request Id and unique subrequest Id,
+ * so the correlation is sub request id to receive the response.
+ */
+ @ExperimentalCoroutinesApi
+ override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
+ : ExecutionServiceOutput {
+
+ var output: ExecutionServiceOutput? = null
+ val flow = openSubscription(selector, txId)
+
+ /** Send the request */
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ sendChannel.send(input)
+
+ /** Receive the response with timeout */
+ withTimeout(timeOutMill) {
+ flow.collect {
+ log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ output = it
+ cancelSubscription(txId)
+ }
}
}
+ return output!!
}
- /** Cancel the Subscription for the [requestId], This closes communication channel **/
+ /** Cancel the Subscription for the [txId], This closes communication channel **/
@ExperimentalCoroutinesApi
- override suspend fun cancelSubscription(requestId: String) {
- commChannels[requestId]?.let {
+ override suspend fun cancelSubscription(txId: String) {
+ commChannels[txId]?.let {
if (!it.requestChannel.isClosedForSend)
it.requestChannel.close()
/** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
//it.responseChannel.cancel(CancellationException("subscription cancelled"))
- commChannels.remove(requestId)
- log.info("closed subscription for request($requestId)")
+ commChannels.remove(txId)
+ log.info("closed subscription for transactionId($txId)")
}
}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
index c9ff23573..29d24c6ad 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
@@ -36,6 +36,8 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
import java.util.*
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
class StreamingRemoteExecutionServiceTest {
@@ -69,25 +71,48 @@ class StreamingRemoteExecutionServiceTest {
val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
/** To test with real server, uncomment below line */
- coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+ coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+
+ /** Test Send and Receive non interactive transaction */
+ val nonInteractiveDeferred = arrayListOf<Deferred<*>>()
+ repeat(2) { count ->
+ val requestId = "1234-$count"
+ val request = getRequest(requestId)
+ val invocationId = request.commonHeader.subRequestId
+ val deferred = async {
+ val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties,
+ invocationId, request, 1000L)
+ assertNotNull(response, "failed to get non interactive response")
+ assertEquals(response.commonHeader.requestId, requestId,
+ "failed to match non interactive response id")
+ assertEquals(response.status.eventType, EventType.EVENT_COMPONENT_EXECUTED,
+ "failed to match non interactive response type")
+ }
+ nonInteractiveDeferred.add(deferred)
- val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+ }
+ nonInteractiveDeferred.awaitAll()
- repeat(1) { count ->
+ /** Test Send and Receive interactive transaction */
+ val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+ repeat(2) { count ->
val requestId = "12345-$count"
- val responseFlow = spyStreamingRemoteExecutionService.openSubscription(tokenAuthGrpcClientProperties, requestId)
+ val request = getRequest(requestId)
+ val invocationId = request.commonHeader.requestId
+ val responseFlow = spyStreamingRemoteExecutionService
+ .openSubscription(tokenAuthGrpcClientProperties, invocationId)
val deferred = async {
responseFlow.collect {
- log.info("Received $count-response (${it.commonHeader.subRequestId}) : ${it.status.eventType}")
+ log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
- spyStreamingRemoteExecutionService.cancelSubscription(it.commonHeader.requestId)
+ spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
}
}
}
responseFlowsDeferred.add(deferred)
/** Sending Multiple messages with same requestId and different subRequestId */
- spyStreamingRemoteExecutionService.send(getRequest(requestId))
+ spyStreamingRemoteExecutionService.send(invocationId, request)
}
responseFlowsDeferred.awaitAll()
streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
index 703a52642..afe10b39d 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
@@ -19,7 +19,7 @@
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>