summaryrefslogtreecommitdiffstats
path: root/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin
diff options
context:
space:
mode:
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin')
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt11
-rw-r--r--ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt8
2 files changed, 12 insertions, 7 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
index b07d64388..d76621c26 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt
@@ -51,17 +51,18 @@ abstract class MessageProducerProperties : CommonProperties()
open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() {
var clientId: String? = null
- // strongest producing guarantee
- var acks: String = "all"
- var retries: Int = 0
- // ensure we don't push duplicates
- var enableIdempotence: Boolean = true
+ var acks: String = "all" // strongest producing guarantee
+ var maxBlockMs: Int = 250 // max blocking time in ms to send a message
+ var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour)
+ var enableIdempotence: Boolean = true // ensure we don't push duplicates
override fun getConfig(): HashMap<String, Any> {
val configProps = super.getConfig()
configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java
configProps[ProducerConfig.ACKS_CONFIG] = acks
+ configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs
+ configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs
configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence
if (clientId != null) {
configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!!
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
index c659fdb8b..e9bc5d8ad 100644
--- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
+++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt
@@ -159,9 +159,13 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro
fun acks(acks: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::acks, acks)
- fun retries(retries: Int) = retries(retries.asJsonPrimitive())
+ fun maxBlockMs(maxBlockMs: Int) = maxBlockMs(maxBlockMs.asJsonPrimitive())
- fun retries(retries: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::retries, retries)
+ fun maxBlockMs(maxBlockMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::maxBlockMs, maxBlockMs)
+
+ fun reconnectBackOffMs(reconnectBackOffMs: Int) = reconnectBackOffMs(reconnectBackOffMs.asJsonPrimitive())
+
+ fun reconnectBackOffMs(reconnectBackOffMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::reconnectBackOffMs, reconnectBackOffMs)
fun enableIdempotence(enableIdempotence: Boolean) = enableIdempotence(enableIdempotence.asJsonPrimitive())