aboutsummaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/services/execution-service
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/services/execution-service')
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/pom.xml5
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt174
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt97
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt119
4 files changed, 395 insertions, 0 deletions
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
index 8bee7c91c..f3044c8bb 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
@@ -64,5 +64,10 @@
<groupId>org.onap.ccsdk.sli.core</groupId>
<artifactId>sli-provider</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
new file mode 100644
index 000000000..07848164e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
@@ -0,0 +1,174 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
+import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
+import io.grpc.ManagedChannel
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.launch
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.stereotype.Service
+
+interface StreamingRemoteExecutionService<ReqT, ResT> {
+
+ suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT>
+
+ suspend fun send(input: ReqT)
+
+ suspend fun cancelSubscription(requestId: String)
+
+ suspend fun closeChannel(selector: Any)
+}
+
+@Service
+@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
+ havingValue = "true", matchIfMissing = false)
+class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
+ : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+
+ private val log = logger(StreamingRemoteExecutionServiceImpl::class)
+
+ private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
+
+ private val commChannels: MutableMap<String,
+ ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
+
+
+ /**
+ * Open new channel to send and receive for grpc properties [selector] for [requestId],
+ * Create the only one GRPC channel per host port and reuse for further communication.
+ * Create request communication channel to send and receive requests and responses.
+ * We can send multiple request with same requestId with unique subRequestId.
+ * Consume the flow for responses,
+ * Client should cancel the subscription for the request Id once no longer response is needed.
+ * */
+ @FlowPreview
+ override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> {
+
+ if (!commChannels.containsKey(requestId)) {
+ /** Get GRPC Channel*/
+ val grpcChannel = grpcChannel(selector)
+
+ /** Get Send and Receive Channel for bidirectional process method*/
+ val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
+ commChannels[requestId] = channels
+ }
+
+ val commChannel = commChannels[requestId]
+ ?: throw BluePrintException("failed to create response subscription for request($requestId) channel")
+
+ log.info("created subscription for request($requestId)")
+
+ return commChannel.responseChannel.consumeAsFlow()
+ }
+
+ /**
+ * Send the [input]request, by reusing same GRPC channel and Communication channel
+ * for the request Id.
+ */
+ override suspend fun send(input: ExecutionServiceInput) {
+ val requestId = input.commonHeader.requestId
+ val subRequestId = input.commonHeader.subRequestId
+ val sendChannel = commChannels[requestId]?.requestChannel
+ ?: throw BluePrintException("failed to get request($requestId) send channel")
+ coroutineScope {
+ launch {
+ sendChannel.send(input)
+ log.trace("Message sent for request($requestId) : subRequest($subRequestId)")
+ }
+ }
+ }
+
+ /** Cancel the Subscription for the [requestId], This closes communication channel **/
+ @ExperimentalCoroutinesApi
+ override suspend fun cancelSubscription(requestId: String) {
+ commChannels[requestId]?.let {
+ if (!it.requestChannel.isClosedForSend)
+ it.requestChannel.close()
+ /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
+ //it.responseChannel.cancel(CancellationException("subscription cancelled"))
+ commChannels.remove(requestId)
+ log.info("closed subscription for request($requestId)")
+ }
+ }
+
+ /** Close the GRPC channel for the host port poperties [selector]*/
+ override suspend fun closeChannel(selector: Any) {
+ val grpcProperties = grpcProperties(selector)
+ val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
+ if (grpcChannels.containsKey(selectorName)) {
+ grpcChannels[selectorName]!!.shutdownNow()
+ grpcChannels.remove(selectorName)
+ log.info("grpc channel($selectorName) shutdown completed")
+ }
+ }
+
+ /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */
+ private suspend fun grpcChannel(selector: Any): ManagedChannel {
+ val grpcProperties = grpcProperties(selector)
+ val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
+ val isGrpcChannelCached = grpcChannels.containsKey(selectorName)
+ val grpcChannel = if (isGrpcChannelCached) {
+ if (grpcChannels[selectorName]!!.isShutdown) {
+ createGrpcChannel(grpcProperties)
+ } else {
+ grpcChannels[selectorName]!!
+ }
+ } else {
+ createGrpcChannel(grpcProperties)
+ }
+ grpcChannels[selectorName] = grpcChannel
+ return grpcChannel
+ }
+
+ suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
+ val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
+ .blueprintGrpcClientService(grpcProperties)
+ return grpcClientService.channel()
+ }
+
+ private fun grpcProperties(selector: Any): GrpcClientProperties {
+ return when (selector) {
+ is String -> {
+ bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString())
+ }
+ is JsonNode -> {
+ bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
+ }
+ is GrpcClientProperties -> {
+ selector
+ }
+ else -> {
+ throw BluePrintException("couldn't process selector($selector)")
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
new file mode 100644
index 000000000..e291aa78e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts
+
+import io.grpc.ServerBuilder
+import io.grpc.stub.StreamObserver
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.common.api.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+
+private val log = logger(MockBluePrintProcessingServer::class)
+
+
+class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
+
+ override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
+
+ return object : StreamObserver<ExecutionServiceInput> {
+ override fun onNext(executionServiceInput: ExecutionServiceInput) {
+ log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " +
+ "subRequestId(${executionServiceInput.commonHeader.subRequestId})")
+ responseObserver.onNext(buildNotification(executionServiceInput))
+ responseObserver.onNext(buildResponse(executionServiceInput))
+ responseObserver.onCompleted()
+ }
+
+ override fun onError(error: Throwable) {
+ log.debug("Fail to process message", error)
+ responseObserver.onError(io.grpc.Status.INTERNAL
+ .withDescription(error.message)
+ .asException())
+ }
+
+ override fun onCompleted() {
+ log.info("Completed")
+ }
+ }
+ }
+
+
+ private fun buildNotification(input: ExecutionServiceInput): ExecutionServiceOutput {
+ val status = Status.newBuilder()
+ .setEventType(EventType.EVENT_COMPONENT_NOTIFICATION)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+ }
+
+ private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput {
+
+ val status = Status.newBuilder().setCode(200)
+ .setEventType(EventType.EVENT_COMPONENT_EXECUTED)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+
+ }
+}
+
+/** For Integration testing stat this server */
+fun main() {
+ try {
+ val server = ServerBuilder
+ .forPort(50052)
+ .addService(MockBluePrintProcessingServer())
+ .build()
+ server.start()
+ log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...")
+ server.awaitTermination()
+ } catch (e: Exception) {
+ e.printStackTrace()
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
new file mode 100644
index 000000000..c9ff23573
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
@@ -0,0 +1,119 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+
+import io.grpc.inprocess.InProcessChannelBuilder
+import io.grpc.inprocess.InProcessServerBuilder
+import io.grpc.testing.GrpcCleanupRule
+import io.mockk.coEvery
+import io.mockk.mockk
+import io.mockk.spyk
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.collect
+import org.junit.Rule
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import java.util.*
+
+
+class StreamingRemoteExecutionServiceTest {
+
+ val log = logger(StreamingRemoteExecutionServiceTest::class)
+
+ @get:Rule
+ val grpcCleanup = GrpcCleanupRule()
+ private val serverName = InProcessServerBuilder.generateName()
+ private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
+ private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()
+
+ private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
+ host = "127.0.0.1"
+ port = 50052
+ type = GRPCLibConstants.TYPE_TOKEN_AUTH
+ token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
+ }
+
+ @Test
+ @ExperimentalCoroutinesApi
+ @FlowPreview
+ fun testStreamingChannel() {
+ grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
+ val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())
+
+ runBlocking {
+ val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())
+
+ val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)
+
+ val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
+ /** To test with real server, uncomment below line */
+ coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+
+ val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+
+ repeat(1) { count ->
+ val requestId = "12345-$count"
+ val responseFlow = spyStreamingRemoteExecutionService.openSubscription(tokenAuthGrpcClientProperties, requestId)
+
+ val deferred = async {
+ responseFlow.collect {
+ log.info("Received $count-response (${it.commonHeader.subRequestId}) : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ spyStreamingRemoteExecutionService.cancelSubscription(it.commonHeader.requestId)
+ }
+ }
+ }
+ responseFlowsDeferred.add(deferred)
+ /** Sending Multiple messages with same requestId and different subRequestId */
+ spyStreamingRemoteExecutionService.send(getRequest(requestId))
+ }
+ responseFlowsDeferred.awaitAll()
+ streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
+ }
+
+ }
+
+ private fun getRequest(requestId: String): ExecutionServiceInput {
+ val commonHeader = CommonHeader.newBuilder()
+ .setTimestamp("2012-04-23T18:25:43.511Z")
+ .setOriginatorId("System")
+ .setRequestId(requestId)
+ .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build()
+
+
+ val actionIdentifier = ActionIdentifiers.newBuilder()
+ .setActionName("SampleScript")
+ .setBlueprintName("sample-cba")
+ .setBlueprintVersion("1.0.0")
+ .build()
+
+ return ExecutionServiceInput.newBuilder()
+ .setCommonHeader(commonHeader)
+ .setActionIdentifiers(actionIdentifier)
+ //.setPayload(payloadBuilder.build())
+ .build()
+
+ }
+} \ No newline at end of file