summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib
diff options
context:
space:
mode:
authorJulien Fontaine <julien.fontaine@bell.ca>2020-06-18 18:16:38 -0400
committerJulien Fontaine <julien.fontaine@bell.ca>2020-06-22 14:46:22 -0400
commitc2046c09fd65a9ba4d000beacb3836869db771f2 (patch)
tree847f9bb3824b0a906589567e819673b4c7360207 /ms/blueprintsprocessor/modules/commons/message-lib
parent214c8dfe0e56aff0fe0c2a3a5d7f76f2a4972710 (diff)
Kafka Audit Service : Improve error handling and miscellaneous refactoring
When Kafka Audit Service fails it no longer stops Blueprint Processor execution * Add error handling when trying to hide sensitive data * Add error handling when trying to send kafka message * Set timeout for blocking loop when sending messages with kafka producer -> When broker is not available producer tries to reconnect in a blocking loop * Refactor Audit Service interface to give more explict name for publish methods * Modify publishExecutionOutput() to a non-blocking function Issue-ID: CCSDK-2459 Signed-off-by: Julien Fontaine <julien.fontaine@bell.ca> Change-Id: I809a5f34f81889aa9eed499608348f149984bc38
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt11
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt8
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt3
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt2
4 files changed, 16 insertions, 8 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index b07d64388..d76621c26 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -51,17 +51,18 @@ abstract class MessageProducerProperties : CommonProperties()
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
var clientId: String? = null
- // strongest producing guarantee
- var acks: String = "all"
- var retries: Int = 0
- // ensure we don't push duplicates
- var enableIdempotence: Boolean = true
+ var acks: String = "all" // strongest producing guarantee
+ var maxBlockMs: Int = 250 // max blocking time in ms to send a message
+ var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
+ var enableIdempotence: Boolean = true // ensure we don't push duplicates
override fun getConfig(): HashMap<String, Any> {
val configProps = super.getConfig()
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
configProps[ProducerConfig.ACKS_CONFIG] = acks
+ configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
+ configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
if (clientId != null) {
configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
index c659fdb8b..e9bc5d8ad 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
@@ -159,9 +159,13 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro
fun acks(acks: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::acks, acks)
- fun retries(retries: Int) = retries(retries.asJsonPrimitive())
+ fun maxBlockMs(maxBlockMs: Int) = maxBlockMs(maxBlockMs.asJsonPrimitive())
- fun retries(retries: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::retries, retries)
+ fun maxBlockMs(maxBlockMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::maxBlockMs, maxBlockMs)
+
+ fun reconnectBackOffMs(reconnectBackOffMs: Int) = reconnectBackOffMs(reconnectBackOffMs.asJsonPrimitive())
+
+ fun reconnectBackOffMs(reconnectBackOffMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::reconnectBackOffMs, reconnectBackOffMs)
fun enableIdempotence(enableIdempotence: Boolean) = enableIdempotence(enableIdempotence.asJsonPrimitive())
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
index 612a57d23..b1af230b9 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt
@@ -35,7 +35,8 @@ class MessagePropertiesDSLTest {
bootstrapServers("sample-bootstrapServers")
clientId("sample-client-id")
acks("all")
- retries(3)
+ maxBlockMs(0)
+ reconnectBackOffMs(60 * 60 * 1000)
enableIdempotence(true)
topic("sample-topic")
truststore("/path/to/truststore.jks")
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
index da7394998..537dab1ba 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt
@@ -100,6 +100,8 @@ open class BlueprintMessageProducerServiceTest {
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java,
ProducerConfig.ACKS_CONFIG to "all",
+ ProducerConfig.MAX_BLOCK_MS_CONFIG to 250,
+ ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000,
ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true,
ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(),