summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt5
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/pom.xml2
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/pom.xml5
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt205
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt97
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt144
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml2
10 files changed, 463 insertions, 5 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
index 088533a71..a1d2188ab 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
@@ -73,7 +73,7 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue
}
}
- private fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties):
+ fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties):
BluePrintGrpcClientService {
when (grpcClientProperties) {
is TokenAuthGrpcClientProperties -> {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index ab04054fe..1cd8a2af7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -36,6 +36,7 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
var clientId: String? = null
var topic: String? = null
var pollMillSec: Long = 1000
+ var pollRecords: Int = -1
}
open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index 5a9e61bfd..b5d444a49 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -51,6 +51,10 @@ class KafkaBasicAuthMessageConsumerService(
if (messageConsumerProperties.clientId != null) {
configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
}
+ /** To handle Back pressure, Get only configured record for processing */
+ if (messageConsumerProperties.pollRecords > 0) {
+ configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
+ }
// TODO("Security Implementation based on type")
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
@@ -84,6 +88,7 @@ class KafkaBasicAuthMessageConsumerService(
kafkaConsumer!!.use { kc ->
while (keepGoing) {
val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+ log.info("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
/** execute the command block */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 2b84eaa78..f4e85a94b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -52,6 +52,7 @@ import kotlin.test.assertTrue
"blueprintsprocessor.messageconsumer.sample.topic=default-topic",
"blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
"blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
"blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
@@ -129,11 +130,11 @@ open class BlueprintMessageConsumerServiceTest {
.blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
launch {
repeat(5) {
- delay(1000)
+ delay(100)
blueprintMessageProducerService.sendMessage("this is my message($it)")
}
}
- delay(10000)
+ delay(5000)
blueprintMessageConsumerService.shutDown()
}
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/pom.xml b/ms/blueprintsprocessor/modules/inbounds/pom.xml
index 1729f2ff1..81ffea176 100644
--- a/ms/blueprintsprocessor/modules/inbounds/pom.xml
+++ b/ms/blueprintsprocessor/modules/inbounds/pom.xml
@@ -35,7 +35,7 @@
<module>designer-api</module>
<module>resource-api</module>
<module>selfservice-api</module>
- <module>health-api</module>
+ <!--<module>health-api</module>-->
</modules>
<dependencies>
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
index 8bee7c91c..f3044c8bb 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
@@ -64,5 +64,10 @@
<groupId>org.onap.ccsdk.sli.core</groupId>
<artifactId>sli-provider</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
new file mode 100644
index 000000000..adb1d67d2
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
@@ -0,0 +1,205 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
+import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
+import io.grpc.ManagedChannel
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.consumeAsFlow
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.stereotype.Service
+
+interface StreamingRemoteExecutionService<ReqT, ResT> {
+
+ suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
+
+ suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
+
+ suspend fun send(txId: String, input: ReqT)
+
+ suspend fun cancelSubscription(txId: String)
+
+ suspend fun closeChannel(selector: Any)
+}
+
+@Service
+@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
+ havingValue = "true", matchIfMissing = false)
+class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
+ : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+
+ private val log = logger(StreamingRemoteExecutionServiceImpl::class)
+
+ private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
+
+ private val commChannels: MutableMap<String,
+ ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
+
+
+ /**
+ * Open new channel to send and receive for grpc properties [selector] for [txId],
+ * Create the only one GRPC channel per host port and reuse for further communication.
+ * Create request communication channel to send and receive requests and responses.
+ * We can send multiple request with same txId.
+ * Consume the flow for responses,
+ * Client should cancel the subscription for the request Id once no longer response is needed.
+ * */
+ @FlowPreview
+ override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
+
+ if (!commChannels.containsKey(txId)) {
+ /** Get GRPC Channel*/
+ val grpcChannel = grpcChannel(selector)
+
+ /** Get Send and Receive Channel for bidirectional process method*/
+ val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
+ commChannels[txId] = channels
+ }
+
+ val commChannel = commChannels[txId]
+ ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
+
+ log.info("created subscription for transactionId($txId)")
+
+ return commChannel.responseChannel.consumeAsFlow()
+ }
+
+ /**
+ * Send the [input] request, by reusing same GRPC channel and Communication channel
+ * for the request Id.
+ */
+ override suspend fun send(txId: String, input: ExecutionServiceInput) {
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ coroutineScope {
+ launch {
+ sendChannel.send(input)
+ log.trace("Message sent for transactionId($txId)")
+ }
+ }
+ }
+
+ /**
+ * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
+ * for the GRPC [selector] with execution [timeOutMill].
+ * Other state of the request will be skipped.
+ * The assumption here is you can call this API with same request Id and unique subrequest Id,
+ * so the correlation is sub request id to receive the response.
+ */
+ @ExperimentalCoroutinesApi
+ override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
+ : ExecutionServiceOutput {
+
+ var output: ExecutionServiceOutput? = null
+ val flow = openSubscription(selector, txId)
+
+ /** Send the request */
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ sendChannel.send(input)
+
+ /** Receive the response with timeout */
+ withTimeout(timeOutMill) {
+ flow.collect {
+ log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ output = it
+ cancelSubscription(txId)
+ }
+ }
+ }
+ return output!!
+ }
+
+ /** Cancel the Subscription for the [txId], This closes communication channel **/
+ @ExperimentalCoroutinesApi
+ override suspend fun cancelSubscription(txId: String) {
+ commChannels[txId]?.let {
+ if (!it.requestChannel.isClosedForSend)
+ it.requestChannel.close()
+ /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
+ //it.responseChannel.cancel(CancellationException("subscription cancelled"))
+ commChannels.remove(txId)
+ log.info("closed subscription for transactionId($txId)")
+ }
+ }
+
+ /** Close the GRPC channel for the host port poperties [selector]*/
+ override suspend fun closeChannel(selector: Any) {
+ val grpcProperties = grpcProperties(selector)
+ val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
+ if (grpcChannels.containsKey(selectorName)) {
+ grpcChannels[selectorName]!!.shutdownNow()
+ grpcChannels.remove(selectorName)
+ log.info("grpc channel($selectorName) shutdown completed")
+ }
+ }
+
+ /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */
+ private suspend fun grpcChannel(selector: Any): ManagedChannel {
+ val grpcProperties = grpcProperties(selector)
+ val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
+ val isGrpcChannelCached = grpcChannels.containsKey(selectorName)
+ val grpcChannel = if (isGrpcChannelCached) {
+ if (grpcChannels[selectorName]!!.isShutdown) {
+ createGrpcChannel(grpcProperties)
+ } else {
+ grpcChannels[selectorName]!!
+ }
+ } else {
+ createGrpcChannel(grpcProperties)
+ }
+ grpcChannels[selectorName] = grpcChannel
+ return grpcChannel
+ }
+
+ suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
+ val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
+ .blueprintGrpcClientService(grpcProperties)
+ return grpcClientService.channel()
+ }
+
+ private fun grpcProperties(selector: Any): GrpcClientProperties {
+ return when (selector) {
+ is String -> {
+ bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString())
+ }
+ is JsonNode -> {
+ bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
+ }
+ is GrpcClientProperties -> {
+ selector
+ }
+ else -> {
+ throw BluePrintException("couldn't process selector($selector)")
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
new file mode 100644
index 000000000..e291aa78e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts
+
+import io.grpc.ServerBuilder
+import io.grpc.stub.StreamObserver
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.common.api.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+
+private val log = logger(MockBluePrintProcessingServer::class)
+
+
+class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
+
+ override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
+
+ return object : StreamObserver<ExecutionServiceInput> {
+ override fun onNext(executionServiceInput: ExecutionServiceInput) {
+ log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " +
+ "subRequestId(${executionServiceInput.commonHeader.subRequestId})")
+ responseObserver.onNext(buildNotification(executionServiceInput))
+ responseObserver.onNext(buildResponse(executionServiceInput))
+ responseObserver.onCompleted()
+ }
+
+ override fun onError(error: Throwable) {
+ log.debug("Fail to process message", error)
+ responseObserver.onError(io.grpc.Status.INTERNAL
+ .withDescription(error.message)
+ .asException())
+ }
+
+ override fun onCompleted() {
+ log.info("Completed")
+ }
+ }
+ }
+
+
+ private fun buildNotification(input: ExecutionServiceInput): ExecutionServiceOutput {
+ val status = Status.newBuilder()
+ .setEventType(EventType.EVENT_COMPONENT_NOTIFICATION)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+ }
+
+ private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput {
+
+ val status = Status.newBuilder().setCode(200)
+ .setEventType(EventType.EVENT_COMPONENT_EXECUTED)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+
+ }
+}
+
+/** For Integration testing stat this server */
+fun main() {
+ try {
+ val server = ServerBuilder
+ .forPort(50052)
+ .addService(MockBluePrintProcessingServer())
+ .build()
+ server.start()
+ log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...")
+ server.awaitTermination()
+ } catch (e: Exception) {
+ e.printStackTrace()
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
new file mode 100644
index 000000000..29d24c6ad
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
@@ -0,0 +1,144 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+
+import io.grpc.inprocess.InProcessChannelBuilder
+import io.grpc.inprocess.InProcessServerBuilder
+import io.grpc.testing.GrpcCleanupRule
+import io.mockk.coEvery
+import io.mockk.mockk
+import io.mockk.spyk
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.collect
+import org.junit.Rule
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import java.util.*
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+
+
+class StreamingRemoteExecutionServiceTest {
+
+ val log = logger(StreamingRemoteExecutionServiceTest::class)
+
+ @get:Rule
+ val grpcCleanup = GrpcCleanupRule()
+ private val serverName = InProcessServerBuilder.generateName()
+ private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
+ private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()
+
+ private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
+ host = "127.0.0.1"
+ port = 50052
+ type = GRPCLibConstants.TYPE_TOKEN_AUTH
+ token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
+ }
+
+ @Test
+ @ExperimentalCoroutinesApi
+ @FlowPreview
+ fun testStreamingChannel() {
+ grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
+ val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())
+
+ runBlocking {
+ val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())
+
+ val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)
+
+ val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
+ /** To test with real server, uncomment below line */
+ coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+
+ /** Test Send and Receive non interactive transaction */
+ val nonInteractiveDeferred = arrayListOf<Deferred<*>>()
+ repeat(2) { count ->
+ val requestId = "1234-$count"
+ val request = getRequest(requestId)
+ val invocationId = request.commonHeader.subRequestId
+ val deferred = async {
+ val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties,
+ invocationId, request, 1000L)
+ assertNotNull(response, "failed to get non interactive response")
+ assertEquals(response.commonHeader.requestId, requestId,
+ "failed to match non interactive response id")
+ assertEquals(response.status.eventType, EventType.EVENT_COMPONENT_EXECUTED,
+ "failed to match non interactive response type")
+ }
+ nonInteractiveDeferred.add(deferred)
+
+ }
+ nonInteractiveDeferred.awaitAll()
+
+ /** Test Send and Receive interactive transaction */
+ val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+ repeat(2) { count ->
+ val requestId = "12345-$count"
+ val request = getRequest(requestId)
+ val invocationId = request.commonHeader.requestId
+ val responseFlow = spyStreamingRemoteExecutionService
+ .openSubscription(tokenAuthGrpcClientProperties, invocationId)
+
+ val deferred = async {
+ responseFlow.collect {
+ log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
+ }
+ }
+ }
+ responseFlowsDeferred.add(deferred)
+ /** Sending Multiple messages with same requestId and different subRequestId */
+ spyStreamingRemoteExecutionService.send(invocationId, request)
+ }
+ responseFlowsDeferred.awaitAll()
+ streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
+ }
+
+ }
+
+ private fun getRequest(requestId: String): ExecutionServiceInput {
+ val commonHeader = CommonHeader.newBuilder()
+ .setTimestamp("2012-04-23T18:25:43.511Z")
+ .setOriginatorId("System")
+ .setRequestId(requestId)
+ .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build()
+
+
+ val actionIdentifier = ActionIdentifiers.newBuilder()
+ .setActionName("SampleScript")
+ .setBlueprintName("sample-cba")
+ .setBlueprintVersion("1.0.0")
+ .build()
+
+ return ExecutionServiceInput.newBuilder()
+ .setCommonHeader(commonHeader)
+ .setActionIdentifiers(actionIdentifier)
+ //.setPayload(payloadBuilder.build())
+ .build()
+
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
index 703a52642..afe10b39d 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
@@ -19,7 +19,7 @@
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>