diff options
Diffstat (limited to 'ms/blueprintsprocessor')
24 files changed, 721 insertions, 34 deletions
diff --git a/ms/blueprintsprocessor/application/pom.xml b/ms/blueprintsprocessor/application/pom.xml index cd13c9cbb..ed1b67dfd 100755 --- a/ms/blueprintsprocessor/application/pom.xml +++ b/ms/blueprintsprocessor/application/pom.xml @@ -85,10 +85,12 @@ <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> <artifactId>configs-api</artifactId> </dependency> + <!-- <dependency> <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> <artifactId>health-api</artifactId> </dependency> + --> <!-- Functions --> <dependency> diff --git a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml index 407aa6b20..e4bb00773 100755 --- a/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml +++ b/ms/blueprintsprocessor/application/src/main/dc/docker-compose.yaml @@ -41,6 +41,25 @@ services: restart: always volumes: - blueprints-deploy:/opt/app/onap/blueprints/deploy + py-executor: + depends_on: + - db + image: onap/ccsdk-py-script-executor + container_name: bp-py-executor + ports: + - "50052:50052" + restart: always + volumes: + - blueprints-deploy:/opt/app/onap/blueprints/deploy + environment: + APPLICATIONNAME: PythonExecutor + BUNDLEVERSION: 1.0.0 + APP_CONFIG_HOME: /opt/app/onap/config + STICKYSELECTORKEY: + ENVCONTEXT: dev + APP_PORT: 50052 + BASIC_AUTH: Basic Y2NzZGthcHBzOmNjc2RrYXBwcw== + LOG_FILE: /opt/app/onap/logs/application.log volumes: blueprints-deploy: diff --git a/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/LoggingWebFilter.kt b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/LoggingWebFilter.kt new file mode 100644 index 000000000..5ed5ff450 --- /dev/null +++ b/ms/blueprintsprocessor/application/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/LoggingWebFilter.kt @@ -0,0 +1,39 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor + +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.LoggingService +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext +import org.springframework.stereotype.Component +import org.springframework.web.server.ServerWebExchange +import org.springframework.web.server.WebFilter +import org.springframework.web.server.WebFilterChain +import reactor.core.publisher.Mono +import reactor.util.context.Context + +@Component +open class LoggingWebFilter : WebFilter { + override fun filter(serverWebExchange: ServerWebExchange, webFilterChain: WebFilterChain): Mono<Void> { + + val loggingService = LoggingService() + loggingService.entering(serverWebExchange.request) + val filterChain = webFilterChain.filter(serverWebExchange).subscriberContext( + Context.of(MDCContext, MDCContext())) + loggingService.exiting(serverWebExchange.request, serverWebExchange.response) + return filterChain + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties index e40ccba09..faabb80e7 100755 --- a/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties +++ b/ms/blueprintsprocessor/application/src/main/resources/application-dev.properties @@ -36,7 +36,7 @@ blueprintsprocessor.blueprintArchivePath=blueprints/archive blueprintsprocessor.blueprintWorkingPath=blueprints/work # Controller Blueprint Load Configurations # blueprints.load.initial-data may be overridden by ENV variables -blueprintsprocessor.loadInitialData=true +blueprintsprocessor.loadInitialData=false blueprintsprocessor.loadBluePrint=false blueprintsprocessor.loadBluePrintPaths=./../../../components/model-catalog/blueprint-model/service-blueprint blueprintsprocessor.loadModelType=true diff --git a/ms/blueprintsprocessor/application/src/main/resources/application.properties b/ms/blueprintsprocessor/application/src/main/resources/application.properties index 1378c62c5..b8f0d2344 100755 --- a/ms/blueprintsprocessor/application/src/main/resources/application.properties +++ b/ms/blueprintsprocessor/application/src/main/resources/application.properties @@ -66,8 +66,8 @@ security.user.password: {bcrypt}$2a$10$duaUzVUVW0YPQCSIbGEkQOXwafZGwQ/b32/Ys4R1i security.user.name: ccsdkapps # Used in Health Check -endpoints.user.name=ccsdkapps -endpoints.user.password=ccsdkapps +#endpoints.user.name=ccsdkapps +#endpoints.user.password=ccsdkapps # Executor Options blueprintsprocessor.resourceResolution.enabled=true diff --git a/ms/blueprintsprocessor/application/src/main/resources/logback.xml b/ms/blueprintsprocessor/application/src/main/resources/logback.xml index 6f917096d..9d2b82f25 100644 --- a/ms/blueprintsprocessor/application/src/main/resources/logback.xml +++ b/ms/blueprintsprocessor/application/src/main/resources/logback.xml @@ -15,11 +15,16 @@ --> <configuration> + + <property name="localPattern" value="%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"/> + <property name="defaultPattern" + value="%date{ISO8601,UTC}|%X{RequestID}|%X{InvocationID}|%thread|%X{ServiceName}|%X{ClientIPAddress}|%logger{50}| %msg%n"/> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> + <pattern>${defaultPattern}</pattern> </encoder> </appender> diff --git a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt index 088533a71..a1d2188ab 100644 --- a/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt +++ b/ms/blueprintsprocessor/modules/commons/grpc-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/grpc/service/BluePrintGrpcLibPropertyService.kt @@ -73,7 +73,7 @@ open class BluePrintGrpcLibPropertyService(private var bluePrintProperties: Blue } } - private fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties): + fun blueprintGrpcClientService(grpcClientProperties: GrpcClientProperties): BluePrintGrpcClientService { when (grpcClientProperties) { is TokenAuthGrpcClientProperties -> { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index ab04054fe..1cd8a2af7 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -36,6 +36,7 @@ open class KafkaMessageConsumerProperties : MessageConsumerProperties() { var clientId: String? = null var topic: String? = null var pollMillSec: Long = 1000 + var pollRecords: Int = -1 } open class KafkaBasicAuthMessageConsumerProperties : KafkaMessageConsumerProperties() diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt index 5a9e61bfd..b5d444a49 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaBasicAuthMessageConsumerService.kt @@ -51,6 +51,10 @@ class KafkaBasicAuthMessageConsumerService( if (messageConsumerProperties.clientId != null) { configProperties[ConsumerConfig.CLIENT_ID_CONFIG] = messageConsumerProperties.clientId!! } + /** To handle Back pressure, Get only configured record for processing */ + if (messageConsumerProperties.pollRecords > 0) { + configProperties[ConsumerConfig.MAX_POLL_RECORDS_CONFIG] = messageConsumerProperties.pollRecords + } // TODO("Security Implementation based on type") /** add or override already set properties */ additionalConfig?.let { configProperties.putAll(it) } @@ -84,6 +88,7 @@ class KafkaBasicAuthMessageConsumerService( kafkaConsumer!!.use { kc -> while (keepGoing) { val consumerRecords = kc.poll(Duration.ofMillis(messageConsumerProperties.pollMillSec)) + log.info("Consumed Records : ${consumerRecords.count()}") runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index 2b84eaa78..f4e85a94b 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -52,6 +52,7 @@ import kotlin.test.assertTrue "blueprintsprocessor.messageconsumer.sample.topic=default-topic", "blueprintsprocessor.messageconsumer.sample.clientId=default-client-id", "blueprintsprocessor.messageconsumer.sample.pollMillSec=10", + "blueprintsprocessor.messageconsumer.sample.pollRecords=1", "blueprintsprocessor.messageproducer.sample.type=kafka-basic-auth", "blueprintsprocessor.messageproducer.sample.bootstrapServers=127.0.0.1:9092", @@ -129,11 +130,11 @@ open class BlueprintMessageConsumerServiceTest { .blueprintMessageProducerService("sample") as KafkaBasicAuthMessageProducerService launch { repeat(5) { - delay(1000) + delay(100) blueprintMessageProducerService.sendMessage("this is my message($it)") } } - delay(10000) + delay(5000) blueprintMessageConsumerService.shutDown() } } diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt new file mode 100644 index 000000000..cdf6ce195 --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/LoggerExtensions.kt @@ -0,0 +1,45 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core + +import kotlinx.coroutines.* +import kotlinx.coroutines.reactor.ReactorContext +import kotlinx.coroutines.reactor.asCoroutineContext +import org.onap.ccsdk.cds.blueprintsprocessor.core.service.MonoMDCCoroutine +import org.onap.ccsdk.cds.controllerblueprints.core.MDCContext +import reactor.core.publisher.Mono +import kotlin.coroutines.CoroutineContext +import kotlin.coroutines.EmptyCoroutineContext + +/** Used in Rest controller API methods to populate MDC context to nested coroutines from reactor web filter context. */ +@UseExperimental(InternalCoroutinesApi::class) +fun <T> monoMdc(context: CoroutineContext = EmptyCoroutineContext, + block: suspend CoroutineScope.() -> T?): Mono<T> = Mono.create { sink -> + + val reactorContext = (context[ReactorContext]?.context?.putAll(sink.currentContext()) + ?: sink.currentContext()).asCoroutineContext() + /** Populate MDC context only if present in Reactor Context */ + val newContext = if (!reactorContext.context.isEmpty + && reactorContext.context.hasKey(MDCContext)) { + val mdcContext = reactorContext.context.get<MDCContext>(MDCContext) + GlobalScope.newCoroutineContext(context + reactorContext + mdcContext) + } else GlobalScope.newCoroutineContext(context + reactorContext) + + val coroutine = MonoMDCCoroutine(newContext, sink) + sink.onDispose(coroutine) + coroutine.start(CoroutineStart.DEFAULT, coroutine, block) +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt new file mode 100644 index 000000000..4da7dcd0e --- /dev/null +++ b/ms/blueprintsprocessor/modules/commons/processor-core/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/core/service/BluePrintProcessorLoggingService.kt @@ -0,0 +1,109 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.core.service + +import kotlinx.coroutines.AbstractCoroutine +import kotlinx.coroutines.InternalCoroutinesApi +import kotlinx.coroutines.handleCoroutineException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.slf4j.MDC +import org.springframework.http.server.reactive.ServerHttpRequest +import org.springframework.http.server.reactive.ServerHttpResponse +import reactor.core.Disposable +import reactor.core.publisher.MonoSink +import java.time.ZoneOffset +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.util.* +import kotlin.coroutines.CoroutineContext + +class LoggingService { + private val log = logger(LoggingService::class) + + companion object { + const val ONAP_REQUEST_ID = "X-ONAP-RequestID" + const val ONAP_INVOCATION_ID = "X-ONAP-InvocationID" + const val ONAP_PARTNER_NAME = "X-ONAP-PartnerName" + } + + fun entering(request: ServerHttpRequest) { + val headers = request.headers + val requestID = defaultToUUID(headers.getFirst(ONAP_REQUEST_ID)) + val invocationID = defaultToUUID(headers.getFirst(ONAP_INVOCATION_ID)) + val partnerName = defaultToEmpty(headers.getFirst(ONAP_PARTNER_NAME)) + MDC.put("InvokeTimestamp", ZonedDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ISO_INSTANT)) + MDC.put("RequestID", requestID) + MDC.put("InvocationID", invocationID) + MDC.put("PartnerName", partnerName) + MDC.put("ClientIPAddress", defaultToEmpty(request.remoteAddress?.address?.hostAddress)) + MDC.put("ServerFQDN", defaultToEmpty(request.remoteAddress?.hostString)) + if (MDC.get("ServiceName") == null || MDC.get("ServiceName").equals("", ignoreCase = true)) { + MDC.put("ServiceName", request.uri.path) + } + } + + fun exiting(request: ServerHttpRequest, response: ServerHttpResponse) { + try { + val reqHeaders = request.headers + val resHeaders = response.headers + resHeaders[ONAP_REQUEST_ID] = MDC.get("RequestID") + resHeaders[ONAP_INVOCATION_ID] = MDC.get("InvocationID") + } catch (e: Exception) { + log.warn("couldn't set response headers", e) + } finally { + MDC.clear() + } + } + + private fun defaultToEmpty(input: Any?): String { + return input?.toString() ?: "" + } + + private fun defaultToUUID(input: String?): String { + return input ?: UUID.randomUUID().toString() + } +} + + +@InternalCoroutinesApi +class MonoMDCCoroutine<in T>( + parentContext: CoroutineContext, + private val sink: MonoSink<T> +) : AbstractCoroutine<T>(parentContext, true), Disposable { + private var disposed = false + + override fun onCompleted(value: T) { + if (!disposed) { + if (value == null) sink.success() else sink.success(value) + } + } + + override fun onCancelled(cause: Throwable, handled: Boolean) { + if (!disposed) { + sink.error(cause) + } else if (!handled) { + handleCoroutineException(context, cause) + } + } + + override fun dispose() { + disposed = true + cancel() + } + + override fun isDisposed(): Boolean = disposed +} diff --git a/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt b/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt index 4d13486c3..a6bff7051 100644 --- a/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/designer-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/designer/api/BlueprintModelController.kt @@ -19,7 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.designer.api import io.swagger.annotations.ApiOperation import io.swagger.annotations.ApiParam -import kotlinx.coroutines.runBlocking +import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc import org.onap.ccsdk.cds.blueprintsprocessor.db.primary.domain.BlueprintModelSearch import org.onap.ccsdk.cds.blueprintsprocessor.designer.api.handler.BluePrintModelHandler import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException @@ -29,6 +29,7 @@ import org.springframework.http.ResponseEntity import org.springframework.http.codec.multipart.FilePart import org.springframework.security.access.prepost.PreAuthorize import org.springframework.web.bind.annotation.* +import reactor.core.publisher.Mono /** * BlueprintModelController Purpose: Handle controllerBlueprint API request @@ -44,7 +45,7 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint @ResponseBody @Throws(BluePrintException::class) @PreAuthorize("hasRole('USER')") - fun saveBlueprint(@RequestPart("file") filePart: FilePart): BlueprintModelSearch = runBlocking { + fun saveBlueprint(@RequestPart("file") filePart: FilePart): Mono<BlueprintModelSearch> = monoMdc { bluePrintModelHandler.saveBlueprintModel(filePart) } @@ -67,8 +68,9 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint @Throws(BluePrintException::class) @PreAuthorize("hasRole('USER')") fun getBlueprintByNameAndVersion(@PathVariable(value = "name") name: String, - @PathVariable(value = "version") version: String): BlueprintModelSearch { - return this.bluePrintModelHandler.getBlueprintModelSearchByNameAndVersion(name, version) + @PathVariable(value = "version") version: String) + : Mono<BlueprintModelSearch> = monoMdc { + bluePrintModelHandler.getBlueprintModelSearchByNameAndVersion(name, version) } @GetMapping("/download/by-name/{name}/version/{version}", produces = [MediaType.APPLICATION_JSON_VALUE]) @@ -76,8 +78,9 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint @Throws(BluePrintException::class) @PreAuthorize("hasRole('USER')") fun downloadBlueprintByNameAndVersion(@PathVariable(value = "name") name: String, - @PathVariable(value = "version") version: String): ResponseEntity<Resource> { - return this.bluePrintModelHandler.downloadBlueprintModelFileByNameAndVersion(name, version) + @PathVariable(value = "version") version: String) + : Mono<ResponseEntity<Resource>> = monoMdc { + bluePrintModelHandler.downloadBlueprintModelFileByNameAndVersion(name, version) } @GetMapping("/{id}", produces = [MediaType.APPLICATION_JSON_VALUE]) @@ -92,8 +95,8 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint @ResponseBody @Throws(BluePrintException::class) @PreAuthorize("hasRole('USER')") - fun downloadBluePrint(@PathVariable(value = "id") id: String): ResponseEntity<Resource> { - return this.bluePrintModelHandler.downloadBlueprintModelFile(id) + fun downloadBluePrint(@PathVariable(value = "id") id: String): Mono<ResponseEntity<Resource>> = monoMdc { + bluePrintModelHandler.downloadBlueprintModelFile(id) } @PostMapping("/enrich", produces = [MediaType.APPLICATION_JSON_VALUE], consumes = [MediaType @@ -101,7 +104,7 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint @ResponseBody @Throws(BluePrintException::class) @PreAuthorize("hasRole('USER')") - fun enrichBlueprint(@RequestPart("file") file: FilePart): ResponseEntity<Resource> = runBlocking { + fun enrichBlueprint(@RequestPart("file") file: FilePart): Mono<ResponseEntity<Resource>> = monoMdc { bluePrintModelHandler.enrichBlueprint(file) } @@ -109,7 +112,7 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint @ResponseBody @Throws(BluePrintException::class) @PreAuthorize("hasRole('USER')") - fun publishBlueprint(@RequestPart("file") file: FilePart): BlueprintModelSearch = runBlocking { + fun publishBlueprint(@RequestPart("file") file: FilePart): Mono<BlueprintModelSearch> = monoMdc { bluePrintModelHandler.publishBlueprint(file) } @@ -128,7 +131,7 @@ open class BlueprintModelController(private val bluePrintModelHandler: BluePrint fun deleteBlueprint(@ApiParam(value = "Name of the CBA.", required = true) @PathVariable(value = "name") name: String, @ApiParam(value = "Version of the CBA.", required = true) - @PathVariable(value = "version") version: String) = runBlocking { + @PathVariable(value = "version") version: String) = monoMdc { bluePrintModelHandler.deleteBlueprintModel(name, version) } } diff --git a/ms/blueprintsprocessor/modules/inbounds/pom.xml b/ms/blueprintsprocessor/modules/inbounds/pom.xml index 1729f2ff1..81ffea176 100644 --- a/ms/blueprintsprocessor/modules/inbounds/pom.xml +++ b/ms/blueprintsprocessor/modules/inbounds/pom.xml @@ -35,7 +35,7 @@ <module>designer-api</module> <module>resource-api</module> <module>selfservice-api</module> - <module>health-api</module> + <!--<module>health-api</module>--> </modules> <dependencies> diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt index 4441d2b4b..f14f61e60 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceController.kt @@ -17,27 +17,29 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api -import com.fasterxml.jackson.databind.JsonNode import io.swagger.annotations.Api import io.swagger.annotations.ApiOperation import io.swagger.annotations.ApiParam -import kotlinx.coroutines.runBlocking import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ACTION_MODE_ASYNC import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.blueprintsprocessor.core.monoMdc import org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api.utils.determineHttpStatusCode import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive +import org.onap.ccsdk.cds.controllerblueprints.core.logger import org.springframework.beans.factory.annotation.Autowired import org.springframework.http.MediaType import org.springframework.http.ResponseEntity import org.springframework.security.access.prepost.PreAuthorize import org.springframework.web.bind.annotation.* +import reactor.core.publisher.Mono @RestController @RequestMapping("/api/v1/execution-service") @Api(value = "/api/v1/execution-service", description = "Interaction with CBA.") open class ExecutionServiceController { + val log = logger(ExecutionServiceController::class) @Autowired lateinit var executionServiceHandler: ExecutionServiceHandler @@ -47,7 +49,8 @@ open class ExecutionServiceController { produces = [MediaType.APPLICATION_JSON_VALUE]) @ResponseBody @ApiOperation(value = "Health Check", hidden = true) - fun executionServiceControllerHealthCheck(): JsonNode = runBlocking { + fun executionServiceControllerHealthCheck() = monoMdc { + log.info("Health check success...") "Success".asJsonPrimitive() } @@ -59,12 +62,13 @@ open class ExecutionServiceController { @ResponseBody @PreAuthorize("hasRole('USER')") fun process(@ApiParam(value = "ExecutionServiceInput payload.", required = true) - @RequestBody executionServiceInput: ExecutionServiceInput): ResponseEntity<ExecutionServiceOutput> = - runBlocking { - if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { - throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") - } - val processResult = executionServiceHandler.doProcess(executionServiceInput) - ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) - } + @RequestBody executionServiceInput: ExecutionServiceInput) + : Mono<ResponseEntity<ExecutionServiceOutput>> = monoMdc { + + if (executionServiceInput.actionIdentifiers.mode == ACTION_MODE_ASYNC) { + throw IllegalStateException("Can't process async request through the REST endpoint. Use gRPC for async processing.") + } + val processResult = executionServiceHandler.doProcess(executionServiceInput) + ResponseEntity(processResult, determineHttpStatusCode(processResult.status.code)) + } } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml index 8bee7c91c..f3044c8bb 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/pom.xml +++ b/ms/blueprintsprocessor/modules/services/execution-service/pom.xml @@ -64,5 +64,10 @@ <groupId>org.onap.ccsdk.sli.core</groupId> <artifactId>sli-provider</artifactId> </dependency> + + <dependency> + <groupId>io.grpc</groupId> + <artifactId>grpc-testing</artifactId> + </dependency> </dependencies> </project> diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt index 5163a93ac..bee919249 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/AbstractComponentFunction.kt @@ -167,9 +167,9 @@ abstract class AbstractComponentFunction : BlueprintFunctionNode<ExecutionServic */ fun requestPayloadActionProperty(expression: String?): JsonNode? { val requestExpression = if (expression.isNullOrBlank()) { - "$operationName-request" + "$workflowName-request" } else { - "$operationName-request.$expression" + "$workflowName-request.$expression" } return executionServiceInput.payload.jsonPathParse(".$requestExpression") } diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt index a16c52069..062d370bc 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/ComponentFunctionScriptingService.kt @@ -54,6 +54,7 @@ class ComponentFunctionScriptingService(private val applicationContext: Applicat scriptComponent.operationName = componentFunction.operationName scriptComponent.nodeTemplateName = componentFunction.nodeTemplateName scriptComponent.operationInputs = componentFunction.operationInputs + scriptComponent.executionServiceInput = componentFunction.executionServiceInput scriptComponent.scriptType = scriptType // Populate Instance Properties diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt new file mode 100644 index 000000000..adb1d67d2 --- /dev/null +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionService.kt @@ -0,0 +1,205 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.services.execution + +import com.fasterxml.jackson.databind.JsonNode +import com.github.marcoferrer.krotoplus.coroutines.client.ClientBidiCallChannel +import com.github.marcoferrer.krotoplus.coroutines.client.clientCallBidiStreaming +import io.grpc.ManagedChannel +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.consumeAsFlow +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcClientService +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintException +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Service + +interface StreamingRemoteExecutionService<ReqT, ResT> { + + suspend fun openSubscription(selector: Any, txId: String): Flow<ResT> + + suspend fun sendNonInteractive(selector: Any, txId: String, input: ReqT, timeOutMill: Long): ResT + + suspend fun send(txId: String, input: ReqT) + + suspend fun cancelSubscription(txId: String) + + suspend fun closeChannel(selector: Any) +} + +@Service +@ConditionalOnProperty(prefix = "blueprintsprocessor.streamingRemoteExecution", name = ["enabled"], + havingValue = "true", matchIfMissing = false) +class StreamingRemoteExecutionServiceImpl(private val bluePrintGrpcLibPropertyService: BluePrintGrpcLibPropertyService) + : StreamingRemoteExecutionService<ExecutionServiceInput, ExecutionServiceOutput> { + + private val log = logger(StreamingRemoteExecutionServiceImpl::class) + + private val grpcChannels: MutableMap<String, ManagedChannel> = hashMapOf() + + private val commChannels: MutableMap<String, + ClientBidiCallChannel<ExecutionServiceInput, ExecutionServiceOutput>> = hashMapOf() + + + /** + * Open new channel to send and receive for grpc properties [selector] for [txId], + * Create the only one GRPC channel per host port and reuse for further communication. + * Create request communication channel to send and receive requests and responses. + * We can send multiple request with same txId. + * Consume the flow for responses, + * Client should cancel the subscription for the request Id once no longer response is needed. + * */ + @FlowPreview + override suspend fun openSubscription(selector: Any, txId: String): Flow<ExecutionServiceOutput> { + + if (!commChannels.containsKey(txId)) { + /** Get GRPC Channel*/ + val grpcChannel = grpcChannel(selector) + + /** Get Send and Receive Channel for bidirectional process method*/ + val channels = clientCallBidiStreaming(BluePrintProcessingServiceGrpc.getProcessMethod(), grpcChannel) + commChannels[txId] = channels + } + + val commChannel = commChannels[txId] + ?: throw BluePrintException("failed to create response subscription for transactionId($txId) channel") + + log.info("created subscription for transactionId($txId)") + + return commChannel.responseChannel.consumeAsFlow() + } + + /** + * Send the [input] request, by reusing same GRPC channel and Communication channel + * for the request Id. + */ + override suspend fun send(txId: String, input: ExecutionServiceInput) { + val sendChannel = commChannels[txId]?.requestChannel + ?: throw BluePrintException("failed to get transactionId($txId) send channel") + coroutineScope { + launch { + sendChannel.send(input) + log.trace("Message sent for transactionId($txId)") + } + } + } + + /** + * Simplified version of Streaming calls, Use this API only listing for actual response for [input] + * for the GRPC [selector] with execution [timeOutMill]. + * Other state of the request will be skipped. + * The assumption here is you can call this API with same request Id and unique subrequest Id, + * so the correlation is sub request id to receive the response. + */ + @ExperimentalCoroutinesApi + override suspend fun sendNonInteractive(selector: Any, txId: String, input: ExecutionServiceInput, timeOutMill: Long) + : ExecutionServiceOutput { + + var output: ExecutionServiceOutput? = null + val flow = openSubscription(selector, txId) + + /** Send the request */ + val sendChannel = commChannels[txId]?.requestChannel + ?: throw BluePrintException("failed to get transactionId($txId) send channel") + sendChannel.send(input) + + /** Receive the response with timeout */ + withTimeout(timeOutMill) { + flow.collect { + log.trace("Received non-interactive transactionId($txId) response : ${it.status.eventType}") + if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) { + output = it + cancelSubscription(txId) + } + } + } + return output!! + } + + /** Cancel the Subscription for the [txId], This closes communication channel **/ + @ExperimentalCoroutinesApi + override suspend fun cancelSubscription(txId: String) { + commChannels[txId]?.let { + if (!it.requestChannel.isClosedForSend) + it.requestChannel.close() + /** If receive channel has to close immediately, once the subscription has cancelled, then enable this */ + //it.responseChannel.cancel(CancellationException("subscription cancelled")) + commChannels.remove(txId) + log.info("closed subscription for transactionId($txId)") + } + } + + /** Close the GRPC channel for the host port poperties [selector]*/ + override suspend fun closeChannel(selector: Any) { + val grpcProperties = grpcProperties(selector) + val selectorName = "${grpcProperties.host}:${grpcProperties.port}" + if (grpcChannels.containsKey(selectorName)) { + grpcChannels[selectorName]!!.shutdownNow() + grpcChannels.remove(selectorName) + log.info("grpc channel($selectorName) shutdown completed") + } + } + + /** Check GRPC channel has been cached and not shutdown, If not re create channel and chache it. */ + private suspend fun grpcChannel(selector: Any): ManagedChannel { + val grpcProperties = grpcProperties(selector) + val selectorName = "${grpcProperties.host}:${grpcProperties.port}" + val isGrpcChannelCached = grpcChannels.containsKey(selectorName) + val grpcChannel = if (isGrpcChannelCached) { + if (grpcChannels[selectorName]!!.isShutdown) { + createGrpcChannel(grpcProperties) + } else { + grpcChannels[selectorName]!! + } + } else { + createGrpcChannel(grpcProperties) + } + grpcChannels[selectorName] = grpcChannel + return grpcChannel + } + + suspend fun createGrpcChannel(grpcProperties: GrpcClientProperties): ManagedChannel { + val grpcClientService: BluePrintGrpcClientService = bluePrintGrpcLibPropertyService + .blueprintGrpcClientService(grpcProperties) + return grpcClientService.channel() + } + + private fun grpcProperties(selector: Any): GrpcClientProperties { + return when (selector) { + is String -> { + bluePrintGrpcLibPropertyService.grpcClientProperties(selector.toString()) + } + is JsonNode -> { + bluePrintGrpcLibPropertyService.grpcClientProperties(selector) + } + is GrpcClientProperties -> { + selector + } + else -> { + throw BluePrintException("couldn't process selector($selector)") + } + } + } +} diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt new file mode 100644 index 000000000..e291aa78e --- /dev/null +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/MockBluePrintProcessingServer.kt @@ -0,0 +1,97 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts + +import io.grpc.ServerBuilder +import io.grpc.stub.StreamObserver +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.common.api.Status +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.BluePrintProcessingServiceGrpc +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceOutput + +private val log = logger(MockBluePrintProcessingServer::class) + + +class MockBluePrintProcessingServer : BluePrintProcessingServiceGrpc.BluePrintProcessingServiceImplBase() { + + override fun process(responseObserver: StreamObserver<ExecutionServiceOutput>): StreamObserver<ExecutionServiceInput> { + + return object : StreamObserver<ExecutionServiceInput> { + override fun onNext(executionServiceInput: ExecutionServiceInput) { + log.info("Received requestId(${executionServiceInput.commonHeader.requestId}) " + + "subRequestId(${executionServiceInput.commonHeader.subRequestId})") + responseObserver.onNext(buildNotification(executionServiceInput)) + responseObserver.onNext(buildResponse(executionServiceInput)) + responseObserver.onCompleted() + } + + override fun onError(error: Throwable) { + log.debug("Fail to process message", error) + responseObserver.onError(io.grpc.Status.INTERNAL + .withDescription(error.message) + .asException()) + } + + override fun onCompleted() { + log.info("Completed") + } + } + } + + + private fun buildNotification(input: ExecutionServiceInput): ExecutionServiceOutput { + val status = Status.newBuilder() + .setEventType(EventType.EVENT_COMPONENT_NOTIFICATION) + .build() + return ExecutionServiceOutput.newBuilder() + .setCommonHeader(input.commonHeader) + .setActionIdentifiers(input.actionIdentifiers) + .setStatus(status) + .build() + } + + private fun buildResponse(input: ExecutionServiceInput): ExecutionServiceOutput { + + val status = Status.newBuilder().setCode(200) + .setEventType(EventType.EVENT_COMPONENT_EXECUTED) + .build() + return ExecutionServiceOutput.newBuilder() + .setCommonHeader(input.commonHeader) + .setActionIdentifiers(input.actionIdentifiers) + .setStatus(status) + .build() + + } +} + +/** For Integration testing stat this server */ +fun main() { + try { + val server = ServerBuilder + .forPort(50052) + .addService(MockBluePrintProcessingServer()) + .build() + server.start() + log.info("GRPC Serve started(${server.isShutdown}) on port(${server.port})...") + server.awaitTermination() + } catch (e: Exception) { + e.printStackTrace() + } + +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt new file mode 100644 index 000000000..29d24c6ad --- /dev/null +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/StreamingRemoteExecutionServiceTest.kt @@ -0,0 +1,144 @@ +/* + * Copyright © 2018-2019 AT&T Intellectual Property. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.services.execution + +import io.grpc.inprocess.InProcessChannelBuilder +import io.grpc.inprocess.InProcessServerBuilder +import io.grpc.testing.GrpcCleanupRule +import io.mockk.coEvery +import io.mockk.mockk +import io.mockk.spyk +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.collect +import org.junit.Rule +import org.junit.Test +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.GRPCLibConstants +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.TokenAuthGrpcClientProperties +import org.onap.ccsdk.cds.blueprintsprocessor.grpc.service.BluePrintGrpcLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.scripts.MockBluePrintProcessingServer +import org.onap.ccsdk.cds.controllerblueprints.common.api.ActionIdentifiers +import org.onap.ccsdk.cds.controllerblueprints.common.api.CommonHeader +import org.onap.ccsdk.cds.controllerblueprints.common.api.EventType +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.onap.ccsdk.cds.controllerblueprints.processing.api.ExecutionServiceInput +import java.util.* +import kotlin.test.assertEquals +import kotlin.test.assertNotNull + + +class StreamingRemoteExecutionServiceTest { + + val log = logger(StreamingRemoteExecutionServiceTest::class) + + @get:Rule + val grpcCleanup = GrpcCleanupRule() + private val serverName = InProcessServerBuilder.generateName() + private val serverBuilder = InProcessServerBuilder.forName(serverName).directExecutor() + private val channelBuilder = InProcessChannelBuilder.forName(serverName).directExecutor() + + private val tokenAuthGrpcClientProperties = TokenAuthGrpcClientProperties().apply { + host = "127.0.0.1" + port = 50052 + type = GRPCLibConstants.TYPE_TOKEN_AUTH + token = "Basic Y2NzZGthcHBzOmNjc2RrYXBwcw==" + } + + @Test + @ExperimentalCoroutinesApi + @FlowPreview + fun testStreamingChannel() { + grpcCleanup.register(serverBuilder.addService(MockBluePrintProcessingServer()).build().start()) + val channel = grpcCleanup.register(channelBuilder.maxInboundMessageSize(1024).build()) + + runBlocking { + val bluePrintGrpcLibPropertyService = BluePrintGrpcLibPropertyService(mockk()) + + val streamingRemoteExecutionService = StreamingRemoteExecutionServiceImpl(bluePrintGrpcLibPropertyService) + + val spyStreamingRemoteExecutionService = spyk(streamingRemoteExecutionService) + /** To test with real server, uncomment below line */ + coEvery() { spyStreamingRemoteExecutionService.createGrpcChannel(any()) } returns channel + + /** Test Send and Receive non interactive transaction */ + val nonInteractiveDeferred = arrayListOf<Deferred<*>>() + repeat(2) { count -> + val requestId = "1234-$count" + val request = getRequest(requestId) + val invocationId = request.commonHeader.subRequestId + val deferred = async { + val response = spyStreamingRemoteExecutionService.sendNonInteractive(tokenAuthGrpcClientProperties, + invocationId, request, 1000L) + assertNotNull(response, "failed to get non interactive response") + assertEquals(response.commonHeader.requestId, requestId, + "failed to match non interactive response id") + assertEquals(response.status.eventType, EventType.EVENT_COMPONENT_EXECUTED, + "failed to match non interactive response type") + } + nonInteractiveDeferred.add(deferred) + + } + nonInteractiveDeferred.awaitAll() + + /** Test Send and Receive interactive transaction */ + val responseFlowsDeferred = arrayListOf<Deferred<*>>() + repeat(2) { count -> + val requestId = "12345-$count" + val request = getRequest(requestId) + val invocationId = request.commonHeader.requestId + val responseFlow = spyStreamingRemoteExecutionService + .openSubscription(tokenAuthGrpcClientProperties, invocationId) + + val deferred = async { + responseFlow.collect { + log.info("Received $count-response ($invocationId) : ${it.status.eventType}") + if (it.status.eventType == EventType.EVENT_COMPONENT_EXECUTED) { + spyStreamingRemoteExecutionService.cancelSubscription(invocationId) + } + } + } + responseFlowsDeferred.add(deferred) + /** Sending Multiple messages with same requestId and different subRequestId */ + spyStreamingRemoteExecutionService.send(invocationId, request) + } + responseFlowsDeferred.awaitAll() + streamingRemoteExecutionService.closeChannel(tokenAuthGrpcClientProperties) + } + + } + + private fun getRequest(requestId: String): ExecutionServiceInput { + val commonHeader = CommonHeader.newBuilder() + .setTimestamp("2012-04-23T18:25:43.511Z") + .setOriginatorId("System") + .setRequestId(requestId) + .setSubRequestId("$requestId-" + UUID.randomUUID().toString()).build() + + + val actionIdentifier = ActionIdentifiers.newBuilder() + .setActionName("SampleScript") + .setBlueprintName("sample-cba") + .setBlueprintVersion("1.0.0") + .build() + + return ExecutionServiceInput.newBuilder() + .setCommonHeader(commonHeader) + .setActionIdentifiers(actionIdentifier) + //.setPayload(payloadBuilder.build()) + .build() + + } +}
\ No newline at end of file diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt index 16e4e611b..2fc9a993b 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/services/execution/scripts/AbstractComponentFunctionTest.kt @@ -95,7 +95,7 @@ class AbstractComponentFunctionTest { @Test fun testComponentFunctionPayload() { val sampleComponent = SampleComponent() - sampleComponent.operationName = "sample-action" + sampleComponent.workflowName = "sample-action" sampleComponent.executionServiceInput = JacksonUtils.readValueFromClassPathFile( "payload/requests/sample-execution-request.json", ExecutionServiceInput::class.java)!! val payload = sampleComponent.requestPayload() diff --git a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml index 703a52642..afe10b39d 100644 --- a/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml +++ b/ms/blueprintsprocessor/modules/services/execution-service/src/test/resources/logback-test.xml @@ -19,7 +19,7 @@ <!-- encoders are assigned the type ch.qos.logback.classic.encoder.PatternLayoutEncoder by default --> <encoder> - <pattern>%d{HH:mm:ss.SSS} %-5level %logger{100} - %msg%n</pattern> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern> </encoder> </appender> diff --git a/ms/blueprintsprocessor/parent/pom.xml b/ms/blueprintsprocessor/parent/pom.xml index ac123cbfd..37a071280 100755 --- a/ms/blueprintsprocessor/parent/pom.xml +++ b/ms/blueprintsprocessor/parent/pom.xml @@ -386,11 +386,13 @@ <version>${project.version}</version> </dependency> + <!-- <dependency> <groupId>org.onap.ccsdk.cds.blueprintsprocessor</groupId> <artifactId>health-api</artifactId> <version>${project.version}</version> </dependency> + --> <!-- North Bound --> <dependency> |