From 888f2f78a580a3deccb66bef50f2375a04b39eb6 Mon Sep 17 00:00:00 2001 From: Julien Fontaine Date: Mon, 20 Apr 2020 11:53:41 -0400 Subject: Truncate message published on Kafka / Spike: Define solution for logs separation Refactoring of cmd-exec component - Improve display of error messages within the response - Fix gRPC buffer limitation (4Mb) by truncating error messages and cmd-exec logs if too heavy (>3Mb) Truncation of BP responses (<4Kb) before sending them in kafka audit topics. - Truncation if needed of error messages for every response - Truncation of cmd-exec logs in cmd-exec responses (Spike) Add a flag in the application.properties to enable/disable the display of cmd-exec responses on the BP side (Fix) Correction of BP processing with kafka regression (Fix) Changed default SSL Endpoint Algo Issue-ID: CCSDK-2326 Change-Id: If4d0e661117d1dd156cf19c95774824e754d870a Signed-off-by: Julien Fontaine --- .../api/BluePrintProcessingKafkaConsumer.kt | 25 ++++++++++++++++++---- .../selfservice/api/KafkaPublishAuditService.kt | 10 ++++----- 2 files changed, 25 insertions(+), 10 deletions(-) (limited to 'ms/blueprintsprocessor/modules/inbounds/selfservice-api/src') diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index 49f2a50d5..a95af8123 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -51,13 +51,15 @@ open class BluePrintProcessingKafkaConsumer( companion object { const val CONSUMER_SELECTOR = "self-service-api" + const val PRODUCER_SELECTOR = "self-service-api" } @EventListener(ApplicationReadyEvent::class) fun setupMessageListener() = runBlocking { try { log.info( - "Setting up message consumer($CONSUMER_SELECTOR)" + "Setting up message consumer($CONSUMER_SELECTOR)" + + "message producer($PRODUCER_SELECTOR)..." ) /** Get the Message Consumer Service **/ @@ -72,6 +74,18 @@ open class BluePrintProcessingKafkaConsumer( throw BluePrintProcessorException("failed to create consumer service ${e.message}") } + /** Get the Message Producer Service **/ + val blueprintMessageProducerService = try { + bluePrintMessageLibPropertyService + .blueprintMessageProducerService(PRODUCER_SELECTOR) + } catch (e: BluePrintProcessorException) { + val errorMsg = "Failed creating Kafka producer message service." + throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg, + "Wrong Kafka selector provided or internal error in Kafka service.") + } catch (e: Exception) { + throw BluePrintProcessorException("failed to create producer service ${e.message}") + } + launch { /** Subscribe to the consumer topics */ val additionalConfig: MutableMap = hashMapOf() @@ -82,7 +96,8 @@ open class BluePrintProcessingKafkaConsumer( ph.register() log.trace("Consumed Message : $message") val executionServiceInput = message.jsonAsType() - executionServiceHandler.doProcess(executionServiceInput) + val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) + blueprintMessageProducerService.sendMessage(executionServiceOutput) } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } finally { @@ -93,7 +108,8 @@ open class BluePrintProcessingKafkaConsumer( } } catch (e: Exception) { log.error( - "failed to start message consumer($CONSUMER_SELECTOR) ", e + "failed to start message consumer($CONSUMER_SELECTOR) " + + "message producer($PRODUCER_SELECTOR) ", e ) } } @@ -102,7 +118,8 @@ open class BluePrintProcessingKafkaConsumer( fun shutdownMessageListener() = runBlocking { try { log.info( - "Shutting down message consumer($CONSUMER_SELECTOR)" + "Shutting down message consumer($CONSUMER_SELECTOR)" + + "message producer($PRODUCER_SELECTOR)..." ) blueprintMessageConsumerService.shutDown() ph.arriveAndAwaitAdvance() diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index 129e7a54d..1c5d47c27 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -24,6 +24,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException +import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils @@ -48,12 +49,9 @@ class KafkaPublishAuditService( private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService, private val blueprintsProcessorCatalogService: BluePrintCatalogService ) : PublishAuditService { - private var inputInstance: BlueprintMessageProducerService? = null private var outputInstance: BlueprintMessageProducerService? = null - private lateinit var correlationUUID: String - private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString()) companion object { @@ -127,7 +125,8 @@ class KafkaPublishAuditService( correlationUUID = executionServiceInput.correlationUUID commonHeader = executionServiceInput.commonHeader actionIdentifiers = executionServiceInput.actionIdentifiers - payload = executionServiceInput.payload + payload = executionServiceInput.payload.deepCopy() + stepData = executionServiceInput.stepData } val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName @@ -173,8 +172,7 @@ class KafkaPublishAuditService( sensitiveParameters.forEach { sensitiveParameter -> if (workflowProperties.has(sensitiveParameter)) { - workflowProperties.remove(sensitiveParameter) - workflowProperties.put(sensitiveParameter, ApplicationConstants.LOG_REDACTED) + workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive()) } } } -- cgit 1.2.3-korg