summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt3
-rw-r--r--ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt7
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt29
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt21
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt13
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt5
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt9
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt1
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt10
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt176
12 files changed, 168 insertions, 110 deletions
diff --git a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
index 7e6bf68be..af8d902cd 100644
--- a/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
+++ b/ms/blueprintsprocessor/functions/message-prioritizaion/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/functions/message/prioritization/MessagePrioritizationConsumerTest.kt
@@ -260,6 +260,7 @@ open class MessagePrioritizationConsumerTest {
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.id
blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
message = it.asJsonString(false),
headers = headers
)
@@ -272,6 +273,7 @@ open class MessagePrioritizationConsumerTest {
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.id
blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
message = it.asJsonString(false),
headers = headers
)
@@ -284,6 +286,7 @@ open class MessagePrioritizationConsumerTest {
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.id
blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
message = it.asJsonString(false),
headers = headers
)
diff --git a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
index cb78f4132..76662d4ee 100644
--- a/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
+++ b/ms/blueprintsprocessor/modules/blueprints/blueprint-core/src/main/kotlin/org/onap/ccsdk/cds/controllerblueprints/core/BluePrintConstants.kt
@@ -242,4 +242,6 @@ object BluePrintConstants {
const val PROPERTY_CLUSTER_CONFIG_FILE = "CLUSTER_CONFIG_FILE"
const val NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION = "component-resource-resolution"
+ const val NODE_TEMPLATE_TYPE_DG = "dg-generic"
+ const val PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE = "dependency-node-templates"
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
index f74abcdb7..311d35c38 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerService.kt
@@ -19,6 +19,7 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.streams.Topology
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
@@ -29,15 +30,15 @@ interface ConsumerFunction
interface BlueprintMessageConsumerService {
- suspend fun subscribe(): Channel<String> {
+ suspend fun subscribe(): Channel<ConsumerRecord<String, ByteArray>> {
return subscribe(null)
}
/** Subscribe to the Kafka channel with [additionalConfig] */
- suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String>
+ suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>>
/** Subscribe to the Kafka channel with [additionalConfig] for dynamic [topics]*/
- suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<String>
+ suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>? = null): Channel<ConsumerRecord<String, ByteArray>>
/** Consume and execute dynamic function [consumerFunction] */
suspend fun consume(consumerFunction: ConsumerFunction) {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
index cdc65a1c6..66d3a5b73 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerService.kt
@@ -17,34 +17,19 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.runBlocking
+import java.util.UUID
interface BlueprintMessageProducerService {
- fun sendMessage(message: Any): Boolean {
- return sendMessage(message = message, headers = null)
+ fun sendMessage(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
+ sendMessageNB(key, message, headers)
}
- fun sendMessage(topic: String, message: Any): Boolean {
- return sendMessage(topic, message, null)
+ fun sendMessage(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean = runBlocking {
+ sendMessageNB(key, topic, message, headers)
}
- fun sendMessage(message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
- sendMessageNB(message = message, headers = headers)
- }
-
- fun sendMessage(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean = runBlocking {
- sendMessageNB(topic, message, headers)
- }
-
- suspend fun sendMessageNB(message: Any): Boolean {
- return sendMessageNB(message = message, headers = null)
- }
-
- suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean
-
- suspend fun sendMessageNB(topic: String, message: Any): Boolean {
- return sendMessageNB(topic, message, null)
- }
+ suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), message: Any, headers: MutableMap<String, String>? = null): Boolean
- suspend fun sendMessageNB(topic: String, message: Any, headers: MutableMap<String, String>?): Boolean
+ suspend fun sendMessageNB(key: String = UUID.randomUUID().toString(), topic: String, message: Any, headers: MutableMap<String, String>? = null): Boolean
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
index cdcd4197c..a0932e916 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageConsumerService.kt
@@ -19,13 +19,12 @@ package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
-import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.consumer.Consumer
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.onap.ccsdk.cds.blueprintsprocessor.message.KafkaBasicAuthMessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.logger
-import java.nio.charset.Charset
import java.time.Duration
import kotlin.concurrent.thread
@@ -35,7 +34,7 @@ open class KafkaMessageConsumerService(
BlueprintMessageConsumerService {
val log = logger(KafkaMessageConsumerService::class)
- val channel = Channel<String>()
+ val channel = Channel<ConsumerRecord<String, ByteArray>>()
var kafkaConsumer: Consumer<String, ByteArray>? = null
@Volatile
@@ -49,14 +48,14 @@ open class KafkaMessageConsumerService(
return KafkaConsumer(configProperties)
}
- override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
/** get to topic names */
val consumerTopic = messageConsumerProperties.topic?.split(",")?.map { it.trim() }
check(!consumerTopic.isNullOrEmpty()) { "couldn't get topic information" }
return subscribe(consumerTopic, additionalConfig)
}
- override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
/** Create Kafka consumer */
kafkaConsumer = kafkaConsumer(additionalConfig)
@@ -78,14 +77,10 @@ open class KafkaMessageConsumerService(
runBlocking {
consumerRecords?.forEach { consumerRecord ->
/** execute the command block */
- consumerRecord.value()?.let {
- launch {
- if (!channel.isClosedForSend) {
- channel.send(String(it, Charset.defaultCharset()))
- } else {
- log.error("Channel is closed to receive message")
- }
- }
+ if (!channel.isClosedForSend) {
+ channel.send(consumerRecord)
+ } else {
+ log.error("Channel is closed to receive message")
}
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 8958d4f0c..59e9192bb 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -29,7 +29,6 @@ import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
-import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
import java.nio.charset.Charset
@@ -48,17 +47,13 @@ class KafkaMessageProducerService(
const val MAX_ERR_MSG_LEN = 128
}
- override suspend fun sendMessageNB(message: Any): Boolean {
+ override suspend fun sendMessageNB(key: String, message: Any, headers: MutableMap<String, String>?): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
- return sendMessageNB(messageProducerProperties.topic!!, message)
- }
-
- override suspend fun sendMessageNB(message: Any, headers: MutableMap<String, String>?): Boolean {
- checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
- return sendMessageNB(messageProducerProperties.topic!!, message, headers)
+ return sendMessageNB(key, messageProducerProperties.topic!!, message, headers)
}
override suspend fun sendMessageNB(
+ key: String,
topic: String,
message: Any,
headers: MutableMap<String, String>?
@@ -73,7 +68,7 @@ class KafkaMessageProducerService(
else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
}
- val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
+ val record = ProducerRecord<String, ByteArray>(topic, key, byteArrayMessage)
val recordHeaders = record.headers()
messageLoggerService.messageProducing(recordHeaders)
headers?.let {
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
index 60f2dfa05..4340e4815 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerService.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
import kotlinx.coroutines.channels.Channel
+import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.streams.KafkaStreams
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageConsumerProperties
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
@@ -39,11 +40,11 @@ open class KafkaStreamsConsumerService(private val messageConsumerProperties: Me
return configProperties
}
- override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
throw BluePrintProcessorException("not implemented")
}
- override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<String> {
+ override suspend fun subscribe(topics: List<String>, additionalConfig: Map<String, Any>?): Channel<ConsumerRecord<String, ByteArray>> {
throw BluePrintProcessorException("not implemented")
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index fdf6e48e7..77bdbe408 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -51,6 +51,7 @@ import org.springframework.test.annotation.DirtiesContext
import org.springframework.test.context.ContextConfiguration
import org.springframework.test.context.TestPropertySource
import org.springframework.test.context.junit4.SpringRunner
+import java.nio.charset.Charset
import kotlin.test.assertEquals
import kotlin.test.assertNotNull
import kotlin.test.assertTrue
@@ -133,9 +134,14 @@ open class BlueprintMessageConsumerServiceTest {
every { spyBlueprintMessageConsumerService.kafkaConsumer(any()) } returns mockKafkaConsumer
val channel = spyBlueprintMessageConsumerService.subscribe(null)
+ var i = 0
launch {
channel.consumeEach {
- assertTrue(it.startsWith("I am message"), "failed to get the actual message")
+ ++i
+ val key = it.key()
+ val value = String(it.value(), Charset.defaultCharset())
+ assertTrue(value.startsWith("I am message"), "failed to get the actual message")
+ assertEquals("key_$i", key)
}
}
delay(10)
@@ -268,6 +274,7 @@ open class BlueprintMessageConsumerServiceTest {
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.toString()
blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
message = "this is my message($it)",
headers = headers
)
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 537dab1ba..881f0b422 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -88,7 +88,7 @@ open class BlueprintMessageProducerServiceTest {
every { spyBluePrintMessageProducerService.messageTemplate(any()) } returns mockKafkaTemplate
- val response = spyBluePrintMessageProducerService.sendMessage("Testing message")
+ val response = spyBluePrintMessageProducerService.sendMessage("mykey", "Testing message")
assertTrue(response, "failed to get command response")
}
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
index c30ab9b02..44990ae7f 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaStreamsConsumerServiceTest.kt
@@ -132,6 +132,7 @@ class KafkaStreamsConsumerServiceTest {
val headers: MutableMap<String, String> = hashMapOf()
headers["id"] = it.toString()
blueprintMessageProducerService.sendMessageNB(
+ key = "mykey",
message = "this is my message($it)",
headers = headers
)
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index 1f3dd6547..1ccf230d7 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -31,6 +31,8 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.boot.context.event.ApplicationReadyEvent
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Service
+import java.nio.charset.Charset
+import java.util.UUID
import java.util.concurrent.Phaser
import javax.annotation.PreDestroy
@@ -95,10 +97,12 @@ open class BluePrintProcessingKafkaConsumer(
launch {
try {
ph.register()
- log.trace("Consumed Message : $message")
- val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
+ val key = message.key() ?: UUID.randomUUID().toString()
+ val value = String(message.value(), Charset.defaultCharset())
+ log.trace("Consumed Message : key($key) value($value)")
+ val executionServiceInput = value.jsonAsType<ExecutionServiceInput>()
val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
- blueprintMessageProducerService.sendMessage(executionServiceOutput)
+ blueprintMessageProducerService.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
log.error("failed in processing the consumed message : $message", e)
} finally {
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index fca73981e..145c37b01 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -17,6 +17,7 @@
package org.onap.ccsdk.cds.blueprintsprocessor.selfservice.api
import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.ArrayNode
import com.fasterxml.jackson.databind.node.ObjectNode
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceInput
import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
@@ -27,6 +28,8 @@ import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintConstants
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintContext
+import org.onap.ccsdk.cds.controllerblueprints.core.service.BluePrintRuntimeService
import org.onap.ccsdk.cds.controllerblueprints.core.service.PropertyAssignmentService
import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
import org.onap.ccsdk.cds.controllerblueprints.core.utils.JacksonUtils
@@ -40,6 +43,13 @@ import javax.annotation.PostConstruct
/**
* Audit service used to produce execution service input and output message
* sent into dedicated kafka topics.
+ *
+ * @param bluePrintMessageLibPropertyService Service used to instantiate audit service producers
+ * @param blueprintsProcessorCatalogService Service used to get the base path of the current CBA executed
+ *
+ * @property inputInstance Request Kakfa Producer instance
+ * @property outputInstance Response Kakfa Producer instance
+ * @property log Audit Service logger
*/
@ConditionalOnProperty(
name = ["blueprintsprocessor.messageproducer.self-service-api.audit.kafkaEnable"],
@@ -68,12 +78,14 @@ class KafkaPublishAuditService(
* Publish execution input into a kafka topic.
* The correlation UUID is used to link the input to its output.
* Sensitive data within the request are hidden.
+ * @param executionServiceInput Audited BP request
*/
override suspend fun publishExecutionInput(executionServiceInput: ExecutionServiceInput) {
val secureExecutionServiceInput = hideSensitiveData(executionServiceInput)
+ val key = secureExecutionServiceInput.actionIdentifiers.blueprintName
try {
this.inputInstance = this.getInputInstance(INPUT_SELECTOR)
- this.inputInstance!!.sendMessage(secureExecutionServiceInput)
+ this.inputInstance!!.sendMessage(key, secureExecutionServiceInput)
} catch (e: Exception) {
var errMsg =
if (e.message != null) "ERROR : ${e.message}"
@@ -86,32 +98,38 @@ class KafkaPublishAuditService(
* Publish execution output into a kafka topic.
* The correlation UUID is used to link the output to its input.
* A correlation UUID is added to link the input to its output.
+ * @param correlationUUID UUID used to link the audited response to its audited request
+ * @param executionServiceOutput Audited BP response
*/
override suspend fun publishExecutionOutput(correlationUUID: String, executionServiceOutput: ExecutionServiceOutput) {
executionServiceOutput.correlationUUID = correlationUUID
+ val key = executionServiceOutput.actionIdentifiers.blueprintName
try {
this.outputInstance = this.getOutputInstance(OUTPUT_SELECTOR)
- this.outputInstance!!.sendMessage(executionServiceOutput)
+ this.outputInstance!!.sendMessage(key, executionServiceOutput)
} catch (e: Exception) {
var errMsg =
- if (e.message != null) "ERROR : $e"
- else "ERROR : Failed to send execution request to Kafka."
+ if (e.message != null) "ERROR : $e"
+ else "ERROR : Failed to send execution request to Kafka."
log.error(errMsg)
}
}
/**
- * Return the input kafka producer instance using a selector.
+ * Return the input kafka producer instance using a [selector] if not already instantiated.
+ * @param selector Selector to retrieve request kafka producer configuration
*/
private fun getInputInstance(selector: String): BlueprintMessageProducerService = inputInstance ?: createInstance(selector)
/**
- * Return the output kafka producer instance using a selector.
+ * Return the output kafka producer instance using a [selector] if not already instantiated.
+ * @param selector Selector to retrieve response kafka producer configuration
*/
private fun getOutputInstance(selector: String): BlueprintMessageProducerService = outputInstance ?: createInstance(selector)
/**
- * Create a kafka producer instance.
+ * Create a kafka producer instance using a [selector].
+ * @param selector Selector to retrieve kafka producer configuration
*/
private fun createInstance(selector: String): BlueprintMessageProducerService {
log.info("Setting up message producer($selector)...")
@@ -119,9 +137,10 @@ class KafkaPublishAuditService(
}
/**
- * Hide sensitive data in the request.
+ * Hide sensitive data in the [executionServiceInput].
* Sensitive data are declared in the resource resolution mapping using
* the property metadata "log-protect" set to true.
+ * @param executionServiceInput BP Execution Request where data needs to be hidden
*/
private suspend fun hideSensitiveData(
executionServiceInput: ExecutionServiceInput
@@ -153,60 +172,105 @@ class KafkaPublishAuditService(
val blueprintRuntimeService = BluePrintMetadataUtils.getBluePrintRuntime(requestId, basePath.toString())
val blueprintContext = blueprintRuntimeService.bluePrintContext()
- /** Looking for node templates defined as component-resource-resolution */
- val nodeTemplates = blueprintContext.nodeTemplates()
- nodeTemplates!!.forEach { nodeTemplate ->
- val nodeTemplateName = nodeTemplate.key
- val nodeTemplateType = blueprintContext.nodeTemplateByName(nodeTemplateName).type
- if (nodeTemplateType == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
- val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
- val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
-
- val propertyAssignments: MutableMap<String, JsonNode> =
- blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
- ?: hashMapOf()
-
- /** Getting values define in artifact-prefix-names */
- val input = executionServiceInput.payload.get("$workflowName-request")
- blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
- val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
- val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
- val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
- BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
- nodeTemplateName,
- ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
- artifactPrefixNamesNode!!)
-
- val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
-
- /** Storing mapping entries with metadata log-protect set to true */
- val sensitiveParameters: List<String> = artifactPrefixNames
- .map { "$it-mapping" }
- .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
- .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
- .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
- .map { it.name }
-
- /** Hiding sensitive input parameters from the request */
- var workflowProperties: ObjectNode = clonedExecutionServiceInput.payload
- .path("$workflowName-request")
- .path("$workflowName-properties") as ObjectNode
-
- sensitiveParameters.forEach { sensitiveParameter ->
- if (workflowProperties.has(sensitiveParameter)) {
- workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
- }
+ val workflowSteps = blueprintContext.workflowByName(workflowName).steps
+ checkNotNull(workflowSteps) { "Failed to get step(s) for workflow($workflowName)" }
+ workflowSteps.forEach { step ->
+ val nodeTemplateName = step.value.target
+ checkNotNull(nodeTemplateName) { "Failed to get node template target for workflow($workflowName), step($step)" }
+ val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName)
+
+ /** We need to check in his Node Template Dependencies is case of a Node Template DG */
+ if (nodeTemplate.type == BluePrintConstants.NODE_TEMPLATE_TYPE_DG) {
+ val dependencyNodeTemplate = nodeTemplate.properties?.get(BluePrintConstants.PROPERTY_DG_DEPENDENCY_NODE_TEMPLATE) as ArrayNode
+ dependencyNodeTemplate.forEach { dependencyNodeTemplateName ->
+ clonedExecutionServiceInput = hideSensitiveDataFromResourceResolution(
+ blueprintRuntimeService,
+ blueprintContext,
+ clonedExecutionServiceInput,
+ workflowName,
+ dependencyNodeTemplateName.asText()
+ )
}
+ } else {
+ clonedExecutionServiceInput = hideSensitiveDataFromResourceResolution(
+ blueprintRuntimeService,
+ blueprintContext,
+ clonedExecutionServiceInput,
+ workflowName,
+ nodeTemplateName
+ )
}
}
}
} catch (e: Exception) {
- val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
- log.error(errMsg, e)
- clonedExecutionServiceInput.payload.replace(
- "$workflowName-request",
- "$errMsg $e".asJsonPrimitive())
+ val errMsg = "ERROR : Couldn't hide sensitive data in the execution request."
+ log.error(errMsg, e)
+ clonedExecutionServiceInput.payload.replace(
+ "$workflowName-request",
+ "$errMsg $e".asJsonPrimitive())
}
return clonedExecutionServiceInput
}
+
+ /**
+ * Hide sensitive data in [executionServiceInput] if the given [nodeTemplateName] is a
+ * resource resolution component.
+ * @param blueprintRuntimeService Current blueprint runtime service
+ * @param blueprintContext Current blueprint runtime context
+ * @param executionServiceInput BP Execution Request where data needs to be hidden
+ * @param workflowName Current workflow being executed
+ * @param nodeTemplateName Node template to check for sensitive data
+ * @return [executionServiceInput] with sensitive inputs replaced by a generic string
+ */
+ private suspend fun hideSensitiveDataFromResourceResolution(
+ blueprintRuntimeService: BluePrintRuntimeService<MutableMap<String, JsonNode>>,
+ blueprintContext: BluePrintContext,
+ executionServiceInput: ExecutionServiceInput,
+ workflowName: String,
+ nodeTemplateName: String
+ ): ExecutionServiceInput {
+
+ val nodeTemplate = blueprintContext.nodeTemplateByName(nodeTemplateName)
+ if (nodeTemplate.type == BluePrintConstants.NODE_TEMPLATE_TYPE_COMPONENT_RESOURCE_RESOLUTION) {
+ val interfaceName = blueprintContext.nodeTemplateFirstInterfaceName(nodeTemplateName)
+ val operationName = blueprintContext.nodeTemplateFirstInterfaceFirstOperationName(nodeTemplateName)
+
+ val propertyAssignments: MutableMap<String, JsonNode> =
+ blueprintContext.nodeTemplateInterfaceOperationInputs(nodeTemplateName, interfaceName, operationName)
+ ?: hashMapOf()
+
+ /** Getting values define in artifact-prefix-names */
+ val input = executionServiceInput.payload.get("$workflowName-request")
+ blueprintRuntimeService.assignWorkflowInputs(workflowName, input)
+ val artifactPrefixNamesNode = propertyAssignments[ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES]
+ val propertyAssignmentService = PropertyAssignmentService(blueprintRuntimeService)
+ val artifactPrefixNamesNodeValue = propertyAssignmentService.resolveAssignmentExpression(
+ BluePrintConstants.MODEL_DEFINITION_TYPE_NODE_TEMPLATE,
+ nodeTemplateName,
+ ResourceResolutionConstants.INPUT_ARTIFACT_PREFIX_NAMES,
+ artifactPrefixNamesNode!!)
+
+ val artifactPrefixNames = JacksonUtils.getListFromJsonNode(artifactPrefixNamesNodeValue!!, String::class.java)
+
+ /** Storing mapping entries with metadata log-protect set to true */
+ val sensitiveParameters: List<String> = artifactPrefixNames
+ .map { "$it-mapping" }
+ .map { blueprintRuntimeService.resolveNodeTemplateArtifact(nodeTemplateName, it) }
+ .flatMap { JacksonUtils.getListFromJson(it, ResourceAssignment::class.java) }
+ .filter { PropertyDefinitionUtils.hasLogProtect(it.property) }
+ .map { it.name }
+
+ /** Hiding sensitive input parameters from the request */
+ var workflowProperties: ObjectNode = executionServiceInput.payload
+ .path("$workflowName-request")
+ .path("$workflowName-properties") as ObjectNode
+
+ sensitiveParameters.forEach { sensitiveParameter ->
+ if (workflowProperties.has(sensitiveParameter)) {
+ workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
+ }
+ }
+ }
+ return executionServiceInput
+ }
}