summaryrefslogtreecommitdiffstats
path: root/ms
diff options
context:
space:
mode:
Diffstat (limited to 'ms')
-rwxr-xr-xms/blueprintsprocessor/application/pom.xml2
-rwxr-xr-xms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml19
-rwxr-xr-xms/blueprintsprocessor/application/src/main/resources/application.properties4
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/pom.xml2
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/pom.xml5
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt174
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt97
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt119
-rwxr-xr-xms/blueprintsprocessor/parent/pom.xml2
-rw-r--r--ms/py-executor/blueprints_grpc/blueprint_processing_server.py2
-rw-r--r--ms/py-executor/blueprints_grpc/executor_utils.py2
-rwxr-xr-xms/py-executor/dc/docker-compose.yaml23
-rw-r--r--ms/py-executor/docker/Dockerfile4
-rwxr-xr-xms/py-executor/docker/distribution.xml6
15 files changed, 453 insertions, 10 deletions
diff --git a/ms/blueprintsprocessor/application/pom.xml b/ms/blueprintsprocessor/application/pom.xml
index cd13c9cbb..ed1b67dfd 100755
--- a/ms/blueprintsprocessor/application/pom.xml
+++ b/ms/blueprintsprocessor/application/pom.xml
@@ -85,10 +85,12 @@
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>configs-api</artifactId>
</dependency>
+ <!--
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>health-api</artifactId>
</dependency>
+ -->
<!-- Functions -->
<dependency>
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
index 407aa6b20..e4bb00773 100755
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
@@ -41,6 +41,25 @@ services:
restart: always
volumes:
- blueprints-deploy:/opt/app/onap/blueprints/deploy
+ py-executor:
+ depends_on:
+ - db
+ image: onap/ccsdk-py-script-executor
+ container_name: bp-py-executor
+ ports:
+ - "50052:50052"
+ restart: always
+ volumes:
+ - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ environment:
+ APPLICATIONNAME: PythonExecutor
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+ APP_PORT: 50052
+ BASIC_AUTH: Basic Y2NzZGthcHBzOmNjc2RrYXBwcw==
+ LOG_FILE: /opt/app/onap/logs/application.log
volumes:
blueprints-deploy:
diff --git a/ms/blueprintsprocessor/application/src/main/resources/application.properties b/ms/blueprintsprocessor/application/src/main/resources/application.properties
index 1378c62c5..b8f0d2344 100755
--- a/ms/blueprintsprocessor/application/src/main/resources/application.properties
+++ b/ms/blueprintsprocessor/application/src/main/resources/application.properties
@@ -66,8 +66,8 @@ security.user.password: {bcrypt}$2a$10$duaUzVUVW0YPQCSIbGEkQOXwafZGwQ/b32/Ys4R1i
security.user.name: ccsdkapps
# Used in Health Check
-endpoints.user.name=ccsdkapps
-endpoints.user.password=ccsdkapps
+#endpoints.user.name=ccsdkapps
+#endpoints.user.password=ccsdkapps
# Executor Options
blueprintsprocessor.resourceResolution.enabled=true
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
index 088533a71..a1d2188ab 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
@@ -73,7 +73,7 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue
}
}
- private fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties):
+ fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties):
BluePrintGrpcClientService {
when (grpcClientProperties) {
is TokenAuthGrpcClientProperties -> {
diff --git a/ms/blueprintsprocessor/modules/inbounds/pom.xml b/ms/blueprintsprocessor/modules/inbounds/pom.xml
index 1729f2ff1..81ffea176 100644
--- a/ms/blueprintsprocessor/modules/inbounds/pom.xml
+++ b/ms/blueprintsprocessor/modules/inbounds/pom.xml
@@ -35,7 +35,7 @@
<module>designer-api</module>
<module>resource-api</module>
<module>selfservice-api</module>
- <module>health-api</module>
+ <!--<module>health-api</module>-->
</modules>
<dependencies>
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
index 8bee7c91c..f3044c8bb 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
@@ -64,5 +64,10 @@
<groupId>org.onap.ccsdk.sli.core</groupId>
<artifactId>sli-provider</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
new file mode 100644
index 000000000..07848164e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
@@ -0,0 +1,174 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
+import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
+import io.grpc.ManagedChannel
+import kotlinx.coroutines.ExperimentalCoroutinesApi
+import kotlinx.coroutines.FlowPreview
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.consumeAsFlow
+import kotlinx.coroutines.launch
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.stereotype.Service
+
+interface StreamingRemoteExecutionService<ReqT, ResT> {
+
+ suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT>
+
+ suspend fun send(input: ReqT)
+
+ suspend fun cancelSubscription(requestId: String)
+
+ suspend fun closeChannel(selector: Any)
+}
+
+@Service
+@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
+ havingValue = "true", matchIfMissing = false)
+class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
+ : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+
+ private val log = logger(StreamingRemoteExecutionServiceImpl::class)
+
+ private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
+
+ private val commChannels: MutableMap<String,
+ ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
+
+
+ /**
+ * Open new channel to send and receive for grpc properties [selector] for [requestId],
+ * Create the only one GRPC channel per host port and reuse for further communication.
+ * Create request communication channel to send and receive requests and responses.
+ * We can send multiple request with same requestId with unique subRequestId.
+ * Consume the flow for responses,
+ * Client should cancel the subscription for the request Id once no longer response is needed.
+ * */
+ @FlowPreview
+ override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> {
+
+ if (!commChannels.containsKey(requestId)) {
+ /** Get GRPC Channel*/
+ val grpcChannel = grpcChannel(selector)
+
+ /** Get Send and Receive Channel for bidirectional process method*/
+ val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
+ commChannels[requestId] = channels
+ }
+
+ val commChannel = commChannels[requestId]
+ ?: throw BluePrintException("failed to create response subscription for request($requestId) channel")
+
+ log.info("created subscription for request($requestId)")
+
+ return commChannel.responseChannel.consumeAsFlow()
+ }
+
+ /**
+ * Send the [input]request, by reusing same GRPC channel and Communication channel
+ * for the request Id.
+ */
+ override suspend fun send(input: ExecutionServiceInput) {
+ val requestId = input.commonHeader.requestId
+ val subRequestId = input.commonHeader.subRequestId
+ val sendChannel = commChannels[requestId]?.requestChannel
+ ?: throw BluePrintException("failed to get request($requestId) send channel")
+ coroutineScope {
+ launch {
+ sendChannel.send(input)
+ log.trace("Message sent for request($requestId) : subRequest($subRequestId)")
+ }
+ }
+ }
+
+ /** Cancel the Subscription for the [requestId], This closes communication channel **/
+ @ExperimentalCoroutinesApi
+ override suspend fun cancelSubscription(requestId: String) {
+ commChannels[requestId]?.let {
+ if (!it.requestChannel.isClosedForSend)
+ it.requestChannel.close()
+ /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
+ //it.responseChannel.cancel(CancellationException("subscription cancelled"))
+ commChannels.remove(requestId)
+ log.info("closed subscription for request($requestId)")
+ }
+ }
+
+ /** Close the GRPC channel for the host port poperties [selector]*/
+ override suspend fun closeChannel(selector: Any) {
+ val grpcProperties = grpcProperties(selector)
+ val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
+ if (grpcChannels.containsKey(selectorName)) {
+ grpcChannels[selectorName]!!.shutdownNow()
+ grpcChannels.remove(selectorName)
+ log.info("grpc channel($selectorName) shutdown completed")
+ }
+ }
+
+ /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */
+ private suspend fun grpcChannel(selector: Any): ManagedChannel {
+ val grpcProperties = grpcProperties(selector)
+ val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
+ val isGrpcChannelCached = grpcChannels.containsKey(selectorName)
+ val grpcChannel = if (isGrpcChannelCached) {
+ if (grpcChannels[selectorName]!!.isShutdown) {
+ createGrpcChannel(grpcProperties)
+ } else {
+ grpcChannels[selectorName]!!
+ }
+ } else {
+ createGrpcChannel(grpcProperties)
+ }
+ grpcChannels[selectorName] = grpcChannel
+ return grpcChannel
+ }
+
+ suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
+ val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
+ .blueprintGrpcClientService(grpcProperties)
+ return grpcClientService.channel()
+ }
+
+ private fun grpcProperties(selector: Any): GrpcClientProperties {
+ return when (selector) {
+ is String -> {
+ bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString())
+ }
+ is JsonNode -> {
+ bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
+ }
+ is GrpcClientProperties -> {
+ selector
+ }
+ else -> {
+ throw BluePrintException("couldn't process selector($selector)")
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
new file mode 100644
index 000000000..e291aa78e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts
+
+import io.grpc.ServerBuilder
+import io.grpc.stub.StreamObserver
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.common.api.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+
+private val log = logger(MockBluePrintProcessingServer::class)
+
+
+class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
+
+ override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
+
+ return object : StreamObserver<ExecutionServiceInput> {
+ override fun onNext(executionServiceInput: ExecutionServiceInput) {
+ log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " +
+ "subRequestId(${executionServiceInput.commonHeader.subRequestId})")
+ responseObserver.onNext(buildNotification(executionServiceInput))
+ responseObserver.onNext(buildResponse(executionServiceInput))
+ responseObserver.onCompleted()
+ }
+
+ override fun onError(error: Throwable) {
+ log.debug("Fail to process message", error)
+ responseObserver.onError(io.grpc.Status.INTERNAL
+ .withDescription(error.message)
+ .asException())
+ }
+
+ override fun onCompleted() {
+ log.info("Completed")
+ }
+ }
+ }
+
+
+ private fun buildNotification(input: ExecutionServiceInput): ExecutionServiceOutput {
+ val status = Status.newBuilder()
+ .setEventType(EventType.EVENT_COMPONENT_NOTIFICATION)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+ }
+
+ private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput {
+
+ val status = Status.newBuilder().setCode(200)
+ .setEventType(EventType.EVENT_COMPONENT_EXECUTED)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+
+ }
+}
+
+/** For Integration testing stat this server */
+fun main() {
+ try {
+ val server = ServerBuilder
+ .forPort(50052)
+ .addService(MockBluePrintProcessingServer())
+ .build()
+ server.start()
+ log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...")
+ server.awaitTermination()
+ } catch (e: Exception) {
+ e.printStackTrace()
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
new file mode 100644
index 000000000..c9ff23573
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
@@ -0,0 +1,119 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+
+import io.grpc.inprocess.InProcessChannelBuilder
+import io.grpc.inprocess.InProcessServerBuilder
+import io.grpc.testing.GrpcCleanupRule
+import io.mockk.coEvery
+import io.mockk.mockk
+import io.mockk.spyk
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.collect
+import org.junit.Rule
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import java.util.*
+
+
+class StreamingRemoteExecutionServiceTest {
+
+ val log = logger(StreamingRemoteExecutionServiceTest::class)
+
+ @get:Rule
+ val grpcCleanup = GrpcCleanupRule()
+ private val serverName = InProcessServerBuilder.generateName()
+ private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
+ private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()
+
+ private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
+ host = "127.0.0.1"
+ port = 50052
+ type = GRPCLibConstants.TYPE_TOKEN_AUTH
+ token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
+ }
+
+ @Test
+ @ExperimentalCoroutinesApi
+ @FlowPreview
+ fun testStreamingChannel() {
+ grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
+ val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())
+
+ runBlocking {
+ val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())
+
+ val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)
+
+ val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
+ /** To test with real server, uncomment below line */
+ coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+
+ val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+
+ repeat(1) { count ->
+ val requestId = "12345-$count"
+ val responseFlow = spyStreamingRemoteExecutionService.openSubscription(tokenAuthGrpcClientProperties, requestId)
+
+ val deferred = async {
+ responseFlow.collect {
+ log.info("Received $count-response (${it.commonHeader.subRequestId}) : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ spyStreamingRemoteExecutionService.cancelSubscription(it.commonHeader.requestId)
+ }
+ }
+ }
+ responseFlowsDeferred.add(deferred)
+ /** Sending Multiple messages with same requestId and different subRequestId */
+ spyStreamingRemoteExecutionService.send(getRequest(requestId))
+ }
+ responseFlowsDeferred.awaitAll()
+ streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
+ }
+
+ }
+
+ private fun getRequest(requestId: String): ExecutionServiceInput {
+ val commonHeader = CommonHeader.newBuilder()
+ .setTimestamp("2012-04-23T18:25:43.511Z")
+ .setOriginatorId("System")
+ .setRequestId(requestId)
+ .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build()
+
+
+ val actionIdentifier = ActionIdentifiers.newBuilder()
+ .setActionName("SampleScript")
+ .setBlueprintName("sample-cba")
+ .setBlueprintVersion("1.0.0")
+ .build()
+
+ return ExecutionServiceInput.newBuilder()
+ .setCommonHeader(commonHeader)
+ .setActionIdentifiers(actionIdentifier)
+ //.setPayload(payloadBuilder.build())
+ .build()
+
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml
index ac123cbfd..37a071280 100755
--- a/ms/blueprintsprocessor/parent/pom.xml
+++ b/ms/blueprintsprocessor/parent/pom.xml
@@ -386,11 +386,13 @@
<version>${project.version}</version>
</dependency>
+ <!--
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>health-api</artifactId>
<version>${project.version}</version>
</dependency>
+ -->
<!-- North Bound -->
<dependency>
diff --git a/ms/py-executor/blueprints_grpc/blueprint_processing_server.py b/ms/py-executor/blueprints_grpc/blueprint_processing_server.py
index 0c432d4cd..f1f29a035 100644
--- a/ms/py-executor/blueprints_grpc/blueprint_processing_server.py
+++ b/ms/py-executor/blueprints_grpc/blueprint_processing_server.py
@@ -46,4 +46,4 @@ class BluePrintProcessingServer(BluePrintProcessing_pb2_grpc.BluePrintProcessing
# Get the Dynamic Process Instance based on request
instance: AbstractScriptFunction = instance_for_input(self.configuration, request)
instance.set_context(context)
- return instance.process(request)
+ yield from instance.process(request)
diff --git a/ms/py-executor/blueprints_grpc/executor_utils.py b/ms/py-executor/blueprints_grpc/executor_utils.py
index 224e35ef9..44b6d8e87 100644
--- a/ms/py-executor/blueprints_grpc/executor_utils.py
+++ b/ms/py-executor/blueprints_grpc/executor_utils.py
@@ -29,7 +29,7 @@ logger = logging.getLogger("Utils")
def current_time():
ts = time.time()
- return datetime.datetime.fromtimestamp(ts).strftime("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def blueprint_id(input: ExecutionServiceInput):
diff --git a/ms/py-executor/dc/docker-compose.yaml b/ms/py-executor/dc/docker-compose.yaml
new file mode 100755
index 000000000..76009411b
--- /dev/null
+++ b/ms/py-executor/dc/docker-compose.yaml
@@ -0,0 +1,23 @@
+version: '3.3'
+
+services:
+ py-executor:
+ image: onap/ccsdk-py-executor
+ container_name: bp-py-executor
+ ports:
+ - "50052:50052"
+ restart: always
+ volumes:
+ - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ environment:
+ APPLICATIONNAME: PythonExecutor
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+ APP_PORT: 50052
+ BASIC_AUTH: Basic Y2NzZGthcHBzOmNjc2RrYXBwcw==
+ LOG_FILE: /opt/app/onap/logs/application.log
+
+volumes:
+ blueprints-deploy:
diff --git a/ms/py-executor/docker/Dockerfile b/ms/py-executor/docker/Dockerfile
index 112180581..b49daf68f 100644
--- a/ms/py-executor/docker/Dockerfile
+++ b/ms/py-executor/docker/Dockerfile
@@ -11,8 +11,8 @@ RUN tar -xzf /source.tar.gz -C /tmp \
&& rm -rf /source.tar.gz \
&& rm -rf /tmp/@project.build.finalName@
-RUN pip install --no-cache-dir -r /opt/app/onap/requirements.txt
+RUN pip install --no-cache-dir -r /opt/app/onap/python/requirements.txt
VOLUME /opt/app/onap/blueprints/deploy/
-ENTRYPOINT /opt/app/onap/start.sh \ No newline at end of file
+ENTRYPOINT /opt/app/onap/python/start.sh \ No newline at end of file
diff --git a/ms/py-executor/docker/distribution.xml b/ms/py-executor/docker/distribution.xml
index 8bd06c88f..6235a7b8a 100755
--- a/ms/py-executor/docker/distribution.xml
+++ b/ms/py-executor/docker/distribution.xml
@@ -34,20 +34,22 @@
</fileSet>
<fileSet>
<directory>${project.basedir}</directory>
- <outputDirectory>opt/app/onap</outputDirectory>
+ <outputDirectory>opt/app/onap/python</outputDirectory>
<includes>
<include>requirements.txt</include>
<include>configuration.ini</include>
</includes>
<useDefaultExcludes>true</useDefaultExcludes>
+ <fileMode>0666</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/docker</directory>
- <outputDirectory>opt/app/onap</outputDirectory>
+ <outputDirectory>opt/app/onap/python</outputDirectory>
<includes>
<include>*.sh</include>
</includes>
<useDefaultExcludes>true</useDefaultExcludes>
+ <fileMode>0755</fileMode>
</fileSet>
</fileSets>
</assembly> \ No newline at end of file