diff options
12 files changed, 168 insertions, 110 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt index 7e6bf68be..af8d902cd 100644 --- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt +++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt @@ -260,6 +260,7 @@ open class MessagePrioritizationConsumerTest { val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.id blueprintMessageProducerService.sendMessageNB( + key = "mykey", message = it.asJsonString(false), headers = headers ) @@ -272,6 +273,7 @@ open class MessagePrioritizationConsumerTest { val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.id blueprintMessageProducerService.sendMessageNB( + key = "mykey", message = it.asJsonString(false), headers = headers ) @@ -284,6 +286,7 @@ open class MessagePrioritizationConsumerTest { val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.id blueprintMessageProducerService.sendMessageNB( + key = "mykey", message = it.asJsonString(false), headers = headers ) diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt index cb78f4132..76662d4ee 100644 --- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt +++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt @@ -242,4 +242,6 @@ object BluePrintConstants { const val PROPERTY_CLUSTER_CONFIG_FILE = "CLUSTER_CONFIG_FILE" const val NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION = "component-resource-resolution" + const val NODE_TEMPLATE_TYPE_DG = "dg-generic" + const val PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE = "dependency-node-templates" } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt index f74abcdb7..311d35c38 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt @@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.ConsumerRecords import org.apache.kafka.streams.Topology import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties @@ -29,15 +30,15 @@ interface ConsumerFunction interface BlueprintMessageConsumerService { - suspend fun subscribe(): Channel<String> { + suspend fun subscribe(): Channel<ConsumerRecord<String, ByteArray>> { return subscribe(null) } /** Subscribe to the Kafka channel with [additionalConfig] */ - suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> + suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> /** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/ - suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String> + suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<ConsumerRecord<String, ByteArray>> /** Consume and execute dynamic function [consumerFunction] */ suspend fun consume(consumerFunction: ConsumerFunction) { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt index cdc65a1c6..66d3a5b73 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt @@ -17,34 +17,19 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.runBlocking +import java.util.UUID interface BlueprintMessageProducerService { - fun sendMessage(message: Any): Boolean { - return sendMessage(message = message, headers = null) + fun sendMessage(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking { + sendMessageNB(key, message, headers) } - fun sendMessage(topic: String, message: Any): Boolean { - return sendMessage(topic, message, null) + fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking { + sendMessageNB(key, topic, message, headers) } - fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { - sendMessageNB(message = message, headers = headers) - } - - fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking { - sendMessageNB(topic, message, headers) - } - - suspend fun sendMessageNB(message: Any): Boolean { - return sendMessageNB(message = message, headers = null) - } - - suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean - - suspend fun sendMessageNB(topic: String, message: Any): Boolean { - return sendMessageNB(topic, message, null) - } + suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean - suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean + suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt index cdcd4197c..a0932e916 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt @@ -19,13 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay -import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.apache.kafka.clients.consumer.Consumer +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.logger -import java.nio.charset.Charset import java.time.Duration import kotlin.concurrent.thread @@ -35,7 +34,7 @@ open class KafkaMessageConsumerService( BlueprintMessageConsumerService { val log = logger(KafkaMessageConsumerService::class) - val channel = Channel<String>() + val channel = Channel<ConsumerRecord<String, ByteArray>>() var kafkaConsumer: Consumer<String, ByteArray>? = null @Volatile @@ -49,14 +48,14 @@ open class KafkaMessageConsumerService( return KafkaConsumer(configProperties) } - override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> { /** get to topic names */ val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() } check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" } return subscribe(consumerTopic, additionalConfig) } - override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> { /** Create Kafka consumer */ kafkaConsumer = kafkaConsumer(additionalConfig) @@ -78,14 +77,10 @@ open class KafkaMessageConsumerService( runBlocking { consumerRecords?.forEach { consumerRecord -> /** execute the command block */ - consumerRecord.value()?.let { - launch { - if (!channel.isClosedForSend) { - channel.send(String(it, Charset.defaultCharset())) - } else { - log.error("Channel is closed to receive message") - } - } + if (!channel.isClosedForSend) { + channel.send(consumerRecord) + } else { + log.error("Channel is closed to receive message") } } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt index 8958d4f0c..59e9192bb 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt @@ -29,7 +29,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString -import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID import org.slf4j.LoggerFactory import java.nio.charset.Charset @@ -48,17 +47,13 @@ class KafkaMessageProducerService( const val MAX_ERR_MSG_LEN = 128 } - override suspend fun sendMessageNB(message: Any): Boolean { + override suspend fun sendMessageNB(key: String, message: Any, headers: MutableMap<String, String>?): Boolean { checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessageNB(messageProducerProperties.topic!!, message) - } - - override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean { - checkNotNull(messageProducerProperties.topic) { "default topic is not configured" } - return sendMessageNB(messageProducerProperties.topic!!, message, headers) + return sendMessageNB(key, messageProducerProperties.topic!!, message, headers) } override suspend fun sendMessageNB( + key: String, topic: String, message: Any, headers: MutableMap<String, String>? @@ -73,7 +68,7 @@ class KafkaMessageProducerService( else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset()) } - val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage) + val record = ProducerRecord<String, ByteArray>(topic, key, byteArrayMessage) val recordHeaders = record.headers() messageLoggerService.messageProducing(recordHeaders) headers?.let { diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt index 60f2dfa05..4340e4815 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service import kotlinx.coroutines.channels.Channel +import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.streams.KafkaStreams import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException @@ -39,11 +40,11 @@ open class KafkaStreamsConsumerService(private val messageConsumerProperties: Me return configProperties } - override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> { throw BluePrintProcessorException("not implemented") } - override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> { + override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> { throw BluePrintProcessorException("not implemented") } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt index fdf6e48e7..77bdbe408 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt @@ -51,6 +51,7 @@ import org.springframework.test.annotation.DirtiesContext import org.springframework.test.context.ContextConfiguration import org.springframework.test.context.TestPropertySource import org.springframework.test.context.junit4.SpringRunner +import java.nio.charset.Charset import kotlin.test.assertEquals import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -133,9 +134,14 @@ open class BlueprintMessageConsumerServiceTest { every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer val channel = spyBlueprintMessageConsumerService.subscribe(null) + var i = 0 launch { channel.consumeEach { - assertTrue(it.startsWith("I am message"), "failed to get the actual message") + ++i + val key = it.key() + val value = String(it.value(), Charset.defaultCharset()) + assertTrue(value.startsWith("I am message"), "failed to get the actual message") + assertEquals("key_$i", key) } } delay(10) @@ -268,6 +274,7 @@ open class BlueprintMessageConsumerServiceTest { val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.toString() blueprintMessageProducerService.sendMessageNB( + key = "mykey", message = "this is my message($it)", headers = headers ) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index 537dab1ba..881f0b422 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -88,7 +88,7 @@ open class BlueprintMessageProducerServiceTest { every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate - val response = spyBluePrintMessageProducerService.sendMessage("Testing message") + val response = spyBluePrintMessageProducerService.sendMessage("mykey", "Testing message") assertTrue(response, "failed to get command response") } } diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt index c30ab9b02..44990ae7f 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt @@ -132,6 +132,7 @@ class KafkaStreamsConsumerServiceTest { val headers: MutableMap<String, String> = hashMapOf() headers["id"] = it.toString() blueprintMessageProducerService.sendMessageNB( + key = "mykey", message = "this is my message($it)", headers = headers ) diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt index 1f3dd6547..1ccf230d7 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt @@ -31,6 +31,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty import org.springframework.boot.context.event.ApplicationReadyEvent import org.springframework.context.event.EventListener import org.springframework.stereotype.Service +import java.nio.charset.Charset +import java.util.UUID import java.util.concurrent.Phaser import javax.annotation.PreDestroy @@ -95,10 +97,12 @@ open class BluePrintProcessingKafkaConsumer( launch { try { ph.register() - log.trace("Consumed Message : $message") - val executionServiceInput = message.jsonAsType<ExecutionServiceInput>() + val key = message.key() ?: UUID.randomUUID().toString() + val value = String(message.value(), Charset.defaultCharset()) + log.trace("Consumed Message : key($key) value($value)") + val executionServiceInput = value.jsonAsType<ExecutionServiceInput>() val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput) - blueprintMessageProducerService.sendMessage(executionServiceOutput) + blueprintMessageProducerService.sendMessage(key, executionServiceOutput) } catch (e: Exception) { log.error("failed in processing the consumed message : $message", e) } finally { diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt index fca73981e..145c37b01 100644 --- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt +++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt @@ -17,6 +17,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ArrayNode import com.fasterxml.jackson.databind.node.ObjectNode import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput @@ -27,6 +28,8 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintContext +import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService import org.onap.ccsdk.cds.controllerblueprints.core.service.PropertyAssignmentService import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils @@ -40,6 +43,13 @@ import javax.annotation.PostConstruct /** * Audit service used to produce execution service input and output message * sent into dedicated kafka topics. + * + * @param bluePrintMessageLibPropertyService Service used to instantiate audit service producers + * @param blueprintsProcessorCatalogService Service used to get the base path of the current CBA executed + * + * @property inputInstance Request Kakfa Producer instance + * @property outputInstance Response Kakfa Producer instance + * @property log Audit Service logger */ @ConditionalOnProperty( name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"], @@ -68,12 +78,14 @@ class KafkaPublishAuditService( * Publish execution input into a kafka topic. * The correlation UUID is used to link the input to its output. * Sensitive data within the request are hidden. + * @param executionServiceInput Audited BP request */ override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) { val secureExecutionServiceInput = hideSensitiveData(executionServiceInput) + val key = secureExecutionServiceInput.actionIdentifiers.blueprintName try { this.inputInstance = this.getInputInstance(INPUT_SELECTOR) - this.inputInstance!!.sendMessage(secureExecutionServiceInput) + this.inputInstance!!.sendMessage(key, secureExecutionServiceInput) } catch (e: Exception) { var errMsg = if (e.message != null) "ERROR : ${e.message}" @@ -86,32 +98,38 @@ class KafkaPublishAuditService( * Publish execution output into a kafka topic. * The correlation UUID is used to link the output to its input. * A correlation UUID is added to link the input to its output. + * @param correlationUUID UUID used to link the audited response to its audited request + * @param executionServiceOutput Audited BP response */ override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) { executionServiceOutput.correlationUUID = correlationUUID + val key = executionServiceOutput.actionIdentifiers.blueprintName try { this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR) - this.outputInstance!!.sendMessage(executionServiceOutput) + this.outputInstance!!.sendMessage(key, executionServiceOutput) } catch (e: Exception) { var errMsg = - if (e.message != null) "ERROR : $e" - else "ERROR : Failed to send execution request to Kafka." + if (e.message != null) "ERROR : $e" + else "ERROR : Failed to send execution request to Kafka." log.error(errMsg) } } /** - * Return the input kafka producer instance using a selector. + * Return the input kafka producer instance using a [selector] if not already instantiated. + * @param selector Selector to retrieve request kafka producer configuration */ private fun getInputInstance(selector: String): BlueprintMessageProducerService = inputInstance ?: createInstance(selector) /** - * Return the output kafka producer instance using a selector. + * Return the output kafka producer instance using a [selector] if not already instantiated. + * @param selector Selector to retrieve response kafka producer configuration */ private fun getOutputInstance(selector: String): BlueprintMessageProducerService = outputInstance ?: createInstance(selector) /** - * Create a kafka producer instance. + * Create a kafka producer instance using a [selector]. + * @param selector Selector to retrieve kafka producer configuration */ private fun createInstance(selector: String): BlueprintMessageProducerService { log.info("Setting up message producer($selector)...") @@ -119,9 +137,10 @@ class KafkaPublishAuditService( } /** - * Hide sensitive data in the request. + * Hide sensitive data in the [executionServiceInput]. * Sensitive data are declared in the resource resolution mapping using * the property metadata "log-protect" set to true. + * @param executionServiceInput BP Execution Request where data needs to be hidden */ private suspend fun hideSensitiveData( executionServiceInput: ExecutionServiceInput @@ -153,60 +172,105 @@ class KafkaPublishAuditService( val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString()) val blueprintContext = blueprintRuntimeService.bluePrintContext() - /** Looking for node templates defined as component-resource-resolution */ - val nodeTemplates = blueprintContext.nodeTemplates() - nodeTemplates!!.forEach { nodeTemplate -> - val nodeTemplateName = nodeTemplate.key - val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type - if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) { - val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) - val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) - - val propertyAssignments: MutableMap<String, JsonNode> = - blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) - ?: hashMapOf() - - /** Getting values define in artifact-prefix-names */ - val input = executionServiceInput.payload.get("$workflowName-request") - blueprintRuntimeService.assignWorkflowInputs(workflowName, input) - val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES] - val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService) - val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression( - BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE, - nodeTemplateName, - ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES, - artifactPrefixNamesNode!!) - - val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java) - - /** Storing mapping entries with metadata log-protect set to true */ - val sensitiveParameters: List<String> = artifactPrefixNames - .map { "$it-mapping" } - .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) } - .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) } - .filter { PropertyDefinitionUtils.hasLogProtect(it.property) } - .map { it.name } - - /** Hiding sensitive input parameters from the request */ - var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload - .path("$workflowName-request") - .path("$workflowName-properties") as ObjectNode - - sensitiveParameters.forEach { sensitiveParameter -> - if (workflowProperties.has(sensitiveParameter)) { - workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive()) - } + val workflowSteps = blueprintContext.workflowByName(workflowName).steps + checkNotNull(workflowSteps) { "Failed to get step(s) for workflow($workflowName)" } + workflowSteps.forEach { step -> + val nodeTemplateName = step.value.target + checkNotNull(nodeTemplateName) { "Failed to get node template target for workflow($workflowName), step($step)" } + val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName) + + /** We need to check in his Node Template Dependencies is case of a Node Template DG */ + if (nodeTemplate.type == BluePrintConstants.NODE_TEMPLATE_TYPE_DG) { + val dependencyNodeTemplate = nodeTemplate.properties?.get(BluePrintConstants.PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE) as ArrayNode + dependencyNodeTemplate.forEach { dependencyNodeTemplateName -> + clonedExecutionServiceInput = hideSensitiveDataFromResourceResolution( + blueprintRuntimeService, + blueprintContext, + clonedExecutionServiceInput, + workflowName, + dependencyNodeTemplateName.asText() + ) } + } else { + clonedExecutionServiceInput = hideSensitiveDataFromResourceResolution( + blueprintRuntimeService, + blueprintContext, + clonedExecutionServiceInput, + workflowName, + nodeTemplateName + ) } } } } catch (e: Exception) { - val errMsg = "ERROR : Couldn't hide sensitive data in the execution request." - log.error(errMsg, e) - clonedExecutionServiceInput.payload.replace( - "$workflowName-request", - "$errMsg $e".asJsonPrimitive()) + val errMsg = "ERROR : Couldn't hide sensitive data in the execution request." + log.error(errMsg, e) + clonedExecutionServiceInput.payload.replace( + "$workflowName-request", + "$errMsg $e".asJsonPrimitive()) } return clonedExecutionServiceInput } + + /** + * Hide sensitive data in [executionServiceInput] if the given [nodeTemplateName] is a + * resource resolution component. + * @param blueprintRuntimeService Current blueprint runtime service + * @param blueprintContext Current blueprint runtime context + * @param executionServiceInput BP Execution Request where data needs to be hidden + * @param workflowName Current workflow being executed + * @param nodeTemplateName Node template to check for sensitive data + * @return [executionServiceInput] with sensitive inputs replaced by a generic string + */ + private suspend fun hideSensitiveDataFromResourceResolution( + blueprintRuntimeService: BluePrintRuntimeService<MutableMap<String, JsonNode>>, + blueprintContext: BluePrintContext, + executionServiceInput: ExecutionServiceInput, + workflowName: String, + nodeTemplateName: String + ): ExecutionServiceInput { + + val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName) + if (nodeTemplate.type == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) { + val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName) + val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName) + + val propertyAssignments: MutableMap<String, JsonNode> = + blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName) + ?: hashMapOf() + + /** Getting values define in artifact-prefix-names */ + val input = executionServiceInput.payload.get("$workflowName-request") + blueprintRuntimeService.assignWorkflowInputs(workflowName, input) + val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES] + val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService) + val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression( + BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE, + nodeTemplateName, + ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES, + artifactPrefixNamesNode!!) + + val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java) + + /** Storing mapping entries with metadata log-protect set to true */ + val sensitiveParameters: List<String> = artifactPrefixNames + .map { "$it-mapping" } + .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) } + .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) } + .filter { PropertyDefinitionUtils.hasLogProtect(it.property) } + .map { it.name } + + /** Hiding sensitive input parameters from the request */ + var workflowProperties: ObjectNode = executionServiceInput.payload + .path("$workflowName-request") + .path("$workflowName-properties") as ObjectNode + + sensitiveParameters.forEach { sensitiveParameter -> + if (workflowProperties.has(sensitiveParameter)) { + workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive()) + } + } + } + return executionServiceInput + } } |