diff options
author | Julien Fontaine <julien.fontaine@bell.ca> | 2020-03-23 14:33:40 -0400 |
---|---|---|
committer | Julien Fontaine <julien.fontaine@bell.ca> | 2020-04-08 19:02:14 -0400 |
commit | e9dd04c53d8aff175909f04f33acae506800e25f (patch) | |
tree | efb63e23c39cfc14e2ce9f904d2905f24a7b8636 /ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main | |
parent | 54974ab95489cbe6a6e13078459f68d982601d47 (diff) |
Publish execution input/output into Kafka topics
- Creation and implementation of PublishAudit services (NoAudit,Kafka)
- Conceal of sensitive data (request input parameters)
- Update ExecutionServiceHandler tests
- Update application.properties file
Issue-ID: CCSDK-2183
Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca>
Change-Id: Ifc28a91ce588f5ed0010acb4ed22b64429c6c1a0
Diffstat (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main')
5 files changed, 288 insertions, 37 deletions
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index d0b4df888..49f2a50d5 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -51,15 +51,13 @@ open class BluePrintProcessingKafkaConsumer( companion object { const val CONSUMER_SELECTOR = "self-service-api" - const val PRODUCER_SELECTOR = "self-service-api" } @EventListener(ApplicationReadyEvent::class) fun setupMessageListener() = runBlocking { try { log.info( - "Setting up message consumer($CONSUMER_SELECTOR) and " + - "message producer($PRODUCER_SELECTOR)..." + "Setting up message consumer($CONSUMER_SELECTOR)" ) /** Get the Message Consumer Service **/ @@ -74,18 +72,6 @@ open class BluePrintProcessingKafkaConsumer( throw BluePrintProcessorException("failed to create consumer service ${e.message}") } - /** Get the Message Producer Service **/ - val blueprintMessageProducerService = try { - bluePrintMessageLibPropertyService - .blueprintMessageProducerService(PRODUCER_SELECTOR) - } catch (e: BluePrintProcessorException) { - val errorMsg = "Failed creating Kafka producer message service." - throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg, - "Wrong Kafka selector provided or internal error in Kafka service.") - } catch (e: Exception) { - throw BluePrintProcessorException("failed to create producer service ${e.message}") - } - launch { /** Subscribe to the consumer topics */ val additionalConfig: MutableMap<String, Any> = hashMapOf() @@ -96,10 +82,7 @@ open class BluePrintProcessingKafkaConsumer( ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() - val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - // TODO("In future, Message publisher configuration vary with respect to request") - /** Send the response message */ - blueprintMessageProducerService.sendMessage(executionServiceOutput) + executionServiceHandler.doProcess(executionServiceInput) } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } finally { @@ -110,8 +93,7 @@ open class BluePrintProcessingKafkaConsumer( } } catch (e: Exception) { log.error( - "failed to start message consumer($CONSUMER_SELECTOR) and " + - "message producer($PRODUCER_SELECTOR) ", e + "failed to start message consumer($CONSUMER_SELECTOR) ", e ) } } @@ -120,8 +102,7 @@ open class BluePrintProcessingKafkaConsumer( fun shutdownMessageListener() = runBlocking { try { log.info( - "Shutting down message consumer($CONSUMER_SELECTOR) and " + - "message producer($PRODUCER_SELECTOR)..." + "Shutting down message consumer($CONSUMER_SELECTOR)" ) blueprintMessageConsumerService.shutDown() ph.arriveAndAwaitAdvance() diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt index 9524e375e..74c4b00e4 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/ExecutionServiceHandler.kt @@ -44,7 +44,8 @@ class ExecutionServiceHandler( private val bluePrintLoadConfiguration: BluePrintLoadConfiguration, private val blueprintsProcessorCatalogService: BluePrintCatalogService, private val bluePrintWorkflowExecutionService: - BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput> + BluePrintWorkflowExecutionService<ExecutionServiceInput, ExecutionServiceOutput>, + private val publishAuditService: PublishAuditService ) { private val log = LoggerFactory.getLogger(ExecutionServiceHandler::class.toString()) @@ -67,33 +68,44 @@ class ExecutionServiceHandler( responseObserver.onNext(executionServiceOutput.toProto()) responseObserver.onCompleted() } - else -> responseObserver.onNext( - response( - executionServiceInput, - "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", - true - ).toProto() - ) + else -> { + publishAuditService.publish(executionServiceInput) + val executionServiceOutput = response( + executionServiceInput, + "Failed to process request, 'actionIdentifiers.mode' not specified. Valid value are: 'sync' or 'async'.", + true + ) + publishAuditService.publish(executionServiceOutput) + responseObserver.onNext( + executionServiceOutput.toProto() + ) + } } } suspend fun doProcess(executionServiceInput: ExecutionServiceInput): ExecutionServiceOutput { val requestId = executionServiceInput.commonHeader.requestId - log.info("processing request id $requestId") val actionIdentifiers = executionServiceInput.actionIdentifiers val blueprintName = actionIdentifiers.blueprintName val blueprintVersion = actionIdentifiers.blueprintVersion + + lateinit var executionServiceOutput: ExecutionServiceOutput + + log.info("processing request id $requestId") + try { + publishAuditService.publish(executionServiceInput) + /** Check Blueprint is needed for this request */ if (checkServiceFunction(executionServiceInput)) { - return executeServiceFunction(executionServiceInput) + executionServiceOutput = executeServiceFunction(executionServiceInput) } else { val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) log.info("blueprint base path $basePath") val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) - val output = bluePrintWorkflowExecutionService.executeBluePrintWorkflow( + executionServiceOutput = bluePrintWorkflowExecutionService.executeBluePrintWorkflow( blueprintRuntimeService, executionServiceInput, hashMapOf() ) @@ -101,14 +113,16 @@ class ExecutionServiceHandler( val errors = blueprintRuntimeService.getBluePrintError().errors if (errors.isNotEmpty()) { val errorMessage = errors.stream().map { it.toString() }.collect(Collectors.joining(", ")) - setErrorStatus(errorMessage, output.status) + setErrorStatus(errorMessage, executionServiceOutput.status) } - return output } } catch (e: Exception) { log.error("fail processing request id $requestId", e) - return response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) + executionServiceOutput = response(executionServiceInput, e.localizedMessage ?: e.message ?: e.toString(), true) } + + publishAuditService.publish(executionServiceOutput) + return executionServiceOutput } /** If the blueprint name is default, It means no blueprint is needed for the execution */ diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt new file mode 100644 index 000000000..129e7a54d --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -0,0 +1,184 @@ +/* + * Copyright © 2020 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.ResourceResolutionConstants +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService +import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService +import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants +import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService +import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils +import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils +import org.onap.ccsdk.cds.controllerblueprints.core.utils.PropertyDefinitionUtils +import org.onap.ccsdk.cds.controllerblueprints.resource.dict.ResourceAssignment +import org.slf4j.LoggerFactory +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Service +import javax.annotation.PostConstruct + +/** + * Audit service used to produce execution service input and output message + * sent into dedicated kafka topics. + */ +@ConditionalOnProperty( + name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"], + havingValue = "true" +) +@Service +class KafkaPublishAuditService( + private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, + private val blueprintsProcessorCatalogService: BluePrintCatalogService +) : PublishAuditService { + + private var inputInstance: BlueprintMessageProducerService? = null + private var outputInstance: BlueprintMessageProducerService? = null + + private lateinit var correlationUUID: String + + private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString()) + + companion object { + const val INPUT_SELECTOR = "self-service-api.audit.request" + const val OUTPUT_SELECTOR = "self-service-api.audit.response" + } + + @PostConstruct + private fun init() { + log.info("Kakfa audit service is enabled") + } + + /** + * Publish execution input into a kafka topic. + * The correlation UUID is used to link the input to its output. + * Sensitive data within the request are hidden. + */ + override suspend fun publish(executionServiceInput: ExecutionServiceInput) { + this.correlationUUID = executionServiceInput.correlationUUID + val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) + this.inputInstance = this.getInputInstance(INPUT_SELECTOR) + this.inputInstance!!.sendMessage(secureExecutionServiceInput) + } + + /** + * Publish execution output into a kafka topic. + * The correlation UUID is used to link the output to its input. + * A correlation UUID is added to link the input to its output. + */ + override fun publish(executionServiceOutput: ExecutionServiceOutput) { + executionServiceOutput.correlationUUID = this.correlationUUID + this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) + this.outputInstance!!.sendMessage(executionServiceOutput) + } + + /** + * Return the input kafka producer instance using a selector. + */ + private fun getInputInstance(selector: String): BlueprintMessageProducerService = inputInstance ?: createInstance(selector) + + /** + * Return the output kafka producer instance using a selector. + */ + private fun getOutputInstance(selector: String): BlueprintMessageProducerService = outputInstance ?: createInstance(selector) + + /** + * Create a kafka producer instance. + */ + private fun createInstance(selector: String): BlueprintMessageProducerService { + log.info( + "Setting up message producer($selector)..." + ) + return try { + bluePrintMessageLibPropertyService + .blueprintMessageProducerService(selector) + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create producer service ${e.message}") + } + } + + /** + * Hide sensitive data in the request. + * Sensitive data are declared in the resource resolution mapping using + * the property metadata "log-protect" set to true. + */ + private suspend fun hideSensitiveData( + executionServiceInput: ExecutionServiceInput + ): ExecutionServiceInput { + + var clonedExecutionServiceInput = ExecutionServiceInput().apply { + correlationUUID = executionServiceInput.correlationUUID + commonHeader = executionServiceInput.commonHeader + actionIdentifiers = executionServiceInput.actionIdentifiers + payload = executionServiceInput.payload + } + + val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName + val workflowName = clonedExecutionServiceInput.actionIdentifiers.actionName + + if (blueprintName == "default") return clonedExecutionServiceInput + + if (clonedExecutionServiceInput.payload + .path("$workflowName-request").has("$workflowName-properties")) { + + /** Retrieving sensitive input parameters */ + val requestId = clonedExecutionServiceInput.commonHeader.requestId + val blueprintVersion = clonedExecutionServiceInput.actionIdentifiers.blueprintVersion + + val basePath = blueprintsProcessorCatalogService.getFromDatabase(blueprintName, blueprintVersion) + + val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) + val blueprintContext = blueprintRuntimeService.bluePrintContext() + + val nodeTemplateName = blueprintContext.workflowFirstStepNodeTemplate(workflowName) + val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) + val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + + val propertyAssignments: MutableMap<String, JsonNode> = + blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) + ?: hashMapOf() + + val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES] + val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNode!!, String::class.java) + + /** Storing mapping entries with metadata log-protect set to true */ + val sensitiveParameters: List<String> = artifactPrefixNames + .map { "$it-mapping" } + .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) } + .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) } + .filter { PropertyDefinitionUtils.hasLogProtect(it.property) } + .map { it.name } + + /** Hiding sensitive input parameters from the request */ + var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload + .path("$workflowName-request") + .path("$workflowName-properties") as ObjectNode + + sensitiveParameters.forEach { sensitiveParameter -> + if (workflowProperties.has(sensitiveParameter)) { + workflowProperties.remove(sensitiveParameter) + workflowProperties.put(sensitiveParameter, ApplicationConstants.LOG_REDACTED) + } + } + } + + return clonedExecutionServiceInput + } +} diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt new file mode 100644 index 000000000..3f782000b --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/NoPublishAuditService.kt @@ -0,0 +1,47 @@ +/* + * Copyright © 2020 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput +import org.onap.ccsdk.cds.controllerblueprints.core.logger +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.stereotype.Service +import javax.annotation.PostConstruct + +/** + * Default audit service when no audit publisher is defined, message aren't sent + */ +@ConditionalOnProperty( + name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"], + havingValue = "false" +) +@Service +class NoPublishAuditService : PublishAuditService { + + val log = logger(NoPublishAuditService::class) + + @PostConstruct + fun init() { + log.info("Audit service is disabled") + } + + override suspend fun publish(executionServiceInput: ExecutionServiceInput) { + } + + override fun publish(executionServiceOutput: ExecutionServiceOutput) { + } +} diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt new file mode 100644 index 000000000..535a5eae0 --- /dev/null +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/PublishAuditService.kt @@ -0,0 +1,25 @@ +/* + * Copyright © 2020 Bell Canada + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api + +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput +import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput + +interface PublishAuditService { + suspend fun publish(executionServiceInput: ExecutionServiceInput) + fun publish(executionServiceOutput: ExecutionServiceOutput) +} |