summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules
diff options
context:
space:
mode:
authorDan Timoney <dtimoney@att.com>2020-05-06 14:43:29 +0000
committerGerrit Code Review <gerrit@onap.org>2020-05-06 14:43:29 +0000
commit158603523a43d4480b519f4ef27649cd98783410 (patch)
tree08990bda39e2c542e540b9f35e527157cd260c5b /ms/blueprintsprocessor/modules
parent04c3d46682db8642ec41687309f31b1040f93e54 (diff)
parent888f2f78a580a3deccb66bef50f2375a04b39eb6 (diff)
Merge "Truncate message published on Kafka / Spike: Define solution for logs separation"
Diffstat (limited to 'ms/blueprintsprocessor/modules')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt6
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt52
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt2
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt2
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt25
-rw-r--r--ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt10
6 files changed, 79 insertions, 18 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index ac35fbf2c..b07d64388 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -78,7 +78,7 @@ open class KafkaSslAuthMessageProducerProperties : KafkaBasicAuthMessageProducer
var keystore: String? = null
var keystorePassword: String? = null
var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
- var sslEndpointIdentificationAlgorithm: String = ""
+ var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
override fun getConfig(): HashMap<String, Any> {
val configProps = super.getConfig()
@@ -142,7 +142,7 @@ open class KafkaStreamsSslAuthConsumerProperties : KafkaStreamsBasicAuthConsumer
var keystore: String? = null
var keystorePassword: String? = null
var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
- var sslEndpointIdentificationAlgorithm: String = ""
+ var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
override fun getConfig(): HashMap<String, Any> {
val configProps = super.getConfig()
@@ -218,7 +218,7 @@ open class KafkaSslAuthMessageConsumerProperties : KafkaBasicAuthMessageConsumer
var keystore: String? = null
var keystorePassword: String? = null
var keystoreType: String = SslConfigs.DEFAULT_SSL_KEYSTORE_TYPE
- var sslEndpointIdentificationAlgorithm: String = ""
+ var sslEndpointIdentificationAlgorithm: String = SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
override fun getConfig(): HashMap<String, Any> {
val configProps = super.getConfig()
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
index 931f052ed..e4991d2d8 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/KafkaMessageProducerService.kt
@@ -17,12 +17,16 @@
package org.onap.ccsdk.cds.blueprintsprocessor.message.service
+import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.commons.lang.builder.ToStringBuilder
import org.apache.kafka.clients.producer.Callback
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.header.internals.RecordHeader
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.ExecutionServiceOutput
+import org.onap.ccsdk.cds.blueprintsprocessor.core.api.data.Status
import org.onap.ccsdk.cds.blueprintsprocessor.message.MessageProducerProperties
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.asJsonString
import org.onap.ccsdk.cds.controllerblueprints.core.defaultToUUID
import org.slf4j.LoggerFactory
@@ -39,6 +43,10 @@ class KafkaMessageProducerService(
private val messageLoggerService = MessageLoggerService()
+ companion object {
+ const val MAX_ERR_MSG_LEN = 128
+ }
+
override suspend fun sendMessageNB(message: Any): Boolean {
checkNotNull(messageProducerProperties.topic) { "default topic is not configured" }
return sendMessageNB(messageProducerProperties.topic!!, message)
@@ -54,9 +62,14 @@ class KafkaMessageProducerService(
message: Any,
headers: MutableMap<String, String>?
): Boolean {
- val byteArrayMessage = when (message) {
- is String -> message.toByteArray(Charset.defaultCharset())
- else -> message.asJsonString().toByteArray(Charset.defaultCharset())
+ var clonedMessage = message
+ if (clonedMessage is ExecutionServiceOutput) {
+ clonedMessage = truncateResponse(clonedMessage)
+ }
+
+ val byteArrayMessage = when (clonedMessage) {
+ is String -> clonedMessage.toByteArray(Charset.defaultCharset())
+ else -> clonedMessage.asJsonString().toByteArray(Charset.defaultCharset())
}
val record = ProducerRecord<String, ByteArray>(topic, defaultToUUID(), byteArrayMessage)
@@ -85,4 +98,37 @@ class KafkaMessageProducerService(
return kafkaProducer!!
}
+
+ /**
+ * Truncation of BP responses
+ */
+ private fun truncateResponse(executionServiceOutput: ExecutionServiceOutput): ExecutionServiceOutput {
+ /** Truncation of error messages */
+ var truncErrMsg = executionServiceOutput.status.errorMessage
+ if (truncErrMsg != null && truncErrMsg.length > MAX_ERR_MSG_LEN) {
+ truncErrMsg = "${truncErrMsg.substring(0,MAX_ERR_MSG_LEN)}" +
+ " [...]. Check Blueprint Processor logs for more information."
+ }
+ /** Truncation for Command Executor responses */
+ var truncPayload = executionServiceOutput.payload.deepCopy()
+ val workflowName = executionServiceOutput.actionIdentifiers.actionName
+ if (truncPayload.path("$workflowName-response").has("execute-command-logs")) {
+ var cmdExecLogNode = truncPayload.path("$workflowName-response") as ObjectNode
+ cmdExecLogNode.replace("execute-command-logs", "Check Command Executor logs for more information.".asJsonPrimitive())
+ }
+ return ExecutionServiceOutput().apply {
+ correlationUUID = executionServiceOutput.correlationUUID
+ commonHeader = executionServiceOutput.commonHeader
+ actionIdentifiers = executionServiceOutput.actionIdentifiers
+ status = Status().apply {
+ code = executionServiceOutput.status.code
+ eventType = executionServiceOutput.status.eventType
+ timestamp = executionServiceOutput.status.timestamp
+ errorMessage = truncErrMsg
+ message = executionServiceOutput.status.message
+ }
+ payload = truncPayload
+ stepData = executionServiceOutput.stepData
+ }
+ }
}
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
index ac08dc7b7..fdf6e48e7 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageConsumerServiceTest.kt
@@ -214,7 +214,7 @@ open class BlueprintMessageConsumerServiceTest {
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
- SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
"username=\"sample-user\" " +
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index 72a47ed56..da7394998 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -109,7 +109,7 @@ open class BlueprintMessageProducerServiceTest {
SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG to "/path/to/keystore.jks",
SslConfigs.SSL_KEYSTORE_TYPE_CONFIG to "JKS",
SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG to "secretpassword",
- SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to "",
+ SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG to SslConfigs.DEFAULT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM,
SaslConfigs.SASL_MECHANISM to "SCRAM-SHA-512",
SaslConfigs.SASL_JAAS_CONFIG to "${ScramLoginModule::class.java.canonicalName} required " +
"username=\"sample-user\" " +
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
index 49f2a50d5..a95af8123 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/BluePrintProcessingKafkaConsumer.kt
@@ -51,13 +51,15 @@ open class BluePrintProcessingKafkaConsumer(
companion object {
const val CONSUMER_SELECTOR = "self-service-api"
+ const val PRODUCER_SELECTOR = "self-service-api"
}
@EventListener(ApplicationReadyEvent::class)
fun setupMessageListener() = runBlocking {
try {
log.info(
- "Setting up message consumer($CONSUMER_SELECTOR)"
+ "Setting up message consumer($CONSUMER_SELECTOR)" +
+ "message producer($PRODUCER_SELECTOR)..."
)
/** Get the Message Consumer Service **/
@@ -72,6 +74,18 @@ open class BluePrintProcessingKafkaConsumer(
throw BluePrintProcessorException("failed to create consumer service ${e.message}")
}
+ /** Get the Message Producer Service **/
+ val blueprintMessageProducerService = try {
+ bluePrintMessageLibPropertyService
+ .blueprintMessageProducerService(PRODUCER_SELECTOR)
+ } catch (e: BluePrintProcessorException) {
+ val errorMsg = "Failed creating Kafka producer message service."
+ throw e.updateErrorMessage(SelfServiceApiDomains.SELF_SERVICE_API, errorMsg,
+ "Wrong Kafka selector provided or internal error in Kafka service.")
+ } catch (e: Exception) {
+ throw BluePrintProcessorException("failed to create producer service ${e.message}")
+ }
+
launch {
/** Subscribe to the consumer topics */
val additionalConfig: MutableMap<String, Any> = hashMapOf()
@@ -82,7 +96,8 @@ open class BluePrintProcessingKafkaConsumer(
ph.register()
log.trace("Consumed Message : $message")
val executionServiceInput = message.jsonAsType<ExecutionServiceInput>()
- executionServiceHandler.doProcess(executionServiceInput)
+ val executionServiceOutput = executionServiceHandler.doProcess(executionServiceInput)
+ blueprintMessageProducerService.sendMessage(executionServiceOutput)
} catch (e: Exception) {
log.error("failed in processing the consumed message : $message", e)
} finally {
@@ -93,7 +108,8 @@ open class BluePrintProcessingKafkaConsumer(
}
} catch (e: Exception) {
log.error(
- "failed to start message consumer($CONSUMER_SELECTOR) ", e
+ "failed to start message consumer($CONSUMER_SELECTOR) " +
+ "message producer($PRODUCER_SELECTOR) ", e
)
}
}
@@ -102,7 +118,8 @@ open class BluePrintProcessingKafkaConsumer(
fun shutdownMessageListener() = runBlocking {
try {
log.info(
- "Shutting down message consumer($CONSUMER_SELECTOR)"
+ "Shutting down message consumer($CONSUMER_SELECTOR)" +
+ "message producer($PRODUCER_SELECTOR)..."
)
blueprintMessageConsumerService.shutDown()
ph.arriveAndAwaitAdvance()
diff --git a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
index 129e7a54d..1c5d47c27 100644
--- a/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
+++ b/ms/blueprintsprocessor/modules/inbounds/selfservice-api/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/selfservice/api/KafkaPublishAuditService.kt
@@ -24,6 +24,7 @@ import org.onap.ccsdk.cds.blueprintsprocessor.functions.resource.resolution.Reso
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BluePrintMessageLibPropertyService
import org.onap.ccsdk.cds.blueprintsprocessor.message.service.BlueprintMessageProducerService
import org.onap.ccsdk.cds.controllerblueprints.core.BluePrintProcessorException
+import org.onap.ccsdk.cds.controllerblueprints.core.asJsonPrimitive
import org.onap.ccsdk.cds.controllerblueprints.core.common.ApplicationConstants
import org.onap.ccsdk.cds.controllerblueprints.core.interfaces.BluePrintCatalogService
import org.onap.ccsdk.cds.controllerblueprints.core.utils.BluePrintMetadataUtils
@@ -48,12 +49,9 @@ class KafkaPublishAuditService(
private val bluePrintMessageLibPropertyService: BluePrintMessageLibPropertyService,
private val blueprintsProcessorCatalogService: BluePrintCatalogService
) : PublishAuditService {
-
private var inputInstance: BlueprintMessageProducerService? = null
private var outputInstance: BlueprintMessageProducerService? = null
-
private lateinit var correlationUUID: String
-
private val log = LoggerFactory.getLogger(KafkaPublishAuditService::class.toString())
companion object {
@@ -127,7 +125,8 @@ class KafkaPublishAuditService(
correlationUUID = executionServiceInput.correlationUUID
commonHeader = executionServiceInput.commonHeader
actionIdentifiers = executionServiceInput.actionIdentifiers
- payload = executionServiceInput.payload
+ payload = executionServiceInput.payload.deepCopy()
+ stepData = executionServiceInput.stepData
}
val blueprintName = clonedExecutionServiceInput.actionIdentifiers.blueprintName
@@ -173,8 +172,7 @@ class KafkaPublishAuditService(
sensitiveParameters.forEach { sensitiveParameter ->
if (workflowProperties.has(sensitiveParameter)) {
- workflowProperties.remove(sensitiveParameter)
- workflowProperties.put(sensitiveParameter, ApplicationConstants.LOG_REDACTED)
+ workflowProperties.replace(sensitiveParameter, ApplicationConstants.LOG_REDACTED.asJsonPrimitive())
}
}
}