diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2020-03-23 14:33:40 -0400 |
---|---|---|
committer | Julien Fontaine <julien.fontaine@bell.ca> | 2020-04-08 19:02:14 -0400 |
commit | e9dd04c53d8aff175909f04f33acae506800e25f (patch) | |
tree | efb63e23c39cfc14e2ce9f904d2905f24a7b8636 /ms/blueprintsprocessor/modules/inbounds/selfservice-api/src | |
parent | 54974ab95489cbe6a6e13078459f68d982601d47 (diff) |
Publish execution input/output into Kafka topics
- Creation and implementation of PublishAudit services (NoAudit,Kafka)
- Conceal of sensitive data (request input parameters)
- Update ExecutionServiceHandler tests
- Update application.properties file
Issue-ID: CCSDK-2183
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Ifc28a91ce588f5ed0010acb4ed22b64429c6c1a0
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src')
7 files changed, 349 insertions, 46 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index d0b4df888..49f2a50d5 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -51,15 +51,13 @@ open class BluePrintProcessingKafkaConsumer( companion object { const val CONSUMER_SELECTOR = "self-service-api" - const val PRODUCER_SELECTOR = "self-service-api" } @EventListener(ApplicationReadyEvent::class) fun setupMessageListener() = runBlocking { try { log.info( - "Setting up message consumer($CONSUMER_SELECTOR) and " + - "message producer($PRODUCER_SELECTOR)..." + "Setting up message consumer($CONSUMER_SELECTOR)" ) /** Get the Message Consumer Service **/ @@ -74,18 +72,6 @@ open class BluePrintProcessingKafkaConsumer( throw BluePrintProcessorException("failed to create consumer service ${e.message}") } - /** Get the Message Producer Service **/ - val blueprintMessageProducerService = try { - bluePrintMessageLibPropertyService - .blueprintMessageProducerService(PRODUCER_SELECTOR) - } catch (e: BluePrintProcessorException) { - val errorMsg = "Failed creating Kafka producer message service." - throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg, - "Wrong Kafka selector provided or internal error in Kafka service.") - } catch (e: Exception) { - throw BluePrintProcessorException("failed to create producer service ${e.message}") - } - launch { /** Subscribe to the consumer topics */ val additionalConfig: MutableMap<String, Any> = hashMapOf() @@ -96,10 +82,7 @@ open class BluePrintProcessingKafkaConsumer( ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() - val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - // TODO("In future, Message publisher configuration vary with respect to request") - /** Send the response message */ - blueprintMessageProducerService.sendMessage(executionServiceOutput) + executionServiceHandler.doProcess(executionServiceInput) } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } finally { @@ -110,8 +93,7 @@ open class BluePrintProcessingKafkaConsumer( } } catch (e: Exception) { log.error( - "failed to start message consumer($CONSUMER_SELECTOR) and " + - "message producer($PRODUCER_SELECTOR) ", e + "failed to start message consumer($CONSUMER_SELECTOR) ", e ) } } @@ -120,8 +102,7 @@ open class BluePrintProcessingKafkaConsumer( fun shutdownMessageListener() = runBlocking { try { log.info( - "Shutting down message consumer($CONSUMER_SELECTOR) and " + - "message producer($PRODUCER_SELECTOR)..." + "Shutting down message consumer($CONSUMER_SELECTOR)" ) blueprintMessageConsumerService.shutDown() ph.arriveAndAwaitAdvance() diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index 9524e375e..74c4b00e4 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -44,7 +44,8 @@ class ExecutionServiceHandler( private val bluePrintLoadConfiguration: BluePrintLoadConfiguration, private val blueprintsProcessorCatalogService: BluePrintCatalogService, private val bluePrintWorkflowExecutionService: - BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> + BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>, + private val publishAuditService: PublishAuditService ) { private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString()) @@ -67,33 +68,44 @@ class ExecutionServiceHandler( responseObserver.onNext(executionServiceOutput.toProto()) responseObserver.onCompleted() } - else -> responseObserver.onNext( - response( - executionServiceInput, - "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", - true - ).toProto() - ) + else -> { + publishAuditService.publish(executionServiceInput) + val executionServiceOutput = response( + executionServiceInput, + "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", + true + ) + publishAuditService.publish(executionServiceOutput) + responseObserver.onNext( + executionServiceOutput.toProto() + ) + } } } suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { val requestId = executionServiceInput.commonHeader.requestId - log.info("processing request id $requestId") val actionIdentifiers = executionServiceInput.actionIdentifiers val blueprintName = actionIdentifiers.blueprintName val blueprintVersion = actionIdentifiers.blueprintVersion + + lateinit var executionServiceOutput: ExecutionServiceOutput + + log.info("processing request id $requestId") + try { + publishAuditService.publish(executionServiceInput) + /** Check Blueprint is needed for this request */ if (checkServiceFunction(executionServiceInput)) { - return executeServiceFunction(executionServiceInput) + executionServiceOutput = executeServiceFunction(executionServiceInput) } else { val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) log.info("blueprint base path $basePath") val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) - val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow( + executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow( blueprintRuntimeService, executionServiceInput, hashMapOf() ) @@ -101,14 +113,16 @@ class ExecutionServiceHandler( val errors = blueprintRuntimeService.getBluePrintError().errors if (errors.isNotEmpty()) { val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", ")) - setErrorStatus(errorMessage, output.status) + setErrorStatus(errorMessage, executionServiceOutput.status) } - return output } } catch (e: Exception) { log.error("fail processing request id $requestId", e) - return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) + executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) } + + publishAuditService.publish(executionServiceOutput) + return executionServiceOutput } /** If the blueprint name is default, It means no blueprint is needed for the execution */ diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt new file mode 100644 index 000000000..129e7a54d --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -0,0 +1,184 @@ +/* + * Copyright © 2020 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceResolutionConstants +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants +import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService +import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import org.onap.ccsdk.cds.controllerblueprints.core.utils.PropertyDefinitionUtils +import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceAssignment +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Service +import javax.annotation.PostConstruct + +/** + * Audit service used to produce execution service input and output message + * sent into dedicated kafka topics. + */ +@ConditionalOnProperty( + name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"], + havingValue = "true" +) +@Service +class KafkaPublishAuditService( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val blueprintsProcessorCatalogService: BluePrintCatalogService +) : PublishAuditService { + + private var inputInstance: BlueprintMessageProducerService? = null + private var outputInstance: BlueprintMessageProducerService? = null + + private lateinit var correlationUUID: String + + private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString()) + + companion object { + const val INPUT_SELECTOR = "self-service-api.audit.request" + const val OUTPUT_SELECTOR = "self-service-api.audit.response" + } + + @PostConstruct + private fun init() { + log.info("Kakfa audit service is enabled") + } + + /** + * Publish execution input into a kafka topic. + * The correlation UUID is used to link the input to its output. + * Sensitive data within the request are hidden. + */ + override suspend fun publish(executionServiceInput: ExecutionServiceInput) { + this.correlationUUID = executionServiceInput.correlationUUID + val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) + this.inputInstance = this.getInputInstance(INPUT_SELECTOR) + this.inputInstance!!.sendMessage(secureExecutionServiceInput) + } + + /** + * Publish execution output into a kafka topic. + * The correlation UUID is used to link the output to its input. + * A correlation UUID is added to link the input to its output. + */ + override fun publish(executionServiceOutput: ExecutionServiceOutput) { + executionServiceOutput.correlationUUID = this.correlationUUID + this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) + this.outputInstance!!.sendMessage(executionServiceOutput) + } + + /** + * Return the input kafka producer instance using a selector. + */ + private fun getInputInstance(selector: String): BlueprintMessageProducerService = inputInstance ?: createInstance(selector) + + /** + * Return the output kafka producer instance using a selector. + */ + private fun getOutputInstance(selector: String): BlueprintMessageProducerService = outputInstance ?: createInstance(selector) + + /** + * Create a kafka producer instance. + */ + private fun createInstance(selector: String): BlueprintMessageProducerService { + log.info( + "Setting up message producer($selector)..." + ) + return try { + bluePrintMessageLibPropertyService + .blueprintMessageProducerService(selector) + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create producer service ${e.message}") + } + } + + /** + * Hide sensitive data in the request. + * Sensitive data are declared in the resource resolution mapping using + * the property metadata "log-protect" set to true. + */ + private suspend fun hideSensitiveData( + executionServiceInput: ExecutionServiceInput + ): ExecutionServiceInput { + + var clonedExecutionServiceInput = ExecutionServiceInput().apply { + correlationUUID = executionServiceInput.correlationUUID + commonHeader = executionServiceInput.commonHeader + actionIdentifiers = executionServiceInput.actionIdentifiers + payload = executionServiceInput.payload + } + + val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName + val workflowName = clonedExecutionServiceInput.actionIdentifiers.actionName + + if (blueprintName == "default") return clonedExecutionServiceInput + + if (clonedExecutionServiceInput.payload + .path("$workflowName-request").has("$workflowName-properties")) { + + /** Retrieving sensitive input parameters */ + val requestId = clonedExecutionServiceInput.commonHeader.requestId + val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion + + val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) + + val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) + val blueprintContext = blueprintRuntimeService.bluePrintContext() + + val nodeTemplateName = blueprintContext.workflowFirstStepNodeTemplate(workflowName) + val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) + val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + + val propertyAssignments: MutableMap<String, JsonNode> = + blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) + ?: hashMapOf() + + val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES] + val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNode!!, String::class.java) + + /** Storing mapping entries with metadata log-protect set to true */ + val sensitiveParameters: List<String> = artifactPrefixNames + .map { "$it-mapping" } + .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) } + .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) } + .filter { PropertyDefinitionUtils.hasLogProtect(it.property) } + .map { it.name } + + /** Hiding sensitive input parameters from the request */ + var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload + .path("$workflowName-request") + .path("$workflowName-properties") as ObjectNode + + sensitiveParameters.forEach { sensitiveParameter -> + if (workflowProperties.has(sensitiveParameter)) { + workflowProperties.remove(sensitiveParameter) + workflowProperties.put(sensitiveParameter, ApplicationConstants.LOG_REDACTED) + } + } + } + + return clonedExecutionServiceInput + } +} diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt new file mode 100644 index 000000000..3f782000b --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt @@ -0,0 +1,47 @@ +/* + * Copyright © 2020 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Service +import javax.annotation.PostConstruct + +/** + * Default audit service when no audit publisher is defined, message aren't sent + */ +@ConditionalOnProperty( + name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"], + havingValue = "false" +) +@Service +class NoPublishAuditService : PublishAuditService { + + val log = logger(NoPublishAuditService::class) + + @PostConstruct + fun init() { + log.info("Audit service is disabled") + } + + override suspend fun publish(executionServiceInput: ExecutionServiceInput) { + } + + override fun publish(executionServiceOutput: ExecutionServiceOutput) { + } +} diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt new file mode 100644 index 000000000..535a5eae0 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt @@ -0,0 +1,25 @@ +/* + * Copyright © 2020 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput + +interface PublishAuditService { + suspend fun publish(executionServiceInput: ExecutionServiceInput) + fun publish(executionServiceOutput: ExecutionServiceOutput) +} diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt index b21f968c9..37f7861be 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandlerTest.kt @@ -16,6 +16,11 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api +import io.mockk.verify +import io.mockk.coVerify +import io.mockk.Runs +import io.mockk.coEvery +import io.mockk.just import io.mockk.mockk import kotlinx.coroutines.runBlocking import org.junit.Before @@ -23,6 +28,7 @@ import org.junit.runner.RunWith import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ActionIdentifiers import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.CommonHeader import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput import org.onap.ccsdk.cds.blueprintsprocessor.services.execution.AbstractServiceFunction import org.onap.ccsdk.cds.controllerblueprints.core.jsonAsJsonType import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintDependencyService @@ -65,13 +71,52 @@ class ExecutionServiceHandlerTest { } } runBlocking { - val executionServiceHandler = ExecutionServiceHandler(mockk(), mockk(), mockk()) + val executionServiceHandler = ExecutionServiceHandler(mockk(), mockk(), mockk(), mockk()) val isServiceFunction = executionServiceHandler.checkServiceFunction(executionServiceInput) assertTrue(isServiceFunction, "failed to checkServiceFunction") val executionServiceOutput = executionServiceHandler.executeServiceFunction(executionServiceInput) assertNotNull(executionServiceOutput, "failed to get executionServiceOutput") } } + + @Test + fun testPublishAuditFunction() { + val executionServiceInput = ExecutionServiceInput().apply { + commonHeader = CommonHeader().apply { + requestId = "1234" + subRequestId = "1234-12" + originatorId = "cds-test" + } + actionIdentifiers = ActionIdentifiers().apply { + blueprintName = "default" + blueprintVersion = "1.0.0" + actionName = "mock-service-action" + } + } + + val publishAuditService = mockk<KafkaPublishAuditService>(relaxed = true) + val executionServiceHandler = ExecutionServiceHandler( + mockk(), + mockk(), + mockk(), + publishAuditService + ) + + coEvery { publishAuditService.publish(ExecutionServiceInput()) } just Runs + + var executionServiceOutput: ExecutionServiceOutput? = null + runBlocking { + executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + } + + coVerify { + publishAuditService.publish(executionServiceInput) + } + + verify { + publishAuditService.publish(executionServiceOutput!!) + } + } } @Service("mock-service-action") diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties index 6003df1df..fb2189ffb 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/test/resources/application-test.properties @@ -37,16 +37,23 @@ error.catalog.errorDefinitionDir=./../../../application/src/test/resources/ blueprints.processor.functions.python.executor.executionPath=./../../../../components/scripts/python/ccsdk_blueprints blueprints.processor.functions.python.executor.modulePaths=./../../../../components/scripts/python/ccsdk_blueprints -# Kafka-message-lib Configuration +# Kafka-message-lib Configurations blueprintsprocessor.messageconsumer.self-service-api.kafkaEnable=false blueprintsprocessor.messageconsumer.self-service-api.type=kafka-basic-auth blueprintsprocessor.messageconsumer.self-service-api.bootstrapServers=127.0.0.1:9092 -blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t blueprintsprocessor.messageconsumer.self-service-api.groupId=receiver-id -blueprintsprocessor.messageconsumer.self-service-api.clientId=default-client-id -blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=10 +blueprintsprocessor.messageconsumer.self-service-api.topic=receiver.t +blueprintsprocessor.messageconsumer.self-service-api.clientId=request-receiver-client-id +blueprintsprocessor.messageconsumer.self-service-api.pollMillSec=1000 + +# Kafka audit service Configurations +blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable=false +blueprintsprocessor.messageproducer.self-service-api.audit.request.type=kafka-basic-auth +blueprintsprocessor.messageproducer.self-service-api.audit.request.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageproducer.self-service-api.audit.request.clientId=audit-request-producer-client-id +blueprintsprocessor.messageproducer.self-service-api.audit.request.topic=audit-request-producer.t -blueprintsprocessor.messageproducer.self-service-api.type=kafka-basic-auth -blueprintsprocessor.messageproducer.self-service-api.bootstrapServers=127.0.0.1:9092 -blueprintsprocessor.messageproducer.self-service-api.clientId=default-client-id -blueprintsprocessor.messageproducer.self-service-api.topic=producer.t +blueprintsprocessor.messageproducer.self-service-api.audit.response.type=kafka-basic-auth +blueprintsprocessor.messageproducer.self-service-api.audit.response.bootstrapServers=127.0.0.1:9092 +blueprintsprocessor.messageproducer.self-service-api.audit.response.clientId=audit-response-producer-client-id +blueprintsprocessor.messageproducer.self-service-api.audit.response.topic=audit-response-producer.t |