summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt1
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt45
-rw-r--r--ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt109
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt25
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt26
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt4
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt1
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt85
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt39
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt2
-rw-r--r--ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml2
13 files changed, 287 insertions, 62 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index ab04054fe..1cd8a2af7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -36,6 +36,7 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() {
var clientId: String? = null
var topic: String? = null
var pollMillSec: Long = 1000
+ var pollRecords: Int = -1
}
open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
index 5a9e61bfd..b5d444a49 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt
@@ -51,6 +51,10 @@ class KafkaBasicAuthMessageConsumerService(
if (messageConsumerProperties.clientId != null) {
configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!!
}
+ /** To handle Back pressure, Get only configured record for processing */
+ if (messageConsumerProperties.pollRecords > 0) {
+ configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords
+ }
// TODO("Security Implementation based on type")
/** add or override already set properties */
additionalConfig?.let { configProperties.putAll(it) }
@@ -84,6 +88,7 @@ class KafkaBasicAuthMessageConsumerService(
kafkaConsumer!!.use { kc ->
while (keepGoing) {
val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec))
+ log.info("Consumed Records : ${consumerRecords.count()}")
runBlocking {
consumerRecords?.forEach { consumerRecord ->
/** execute the command block */
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index 2b84eaa78..f4e85a94b 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -52,6 +52,7 @@ import kotlin.test.assertTrue
"blueprintsprocessor.messageconsumer.sample.topic=default-topic",
"blueprintsprocessor.messageconsumer.sample.clientId=default-client-id",
"blueprintsprocessor.messageconsumer.sample.pollMillSec=10",
+ "blueprintsprocessor.messageconsumer.sample.pollRecords=1",
"blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth",
"blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092",
@@ -129,11 +130,11 @@ open class BlueprintMessageConsumerServiceTest {
.blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService
launch {
repeat(5) {
- delay(1000)
+ delay(100)
blueprintMessageProducerService.sendMessage("this is my message($it)")
}
}
- delay(10000)
+ delay(5000)
blueprintMessageConsumerService.shutDown()
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
new file mode 100644
index 000000000..cdf6ce195
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core
+
+import kotlinx.coroutines.*
+import kotlinx.coroutines.reactor.ReactorContext
+import kotlinx.coroutines.reactor.asCoroutineContext
+import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine
+import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext
+import reactor.core.publisher.Mono
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+
+/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */
+@UseExperimental(InternalCoroutinesApi::class)
+fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext,
+ block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink ->
+
+ val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext())
+ ?: sink.currentContext()).asCoroutineContext()
+ /** Populate MDC context only if present in Reactor Context */
+ val newContext = if (!reactorContext.context.isEmpty
+ && reactorContext.context.hasKey(MDCContext)) {
+ val mdcContext = reactorContext.context.get<MDCContext>(MDCContext)
+ GlobalScope.newCoroutineContext(context + reactorContext + mdcContext)
+ } else GlobalScope.newCoroutineContext(context + reactorContext)
+
+ val coroutine = MonoMDCCoroutine(newContext, sink)
+ sink.onDispose(coroutine)
+ coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
+} \ No newline at end of file
diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt
new file mode 100644
index 000000000..4da7dcd0e
--- /dev/null
+++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt
@@ -0,0 +1,109 @@
+/*
+ * Copyright © 2018-2019 AT&T Intellectual Property.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onap.ccsdk.cds.blueprintsprocessor.core.service
+
+import kotlinx.coroutines.AbstractCoroutine
+import kotlinx.coroutines.InternalCoroutinesApi
+import kotlinx.coroutines.handleCoroutineException
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
+import org.slf4j.MDC
+import org.springframework.http.server.reactive.ServerHttpRequest
+import org.springframework.http.server.reactive.ServerHttpResponse
+import reactor.core.Disposable
+import reactor.core.publisher.MonoSink
+import java.time.ZoneOffset
+import java.time.ZonedDateTime
+import java.time.format.DateTimeFormatter
+import java.util.*
+import kotlin.coroutines.CoroutineContext
+
+class LoggingService {
+ private val log = logger(LoggingService::class)
+
+ companion object {
+ const val ONAP_REQUEST_ID = "X-ONAP-RequestID"
+ const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID"
+ const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName"
+ }
+
+ fun entering(request: ServerHttpRequest) {
+ val headers = request.headers
+ val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID))
+ val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID))
+ val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME))
+ MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT))
+ MDC.put("RequestID", requestID)
+ MDC.put("InvocationID", invocationID)
+ MDC.put("PartnerName", partnerName)
+ MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress))
+ MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString))
+ if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) {
+ MDC.put("ServiceName", request.uri.path)
+ }
+ }
+
+ fun exiting(request: ServerHttpRequest, response: ServerHttpResponse) {
+ try {
+ val reqHeaders = request.headers
+ val resHeaders = response.headers
+ resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID")
+ resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID")
+ } catch (e: Exception) {
+ log.warn("couldn't set response headers", e)
+ } finally {
+ MDC.clear()
+ }
+ }
+
+ private fun defaultToEmpty(input: Any?): String {
+ return input?.toString() ?: ""
+ }
+
+ private fun defaultToUUID(input: String?): String {
+ return input ?: UUID.randomUUID().toString()
+ }
+}
+
+
+@InternalCoroutinesApi
+class MonoMDCCoroutine<in T>(
+ parentContext: CoroutineContext,
+ private val sink: MonoSink<T>
+) : AbstractCoroutine<T>(parentContext, true), Disposable {
+ private var disposed = false
+
+ override fun onCompleted(value: T) {
+ if (!disposed) {
+ if (value == null) sink.success() else sink.success(value)
+ }
+ }
+
+ override fun onCancelled(cause: Throwable, handled: Boolean) {
+ if (!disposed) {
+ sink.error(cause)
+ } else if (!handled) {
+ handleCoroutineException(context, cause)
+ }
+ }
+
+ override fun dispose() {
+ disposed = true
+ cancel()
+ }
+
+ override fun isDisposed(): Boolean = disposed
+}
diff --git a/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt b/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt
index 4d13486c3..a6bff7051 100644
--- a/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt
@@ -19,7 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.designer.api
import io.swagger.annotations.ApiOperation
import io.swagger.annotations.ApiParam
-import kotlinx.coroutines.runBlocking
+import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc
import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelSearch
import org.onap.ccsdk.cds.blueprintsprocessor.designer.api.handler.BluePrintModelHandler
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
@@ -29,6 +29,7 @@ import org.springframework.http.ResponseEntity
import org.springframework.http.codec.multipart.FilePart
import org.springframework.security.access.prepost.PreAuthorize
import org.springframework.web.bind.annotation.*
+import reactor.core.publisher.Mono
/**
* BlueprintModelController Purpose: Handle controllerBlueprint API request
@@ -44,7 +45,7 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint
@ResponseBody
@Throws(BluePrintException::class)
@PreAuthorize("hasRole('USER')")
- fun saveBlueprint(@RequestPart("file") filePart: FilePart): BlueprintModelSearch = runBlocking {
+ fun saveBlueprint(@RequestPart("file") filePart: FilePart): Mono<BlueprintModelSearch> = monoMdc {
bluePrintModelHandler.saveBlueprintModel(filePart)
}
@@ -67,8 +68,9 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint
@Throws(BluePrintException::class)
@PreAuthorize("hasRole('USER')")
fun getBlueprintByNameAndVersion(@PathVariable(value = "name") name: String,
- @PathVariable(value = "version") version: String): BlueprintModelSearch {
- return this.bluePrintModelHandler.getBlueprintModelSearchByNameAndVersion(name, version)
+ @PathVariable(value = "version") version: String)
+ : Mono<BlueprintModelSearch> = monoMdc {
+ bluePrintModelHandler.getBlueprintModelSearchByNameAndVersion(name, version)
}
@GetMapping("/download/by-name/{name}/version/{version}", produces = [MediaType.APPLICATION_JSON_VALUE])
@@ -76,8 +78,9 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint
@Throws(BluePrintException::class)
@PreAuthorize("hasRole('USER')")
fun downloadBlueprintByNameAndVersion(@PathVariable(value = "name") name: String,
- @PathVariable(value = "version") version: String): ResponseEntity<Resource> {
- return this.bluePrintModelHandler.downloadBlueprintModelFileByNameAndVersion(name, version)
+ @PathVariable(value = "version") version: String)
+ : Mono<ResponseEntity<Resource>> = monoMdc {
+ bluePrintModelHandler.downloadBlueprintModelFileByNameAndVersion(name, version)
}
@GetMapping("/{id}", produces = [MediaType.APPLICATION_JSON_VALUE])
@@ -92,8 +95,8 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint
@ResponseBody
@Throws(BluePrintException::class)
@PreAuthorize("hasRole('USER')")
- fun downloadBluePrint(@PathVariable(value = "id") id: String): ResponseEntity<Resource> {
- return this.bluePrintModelHandler.downloadBlueprintModelFile(id)
+ fun downloadBluePrint(@PathVariable(value = "id") id: String): Mono<ResponseEntity<Resource>> = monoMdc {
+ bluePrintModelHandler.downloadBlueprintModelFile(id)
}
@PostMapping("/enrich", produces = [MediaType.APPLICATION_JSON_VALUE], consumes = [MediaType
@@ -101,7 +104,7 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint
@ResponseBody
@Throws(BluePrintException::class)
@PreAuthorize("hasRole('USER')")
- fun enrichBlueprint(@RequestPart("file") file: FilePart): ResponseEntity<Resource> = runBlocking {
+ fun enrichBlueprint(@RequestPart("file") file: FilePart): Mono<ResponseEntity<Resource>> = monoMdc {
bluePrintModelHandler.enrichBlueprint(file)
}
@@ -109,7 +112,7 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint
@ResponseBody
@Throws(BluePrintException::class)
@PreAuthorize("hasRole('USER')")
- fun publishBlueprint(@RequestPart("file") file: FilePart): BlueprintModelSearch = runBlocking {
+ fun publishBlueprint(@RequestPart("file") file: FilePart): Mono<BlueprintModelSearch> = monoMdc {
bluePrintModelHandler.publishBlueprint(file)
}
@@ -128,7 +131,7 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint
fun deleteBlueprint(@ApiParam(value = "Name of the CBA.", required = true)
@PathVariable(value = "name") name: String,
@ApiParam(value = "Version of the CBA.", required = true)
- @PathVariable(value = "version") version: String) = runBlocking {
+ @PathVariable(value = "version") version: String) = monoMdc {
bluePrintModelHandler.deleteBlueprintModel(name, version)
}
}
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
index 4441d2b4b..f14f61e60 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt
@@ -17,27 +17,29 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
-import com.fasterxml.jackson.databind.JsonNode
import io.swagger.annotations.Api
import io.swagger.annotations.ApiOperation
import io.swagger.annotations.ApiParam
-import kotlinx.coroutines.runBlocking
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc
import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
+import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.http.MediaType
import org.springframework.http.ResponseEntity
import org.springframework.security.access.prepost.PreAuthorize
import org.springframework.web.bind.annotation.*
+import reactor.core.publisher.Mono
@RestController
@RequestMapping("/api/v1/execution-service")
@Api(value = "/api/v1/execution-service",
description = "Interaction with CBA.")
open class ExecutionServiceController {
+ val log = logger(ExecutionServiceController::class)
@Autowired
lateinit var executionServiceHandler: ExecutionServiceHandler
@@ -47,7 +49,8 @@ open class ExecutionServiceController {
produces = [MediaType.APPLICATION_JSON_VALUE])
@ResponseBody
@ApiOperation(value = "Health Check", hidden = true)
- fun executionServiceControllerHealthCheck(): JsonNode = runBlocking {
+ fun executionServiceControllerHealthCheck() = monoMdc {
+ log.info("Health check success...")
"Success".asJsonPrimitive()
}
@@ -59,12 +62,13 @@ open class ExecutionServiceController {
@ResponseBody
@PreAuthorize("hasRole('USER')")
fun process(@ApiParam(value = "ExecutionServiceInput payload.", required = true)
- @RequestBody executionServiceInput: ExecutionServiceInput): ResponseEntity<ExecutionServiceOutput> =
- runBlocking {
- if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) {
- throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.")
- }
- val processResult = executionServiceHandler.doProcess(executionServiceInput)
- ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code))
- }
+ @RequestBody executionServiceInput: ExecutionServiceInput)
+ : Mono<ResponseEntity<ExecutionServiceOutput>> = monoMdc {
+
+ if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) {
+ throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.")
+ }
+ val processResult = executionServiceHandler.doProcess(executionServiceInput)
+ ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code))
+ }
}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
index 5163a93ac..bee919249 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt
@@ -167,9 +167,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic
*/
fun requestPayloadActionProperty(expression: String?): JsonNode? {
val requestExpression = if (expression.isNullOrBlank()) {
- "$operationName-request"
+ "$workflowName-request"
} else {
- "$operationName-request.$expression"
+ "$workflowName-request.$expression"
}
return executionServiceInput.payload.jsonPathParse(".$requestExpression")
}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt
index a16c52069..062d370bc 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt
@@ -54,6 +54,7 @@ class ComponentFunctionScriptingService(private val applicationContext: Applicat
scriptComponent.operationName = componentFunction.operationName
scriptComponent.nodeTemplateName = componentFunction.nodeTemplateName
scriptComponent.operationInputs = componentFunction.operationInputs
+ scriptComponent.executionServiceInput = componentFunction.executionServiceInput
scriptComponent.scriptType = scriptType
// Populate Instance Properties
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
index 07848164e..adb1d67d2 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt
@@ -20,15 +20,14 @@ import com.fasterxml.jackson.databind.JsonNode
import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel
import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming
import io.grpc.ManagedChannel
-import kotlinx.coroutines.ExperimentalCoroutinesApi
-import kotlinx.coroutines.FlowPreview
-import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.consumeAsFlow
-import kotlinx.coroutines.launch
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService
import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService
+import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc
@@ -39,11 +38,13 @@ import org.springframework.stereotype.Service
interface StreamingRemoteExecutionService<ReqT, ResT> {
- suspend fun openSubscription(selector: Any, requestId: String): Flow<ResT>
+ suspend fun openSubscription(selector: Any, txId: String): Flow<ResT>
- suspend fun send(input: ReqT)
+ suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT
- suspend fun cancelSubscription(requestId: String)
+ suspend fun send(txId: String, input: ReqT)
+
+ suspend fun cancelSubscription(txId: String)
suspend fun closeChannel(selector: Any)
}
@@ -63,60 +64,90 @@ class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertySe
/**
- * Open new channel to send and receive for grpc properties [selector] for [requestId],
+ * Open new channel to send and receive for grpc properties [selector] for [txId],
* Create the only one GRPC channel per host port and reuse for further communication.
* Create request communication channel to send and receive requests and responses.
- * We can send multiple request with same requestId with unique subRequestId.
+ * We can send multiple request with same txId.
* Consume the flow for responses,
* Client should cancel the subscription for the request Id once no longer response is needed.
* */
@FlowPreview
- override suspend fun openSubscription(selector: Any, requestId: String): Flow<ExecutionServiceOutput> {
+ override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> {
- if (!commChannels.containsKey(requestId)) {
+ if (!commChannels.containsKey(txId)) {
/** Get GRPC Channel*/
val grpcChannel = grpcChannel(selector)
/** Get Send and Receive Channel for bidirectional process method*/
val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel)
- commChannels[requestId] = channels
+ commChannels[txId] = channels
}
- val commChannel = commChannels[requestId]
- ?: throw BluePrintException("failed to create response subscription for request($requestId) channel")
+ val commChannel = commChannels[txId]
+ ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel")
- log.info("created subscription for request($requestId)")
+ log.info("created subscription for transactionId($txId)")
return commChannel.responseChannel.consumeAsFlow()
}
/**
- * Send the [input]request, by reusing same GRPC channel and Communication channel
+ * Send the [input] request, by reusing same GRPC channel and Communication channel
* for the request Id.
*/
- override suspend fun send(input: ExecutionServiceInput) {
- val requestId = input.commonHeader.requestId
- val subRequestId = input.commonHeader.subRequestId
- val sendChannel = commChannels[requestId]?.requestChannel
- ?: throw BluePrintException("failed to get request($requestId) send channel")
+ override suspend fun send(txId: String, input: ExecutionServiceInput) {
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
coroutineScope {
launch {
sendChannel.send(input)
- log.trace("Message sent for request($requestId) : subRequest($subRequestId)")
+ log.trace("Message sent for transactionId($txId)")
+ }
+ }
+ }
+
+ /**
+ * Simplified version of Streaming calls, Use this API only listing for actual response for [input]
+ * for the GRPC [selector] with execution [timeOutMill].
+ * Other state of the request will be skipped.
+ * The assumption here is you can call this API with same request Id and unique subrequest Id,
+ * so the correlation is sub request id to receive the response.
+ */
+ @ExperimentalCoroutinesApi
+ override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long)
+ : ExecutionServiceOutput {
+
+ var output: ExecutionServiceOutput? = null
+ val flow = openSubscription(selector, txId)
+
+ /** Send the request */
+ val sendChannel = commChannels[txId]?.requestChannel
+ ?: throw BluePrintException("failed to get transactionId($txId) send channel")
+ sendChannel.send(input)
+
+ /** Receive the response with timeout */
+ withTimeout(timeOutMill) {
+ flow.collect {
+ log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}")
+ if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
+ output = it
+ cancelSubscription(txId)
+ }
}
}
+ return output!!
}
- /** Cancel the Subscription for the [requestId], This closes communication channel **/
+ /** Cancel the Subscription for the [txId], This closes communication channel **/
@ExperimentalCoroutinesApi
- override suspend fun cancelSubscription(requestId: String) {
- commChannels[requestId]?.let {
+ override suspend fun cancelSubscription(txId: String) {
+ commChannels[txId]?.let {
if (!it.requestChannel.isClosedForSend)
it.requestChannel.close()
/** If receive channel has to close immediately, once the subscription has cancelled, then enable this */
//it.responseChannel.cancel(CancellationException("subscription cancelled"))
- commChannels.remove(requestId)
- log.info("closed subscription for request($requestId)")
+ commChannels.remove(txId)
+ log.info("closed subscription for transactionId($txId)")
}
}
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
index c9ff23573..29d24c6ad 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt
@@ -36,6 +36,8 @@ import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType
import org.onap.ccsdk.cds.controllerblueprints.core.logger
import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput
import java.util.*
+import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
class StreamingRemoteExecutionServiceTest {
@@ -69,25 +71,48 @@ class StreamingRemoteExecutionServiceTest {
val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService)
/** To test with real server, uncomment below line */
- coEvery { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+ coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel
+
+ /** Test Send and Receive non interactive transaction */
+ val nonInteractiveDeferred = arrayListOf<Deferred<*>>()
+ repeat(2) { count ->
+ val requestId = "1234-$count"
+ val request = getRequest(requestId)
+ val invocationId = request.commonHeader.subRequestId
+ val deferred = async {
+ val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties,
+ invocationId, request, 1000L)
+ assertNotNull(response, "failed to get non interactive response")
+ assertEquals(response.commonHeader.requestId, requestId,
+ "failed to match non interactive response id")
+ assertEquals(response.status.eventType, EventType.EVENT_COMPONENT_EXECUTED,
+ "failed to match non interactive response type")
+ }
+ nonInteractiveDeferred.add(deferred)
- val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+ }
+ nonInteractiveDeferred.awaitAll()
- repeat(1) { count ->
+ /** Test Send and Receive interactive transaction */
+ val responseFlowsDeferred = arrayListOf<Deferred<*>>()
+ repeat(2) { count ->
val requestId = "12345-$count"
- val responseFlow = spyStreamingRemoteExecutionService.openSubscription(tokenAuthGrpcClientProperties, requestId)
+ val request = getRequest(requestId)
+ val invocationId = request.commonHeader.requestId
+ val responseFlow = spyStreamingRemoteExecutionService
+ .openSubscription(tokenAuthGrpcClientProperties, invocationId)
val deferred = async {
responseFlow.collect {
- log.info("Received $count-response (${it.commonHeader.subRequestId}) : ${it.status.eventType}")
+ log.info("Received $count-response ($invocationId) : ${it.status.eventType}")
if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) {
- spyStreamingRemoteExecutionService.cancelSubscription(it.commonHeader.requestId)
+ spyStreamingRemoteExecutionService.cancelSubscription(invocationId)
}
}
}
responseFlowsDeferred.add(deferred)
/** Sending Multiple messages with same requestId and different subRequestId */
- spyStreamingRemoteExecutionService.send(getRequest(requestId))
+ spyStreamingRemoteExecutionService.send(invocationId, request)
}
responseFlowsDeferred.awaitAll()
streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties)
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt
index 16e4e611b..2fc9a993b 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt
@@ -95,7 +95,7 @@ class AbstractComponentFunctionTest {
@Test
fun testComponentFunctionPayload() {
val sampleComponent = SampleComponent()
- sampleComponent.operationName = "sample-action"
+ sampleComponent.workflowName = "sample-action"
sampleComponent.executionServiceInput = JacksonUtils.readValueFromClassPathFile(
"payload/requests/sample-execution-request.json", ExecutionServiceInput::class.java)!!
val payload = sampleComponent.requestPayload()
diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
index 703a52642..afe10b39d 100644
--- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
+++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml
@@ -19,7 +19,7 @@
<!-- encoders are assigned the type
ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
<encoder>
- <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern>
+ <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>