summaryrefslogtreecommitdiffstats
path: root/ms
diff options
context:
space:
mode:
Diffstat (limited to 'ms')
-rwxr-xr-xms/blueprintsprocessor/application/pom.xml2
-rwxr-xr-xms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml19
-rwxr-xr-xms/blueprintsprocessor/application/src/main/resources/application.properties4
-rw-r--r--ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt5
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/pom.xml2
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/pom.xml5
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt205
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt97
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt144
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml2
-rwxr-xr-xms/blueprintsprocessor/parent/pom.xml2
-rw-r--r--ms/py-executor/blueprints_grpc/blueprint_processing_server.py2
-rw-r--r--ms/py-executor/blueprints_grpc/executor_utils.py2
-rwxr-xr-xms/py-executor/dc/docker-compose.yaml23
-rw-r--r--ms/py-executor/docker/Dockerfile4
-rwxr-xr-xms/py-executor/docker/distribution.xml6
-rw-r--r--ms/py-executor/requirements.txt3
20 files changed, 520 insertions, 15 deletions
diff --git a/ms/blueprintsprocessor/application/pom.xml b/ms/blueprintsprocessor/application/pom.xml
index cd13c9cbb..ed1b67dfd 100755
--- a/ms/blueprintsprocessor/application/pom.xml
+++ b/ms/blueprintsprocessor/application/pom.xml
@@ -85,10 +85,12 @@
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>configs-api</artifactId>
</dependency>
+ <!--
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>health-api</artifactId>
</dependency>
+ -->
<!-- Functions -->
<dependency>
diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
index 407aa6b20..e4bb00773 100755
--- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
+++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml
@@ -41,6 +41,25 @@ services:
restart: always
volumes:
- blueprints-deploy:/opt/app/onap/blueprints/deploy
+ py-executor:
+ depends_on:
+ - db
+ image: onap/ccsdk-py-script-executor
+ container_name: bp-py-executor
+ ports:
+ - "50052:50052"
+ restart: always
+ volumes:
+ - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ environment:
+ APPLICATIONNAME: PythonExecutor
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+ APP_PORT: 50052
+ BASIC_AUTH: Basic Y2NzZGthcHBzOmNjc2RrYXBwcw==
+ LOG_FILE: /opt/app/onap/logs/application.log
volumes:
blueprints-deploy:
diff --git a/ms/blueprintsprocessor/application/src/main/resources/application.properties b/ms/blueprintsprocessor/application/src/main/resources/application.properties
index 1378c62c5..b8f0d2344 100755
--- a/ms/blueprintsprocessor/application/src/main/resources/application.properties
+++ b/ms/blueprintsprocessor/application/src/main/resources/application.properties
@@ -66,8 +66,8 @@ security.user.password: {bcrypt}$2a$10$duaUzVUVW0YPQCSIbGEkQOXwafZGwQ/b32/Ys4R1i
security.user.name: ccsdkapps
# Used in Health Check
-endpoints.user.name=ccsdkapps
-endpoints.user.password=ccsdkapps
+#endpoints.user.name=ccsdkapps
+#endpoints.user.password=ccsdkapps
# Executor Options
blueprintsprocessor.resourceResolution.enabled=true
diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
index 088533a71..a1d2188ab 100644
--- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
+++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt
@@ -73,7 +73,7 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue
}
}
- private fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties):
+ fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties):
BluePrintGrpcClientService {
when (grpcClientProperties) {
is TokenAuthGrpcClientProperties -> {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index ab04054fe..1cd8a2af7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -36,6 +36,7 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
var clientId: String? = null
var topic: String? = null
var pollMillSec: Long = 1000
+ var pollRecords: Int = -1
}
open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index 5a9e61bfd..b5d444a49 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -51,6 +51,10 @@ class KafkaBasicAuthMessageConsumerService(
if (messageConsumerProperties.clientId != null) {
configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
}
+ /** To handle Back pressure, Get only configured record for processing */
+ if (messageConsumerProperties.pollRecords > 0) {
+ configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
+ }
// TODO("Security Implementation based on type")
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
@@ -84,6 +88,7 @@ class KafkaBasicAuthMessageConsumerService(
kafkaConsumer!!.use { kc ->
while (keepGoing) {
val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+ log.info("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
/** execute the command block */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 2b84eaa78..f4e85a94b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -52,6 +52,7 @@ import kotlin.test.assertTrue
"blueprintsprocessor.messageconsumer.sample.topic=default-topic",
"blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
"blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
"blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
@@ -129,11 +130,11 @@ open class BlueprintMessageConsumerServiceTest {
.blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
launch {
repeat(5) {
- delay(1000)
+ delay(100)
blueprintMessageProducerService.sendMessage("this is my message($it)")
}
}
- delay(10000)
+ delay(5000)
blueprintMessageConsumerService.shutDown()
}
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/pom.xml b/ms/blueprintsprocessor/modules/inbounds/pom.xml
index 1729f2ff1..81ffea176 100644
--- a/ms/blueprintsprocessor/modules/inbounds/pom.xml
+++ b/ms/blueprintsprocessor/modules/inbounds/pom.xml
@@ -35,7 +35,7 @@
<module>designer-api</module>
<module>resource-api</module>
<module>selfservice-api</module>
- <module>health-api</module>
+ <!--<module>health-api</module>-->
</modules>
<dependencies>
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
index 8bee7c91c..f3044c8bb 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml
@@ -64,5 +64,10 @@
<groupId>org.onap.ccsdk.sli.core</groupId>
<artifactId>sli-provider</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>io.grpc</groupId>
+ <artifactId>grpc-testing</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
new file mode 100644
index 000000000..adb1d67d2
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
@@ -0,0 +1,205 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
+import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
+import io.grpc.ManagedChannel
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.consumeAsFlow
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
+import org.springframework.stereotype.Service
+
+interface StreamingRemoteExecutionService<ReqT, ResT> {
+
+ suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
+
+ suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
+
+ suspend fun send(txId: String, input: ReqT)
+
+ suspend fun cancelSubscription(txId: String)
+
+ suspend fun closeChannel(selector: Any)
+}
+
+@Service
+@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"],
+ havingValue = "true", matchIfMissing = false)
+class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService)
+ : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> {
+
+ private val log = logger(StreamingRemoteExecutionServiceImpl::class)
+
+ private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf()
+
+ private val commChannels: MutableMap<String,
+ ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf()
+
+
+ /**
+ * Open new channel to send and receive for grpc properties [selector] for [txId],
+ * Create the only one GRPC channel per host port and reuse for further communication.
+ * Create request communication channel to send and receive requests and responses.
+ * We can send multiple request with same txId.
+ * Consume the flow for responses,
+ * Client should cancel the subscription for the request Id once no longer response is needed.
+ * */
+ @FlowPreview
+ override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
+
+ if (!commChannels.containsKey(txId)) {
+ /** Get GRPC Channel*/
+ val grpcChannel = grpcChannel(selector)
+
+ /** Get Send and Receive Channel for bidirectional process method*/
+ val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
+ commChannels[txId] = channels
+ }
+
+ val commChannel = commChannels[txId]
+ ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
+
+ log.info("created subscription for transactionId($txId)")
+
+ return commChannel.responseChannel.consumeAsFlow()
+ }
+
+ /**
+ * Send the [input] request, by reusing same GRPC channel and Communication channel
+ * for the request Id.
+ */
+ override suspend fun send(txId: String, input: ExecutionServiceInput) {
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ coroutineScope {
+ launch {
+ sendChannel.send(input)
+ log.trace("Message sent for transactionId($txId)")
+ }
+ }
+ }
+
+ /**
+ * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
+ * for the GRPC [selector] with execution [timeOutMill].
+ * Other state of the request will be skipped.
+ * The assumption here is you can call this API with same request Id and unique subrequest Id,
+ * so the correlation is sub request id to receive the response.
+ */
+ @ExperimentalCoroutinesApi
+ override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
+ : ExecutionServiceOutput {
+
+ var output: ExecutionServiceOutput? = null
+ val flow = openSubscription(selector, txId)
+
+ /** Send the request */
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ sendChannel.send(input)
+
+ /** Receive the response with timeout */
+ withTimeout(timeOutMill) {
+ flow.collect {
+ log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ output = it
+ cancelSubscription(txId)
+ }
+ }
+ }
+ return output!!
+ }
+
+ /** Cancel the Subscription for the [txId], This closes communication channel **/
+ @ExperimentalCoroutinesApi
+ override suspend fun cancelSubscription(txId: String) {
+ commChannels[txId]?.let {
+ if (!it.requestChannel.isClosedForSend)
+ it.requestChannel.close()
+ /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
+ //it.responseChannel.cancel(CancellationException("subscription cancelled"))
+ commChannels.remove(txId)
+ log.info("closed subscription for transactionId($txId)")
+ }
+ }
+
+ /** Close the GRPC channel for the host port poperties [selector]*/
+ override suspend fun closeChannel(selector: Any) {
+ val grpcProperties = grpcProperties(selector)
+ val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
+ if (grpcChannels.containsKey(selectorName)) {
+ grpcChannels[selectorName]!!.shutdownNow()
+ grpcChannels.remove(selectorName)
+ log.info("grpc channel($selectorName) shutdown completed")
+ }
+ }
+
+ /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */
+ private suspend fun grpcChannel(selector: Any): ManagedChannel {
+ val grpcProperties = grpcProperties(selector)
+ val selectorName = "${grpcProperties.host}:${grpcProperties.port}"
+ val isGrpcChannelCached = grpcChannels.containsKey(selectorName)
+ val grpcChannel = if (isGrpcChannelCached) {
+ if (grpcChannels[selectorName]!!.isShutdown) {
+ createGrpcChannel(grpcProperties)
+ } else {
+ grpcChannels[selectorName]!!
+ }
+ } else {
+ createGrpcChannel(grpcProperties)
+ }
+ grpcChannels[selectorName] = grpcChannel
+ return grpcChannel
+ }
+
+ suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel {
+ val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService
+ .blueprintGrpcClientService(grpcProperties)
+ return grpcClientService.channel()
+ }
+
+ private fun grpcProperties(selector: Any): GrpcClientProperties {
+ return when (selector) {
+ is String -> {
+ bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString())
+ }
+ is JsonNode -> {
+ bluePrintGrpcLibPropertyService.grpcClientProperties(selector)
+ }
+ is GrpcClientProperties -> {
+ selector
+ }
+ else -> {
+ throw BluePrintException("couldn't process selector($selector)")
+ }
+ }
+ }
+}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
new file mode 100644
index 000000000..e291aa78e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt
@@ -0,0 +1,97 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts
+
+import io.grpc.ServerBuilder
+import io.grpc.stub.StreamObserver
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.common.api.Status
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput
+
+private val log = logger(MockBluePrintProcessingServer::class)
+
+
+class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() {
+
+ override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> {
+
+ return object : StreamObserver<ExecutionServiceInput> {
+ override fun onNext(executionServiceInput: ExecutionServiceInput) {
+ log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " +
+ "subRequestId(${executionServiceInput.commonHeader.subRequestId})")
+ responseObserver.onNext(buildNotification(executionServiceInput))
+ responseObserver.onNext(buildResponse(executionServiceInput))
+ responseObserver.onCompleted()
+ }
+
+ override fun onError(error: Throwable) {
+ log.debug("Fail to process message", error)
+ responseObserver.onError(io.grpc.Status.INTERNAL
+ .withDescription(error.message)
+ .asException())
+ }
+
+ override fun onCompleted() {
+ log.info("Completed")
+ }
+ }
+ }
+
+
+ private fun buildNotification(input: ExecutionServiceInput): ExecutionServiceOutput {
+ val status = Status.newBuilder()
+ .setEventType(EventType.EVENT_COMPONENT_NOTIFICATION)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+ }
+
+ private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput {
+
+ val status = Status.newBuilder().setCode(200)
+ .setEventType(EventType.EVENT_COMPONENT_EXECUTED)
+ .build()
+ return ExecutionServiceOutput.newBuilder()
+ .setCommonHeader(input.commonHeader)
+ .setActionIdentifiers(input.actionIdentifiers)
+ .setStatus(status)
+ .build()
+
+ }
+}
+
+/** For Integration testing stat this server */
+fun main() {
+ try {
+ val server = ServerBuilder
+ .forPort(50052)
+ .addService(MockBluePrintProcessingServer())
+ .build()
+ server.start()
+ log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...")
+ server.awaitTermination()
+ } catch (e: Exception) {
+ e.printStackTrace()
+ }
+
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
new file mode 100644
index 000000000..29d24c6ad
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
@@ -0,0 +1,144 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.services.execution
+
+import io.grpc.inprocess.InProcessChannelBuilder
+import io.grpc.inprocess.InProcessServerBuilder
+import io.grpc.testing.GrpcCleanupRule
+import io.mockk.coEvery
+import io.mockk.mockk
+import io.mockk.spyk
+import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.collect
+import org.junit.Rule
+import org.junit.Test
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties
+import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer
+import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers
+import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
+import java.util.*
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+
+
+class StreamingRemoteExecutionServiceTest {
+
+ val log = logger(StreamingRemoteExecutionServiceTest::class)
+
+ @get:Rule
+ val grpcCleanup = GrpcCleanupRule()
+ private val serverName = InProcessServerBuilder.generateName()
+ private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor()
+ private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor()
+
+ private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply {
+ host = "127.0.0.1"
+ port = 50052
+ type = GRPCLibConstants.TYPE_TOKEN_AUTH
+ token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw=="
+ }
+
+ @Test
+ @ExperimentalCoroutinesApi
+ @FlowPreview
+ fun testStreamingChannel() {
+ grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start())
+ val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build())
+
+ runBlocking {
+ val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk())
+
+ val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService)
+
+ val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
+ /** To test with real server, uncomment below line */
+ coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+
+ /** Test Send and Receive non interactive transaction */
+ val nonInteractiveDeferred = arrayListOf<Deferred<*>>()
+ repeat(2) { count ->
+ val requestId = "1234-$count"
+ val request = getRequest(requestId)
+ val invocationId = request.commonHeader.subRequestId
+ val deferred = async {
+ val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties,
+ invocationId, request, 1000L)
+ assertNotNull(response, "failed to get non interactive response")
+ assertEquals(response.commonHeader.requestId, requestId,
+ "failed to match non interactive response id")
+ assertEquals(response.status.eventType, EventType.EVENT_COMPONENT_EXECUTED,
+ "failed to match non interactive response type")
+ }
+ nonInteractiveDeferred.add(deferred)
+
+ }
+ nonInteractiveDeferred.awaitAll()
+
+ /** Test Send and Receive interactive transaction */
+ val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+ repeat(2) { count ->
+ val requestId = "12345-$count"
+ val request = getRequest(requestId)
+ val invocationId = request.commonHeader.requestId
+ val responseFlow = spyStreamingRemoteExecutionService
+ .openSubscription(tokenAuthGrpcClientProperties, invocationId)
+
+ val deferred = async {
+ responseFlow.collect {
+ log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
+ }
+ }
+ }
+ responseFlowsDeferred.add(deferred)
+ /** Sending Multiple messages with same requestId and different subRequestId */
+ spyStreamingRemoteExecutionService.send(invocationId, request)
+ }
+ responseFlowsDeferred.awaitAll()
+ streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
+ }
+
+ }
+
+ private fun getRequest(requestId: String): ExecutionServiceInput {
+ val commonHeader = CommonHeader.newBuilder()
+ .setTimestamp("2012-04-23T18:25:43.511Z")
+ .setOriginatorId("System")
+ .setRequestId(requestId)
+ .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build()
+
+
+ val actionIdentifier = ActionIdentifiers.newBuilder()
+ .setActionName("SampleScript")
+ .setBlueprintName("sample-cba")
+ .setBlueprintVersion("1.0.0")
+ .build()
+
+ return ExecutionServiceInput.newBuilder()
+ .setCommonHeader(commonHeader)
+ .setActionIdentifiers(actionIdentifier)
+ //.setPayload(payloadBuilder.build())
+ .build()
+
+ }
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
index 703a52642..afe10b39d 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
@@ -19,7 +19,7 @@
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml
index ac123cbfd..37a071280 100755
--- a/ms/blueprintsprocessor/parent/pom.xml
+++ b/ms/blueprintsprocessor/parent/pom.xml
@@ -386,11 +386,13 @@
<version>${project.version}</version>
</dependency>
+ <!--
<dependency>
<groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId>
<artifactId>health-api</artifactId>
<version>${project.version}</version>
</dependency>
+ -->
<!-- North Bound -->
<dependency>
diff --git a/ms/py-executor/blueprints_grpc/blueprint_processing_server.py b/ms/py-executor/blueprints_grpc/blueprint_processing_server.py
index 0c432d4cd..f1f29a035 100644
--- a/ms/py-executor/blueprints_grpc/blueprint_processing_server.py
+++ b/ms/py-executor/blueprints_grpc/blueprint_processing_server.py
@@ -46,4 +46,4 @@ class BluePrintProcessingServer(BluePrintProcessing_pb2_grpc.BluePrintProcessing
# Get the Dynamic Process Instance based on request
instance: AbstractScriptFunction = instance_for_input(self.configuration, request)
instance.set_context(context)
- return instance.process(request)
+ yield from instance.process(request)
diff --git a/ms/py-executor/blueprints_grpc/executor_utils.py b/ms/py-executor/blueprints_grpc/executor_utils.py
index 224e35ef9..44b6d8e87 100644
--- a/ms/py-executor/blueprints_grpc/executor_utils.py
+++ b/ms/py-executor/blueprints_grpc/executor_utils.py
@@ -29,7 +29,7 @@ logger = logging.getLogger("Utils")
def current_time():
ts = time.time()
- return datetime.datetime.fromtimestamp(ts).strftime("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ return datetime.datetime.fromtimestamp(ts).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
def blueprint_id(input: ExecutionServiceInput):
diff --git a/ms/py-executor/dc/docker-compose.yaml b/ms/py-executor/dc/docker-compose.yaml
new file mode 100755
index 000000000..76009411b
--- /dev/null
+++ b/ms/py-executor/dc/docker-compose.yaml
@@ -0,0 +1,23 @@
+version: '3.3'
+
+services:
+ py-executor:
+ image: onap/ccsdk-py-executor
+ container_name: bp-py-executor
+ ports:
+ - "50052:50052"
+ restart: always
+ volumes:
+ - blueprints-deploy:/opt/app/onap/blueprints/deploy
+ environment:
+ APPLICATIONNAME: PythonExecutor
+ BUNDLEVERSION: 1.0.0
+ APP_CONFIG_HOME: /opt/app/onap/config
+ STICKYSELECTORKEY:
+ ENVCONTEXT: dev
+ APP_PORT: 50052
+ BASIC_AUTH: Basic Y2NzZGthcHBzOmNjc2RrYXBwcw==
+ LOG_FILE: /opt/app/onap/logs/application.log
+
+volumes:
+ blueprints-deploy:
diff --git a/ms/py-executor/docker/Dockerfile b/ms/py-executor/docker/Dockerfile
index 112180581..b49daf68f 100644
--- a/ms/py-executor/docker/Dockerfile
+++ b/ms/py-executor/docker/Dockerfile
@@ -11,8 +11,8 @@ RUN tar -xzf /source.tar.gz -C /tmp \
&& rm -rf /source.tar.gz \
&& rm -rf /tmp/@project.build.finalName@
-RUN pip install --no-cache-dir -r /opt/app/onap/requirements.txt
+RUN pip install --no-cache-dir -r /opt/app/onap/python/requirements.txt
VOLUME /opt/app/onap/blueprints/deploy/
-ENTRYPOINT /opt/app/onap/start.sh \ No newline at end of file
+ENTRYPOINT /opt/app/onap/python/start.sh \ No newline at end of file
diff --git a/ms/py-executor/docker/distribution.xml b/ms/py-executor/docker/distribution.xml
index 8bd06c88f..6235a7b8a 100755
--- a/ms/py-executor/docker/distribution.xml
+++ b/ms/py-executor/docker/distribution.xml
@@ -34,20 +34,22 @@
</fileSet>
<fileSet>
<directory>${project.basedir}</directory>
- <outputDirectory>opt/app/onap</outputDirectory>
+ <outputDirectory>opt/app/onap/python</outputDirectory>
<includes>
<include>requirements.txt</include>
<include>configuration.ini</include>
</includes>
<useDefaultExcludes>true</useDefaultExcludes>
+ <fileMode>0666</fileMode>
</fileSet>
<fileSet>
<directory>${project.basedir}/docker</directory>
- <outputDirectory>opt/app/onap</outputDirectory>
+ <outputDirectory>opt/app/onap/python</outputDirectory>
<includes>
<include>*.sh</include>
</includes>
<useDefaultExcludes>true</useDefaultExcludes>
+ <fileMode>0755</fileMode>
</fileSet>
</fileSets>
</assembly> \ No newline at end of file
diff --git a/ms/py-executor/requirements.txt b/ms/py-executor/requirements.txt
index 6051e04c1..b55636918 100644
--- a/ms/py-executor/requirements.txt
+++ b/ms/py-executor/requirements.txt
@@ -2,5 +2,4 @@ grpcio==1.23.0
grpcio-tools==1.23.0
configparser==4.0.2
requests==2.22.0
-ncclient==0.6.6
-ciscoconfparse==1.4.7 \ No newline at end of file
+ncclient==0.6.6 \ No newline at end of file