diff options
author | Brinda Santh <bs2796@att.com> | 2019-10-03 20:02:29 -0400 |
---|---|---|
committer | Brinda Santh <bs2796@att.com> | 2019-10-03 20:02:29 -0400 |
commit | 4c2b42f141e1501222a284f964eb4303cc4f03aa (patch) | |
tree | 0d839c5126fda40f6a89f6bb5f06ab0e27b7ad44 /ms/blueprintsprocessor/modules/services | |
parent | 5006588ac1a41b5ff8207a13fa118866d7ef4924 (diff) |
Add bi-directional python executor tests
Issue-ID: CCSDK-1747
Signed-off-by: Brinda Santh <bs2796@att.com>
Change-Id: I2e2b974d35d6878418eafe8ece9fcb1d69622a61
Diffstat (limited to 'ms/blueprintsprocessor/modules/services')
4 files changed, 395 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml index 8bee7c91c..f3044c8bb 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml +++ b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml @@ -64,5 +64,10 @@ <groupId>org.onap.ccsdk.sli.core</groupId> <artifactId>sli-provider</artifactId> </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + </dependency> </dependencies> </project> diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt new file mode 100644 index 000000000..07848164e --- /dev/null +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt @@ -0,0 +1,174 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.services.execution + +import com.fasterxml.jackson.databind.JsonNode +import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel +import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming +import io.grpc.ManagedChannel +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.launch +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Service + +interface StreamingRemoteExecutionService<ReqT, ResT> { + + suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT> + + suspend fun send(input: ReqT) + + suspend fun cancelSubscription(requestId: String) + + suspend fun closeChannel(selector: Any) +} + +@Service +@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"], + havingValue = "true", matchIfMissing = false) +class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService) + : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> { + + private val log = logger(StreamingRemoteExecutionServiceImpl::class) + + private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf() + + private val commChannels: MutableMap<String, + ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf() + + + /** + * Open new channel to send and receive for grpc properties [selector] for [requestId], + * Create the only one GRPC channel per host port and reuse for further communication. + * Create request communication channel to send and receive requests and responses. + * We can send multiple request with same requestId with unique subRequestId. + * Consume the flow for responses, + * Client should cancel the subscription for the request Id once no longer response is needed. + * */ + @FlowPreview + override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> { + + if (!commChannels.containsKey(requestId)) { + /** Get GRPC Channel*/ + val grpcChannel = grpcChannel(selector) + + /** Get Send and Receive Channel for bidirectional process method*/ + val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel) + commChannels[requestId] = channels + } + + val commChannel = commChannels[requestId] + ?: throw BluePrintException("failed to create response subscription for request($requestId) channel") + + log.info("created subscription for request($requestId)") + + return commChannel.responseChannel.consumeAsFlow() + } + + /** + * Send the [input]request, by reusing same GRPC channel and Communication channel + * for the request Id. + */ + override suspend fun send(input: ExecutionServiceInput) { + val requestId = input.commonHeader.requestId + val subRequestId = input.commonHeader.subRequestId + val sendChannel = commChannels[requestId]?.requestChannel + ?: throw BluePrintException("failed to get request($requestId) send channel") + coroutineScope { + launch { + sendChannel.send(input) + log.trace("Message sent for request($requestId) : subRequest($subRequestId)") + } + } + } + + /** Cancel the Subscription for the [requestId], This closes communication channel **/ + @ExperimentalCoroutinesApi + override suspend fun cancelSubscription(requestId: String) { + commChannels[requestId]?.let { + if (!it.requestChannel.isClosedForSend) + it.requestChannel.close() + /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */ + //it.responseChannel.cancel(CancellationException("subscription cancelled")) + commChannels.remove(requestId) + log.info("closed subscription for request($requestId)") + } + } + + /** Close the GRPC channel for the host port poperties [selector]*/ + override suspend fun closeChannel(selector: Any) { + val grpcProperties = grpcProperties(selector) + val selectorName = "${grpcProperties.host}:${grpcProperties.port}" + if (grpcChannels.containsKey(selectorName)) { + grpcChannels[selectorName]!!.shutdownNow() + grpcChannels.remove(selectorName) + log.info("grpc channel($selectorName) shutdown completed") + } + } + + /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */ + private suspend fun grpcChannel(selector: Any): ManagedChannel { + val grpcProperties = grpcProperties(selector) + val selectorName = "${grpcProperties.host}:${grpcProperties.port}" + val isGrpcChannelCached = grpcChannels.containsKey(selectorName) + val grpcChannel = if (isGrpcChannelCached) { + if (grpcChannels[selectorName]!!.isShutdown) { + createGrpcChannel(grpcProperties) + } else { + grpcChannels[selectorName]!! + } + } else { + createGrpcChannel(grpcProperties) + } + grpcChannels[selectorName] = grpcChannel + return grpcChannel + } + + suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel { + val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService + .blueprintGrpcClientService(grpcProperties) + return grpcClientService.channel() + } + + private fun grpcProperties(selector: Any): GrpcClientProperties { + return when (selector) { + is String -> { + bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString()) + } + is JsonNode -> { + bluePrintGrpcLibPropertyService.grpcClientProperties(selector) + } + is GrpcClientProperties -> { + selector + } + else -> { + throw BluePrintException("couldn't process selector($selector)") + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt new file mode 100644 index 000000000..e291aa78e --- /dev/null +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt @@ -0,0 +1,97 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts + +import io.grpc.ServerBuilder +import io.grpc.stub.StreamObserver +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.common.api.Status +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput + +private val log = logger(MockBluePrintProcessingServer::class) + + +class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { + + override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { + + return object : StreamObserver<ExecutionServiceInput> { + override fun onNext(executionServiceInput: ExecutionServiceInput) { + log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " + + "subRequestId(${executionServiceInput.commonHeader.subRequestId})") + responseObserver.onNext(buildNotification(executionServiceInput)) + responseObserver.onNext(buildResponse(executionServiceInput)) + responseObserver.onCompleted() + } + + override fun onError(error: Throwable) { + log.debug("Fail to process message", error) + responseObserver.onError(io.grpc.Status.INTERNAL + .withDescription(error.message) + .asException()) + } + + override fun onCompleted() { + log.info("Completed") + } + } + } + + + private fun buildNotification(input: ExecutionServiceInput): ExecutionServiceOutput { + val status = Status.newBuilder() + .setEventType(EventType.EVENT_COMPONENT_NOTIFICATION) + .build() + return ExecutionServiceOutput.newBuilder() + .setCommonHeader(input.commonHeader) + .setActionIdentifiers(input.actionIdentifiers) + .setStatus(status) + .build() + } + + private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput { + + val status = Status.newBuilder().setCode(200) + .setEventType(EventType.EVENT_COMPONENT_EXECUTED) + .build() + return ExecutionServiceOutput.newBuilder() + .setCommonHeader(input.commonHeader) + .setActionIdentifiers(input.actionIdentifiers) + .setStatus(status) + .build() + + } +} + +/** For Integration testing stat this server */ +fun main() { + try { + val server = ServerBuilder + .forPort(50052) + .addService(MockBluePrintProcessingServer()) + .build() + server.start() + log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...") + server.awaitTermination() + } catch (e: Exception) { + e.printStackTrace() + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt new file mode 100644 index 000000000..c9ff23573 --- /dev/null +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt @@ -0,0 +1,119 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.services.execution + +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import io.grpc.testing.GrpcCleanupRule +import io.mockk.coEvery +import io.mockk.mockk +import io.mockk.spyk +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.collect +import org.junit.Rule +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers +import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import java.util.* + + +class StreamingRemoteExecutionServiceTest { + + val log = logger(StreamingRemoteExecutionServiceTest::class) + + @get:Rule + val grpcCleanup = GrpcCleanupRule() + private val serverName = InProcessServerBuilder.generateName() + private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor() + private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor() + + private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply { + host = "127.0.0.1" + port = 50052 + type = GRPCLibConstants.TYPE_TOKEN_AUTH + token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw==" + } + + @Test + @ExperimentalCoroutinesApi + @FlowPreview + fun testStreamingChannel() { + grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start()) + val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build()) + + runBlocking { + val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk()) + + val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService) + + val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService) + /** To test with real server, uncomment below line */ + coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel + + val responseFlowsDeferred = arrayListOf<Deferred<*>>() + + repeat(1) { count -> + val requestId = "12345-$count" + val responseFlow = spyStreamingRemoteExecutionService.openSubscription(tokenAuthGrpcClientProperties, requestId) + + val deferred = async { + responseFlow.collect { + log.info("Received $count-response (${it.commonHeader.subRequestId}) : ${it.status.eventType}") + if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) { + spyStreamingRemoteExecutionService.cancelSubscription(it.commonHeader.requestId) + } + } + } + responseFlowsDeferred.add(deferred) + /** Sending Multiple messages with same requestId and different subRequestId */ + spyStreamingRemoteExecutionService.send(getRequest(requestId)) + } + responseFlowsDeferred.awaitAll() + streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties) + } + + } + + private fun getRequest(requestId: String): ExecutionServiceInput { + val commonHeader = CommonHeader.newBuilder() + .setTimestamp("2012-04-23T18:25:43.511Z") + .setOriginatorId("System") + .setRequestId(requestId) + .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build() + + + val actionIdentifier = ActionIdentifiers.newBuilder() + .setActionName("SampleScript") + .setBlueprintName("sample-cba") + .setBlueprintVersion("1.0.0") + .build() + + return ExecutionServiceInput.newBuilder() + .setCommonHeader(commonHeader) + .setActionIdentifiers(actionIdentifier) + //.setPayload(payloadBuilder.build()) + .build() + + } +}
\ No newline at end of file |