diff options
Diffstat (limited to 'ms/blueprintsprocessor/modules/commons/message-lib/src')
4 files changed, 16 insertions, 8 deletions
diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt index b07d64388..d76621c26 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/BluePrintMessageLibData.kt @@ -51,17 +51,18 @@ abstract class MessageProducerProperties : CommonProperties() open class KafkaBasicAuthMessageProducerProperties : MessageProducerProperties() { var clientId: String? = null - // strongest producing guarantee - var acks: String = "all" - var retries: Int = 0 - // ensure we don't push duplicates - var enableIdempotence: Boolean = true + var acks: String = "all" // strongest producing guarantee + var maxBlockMs: Int = 250 // max blocking time in ms to send a message + var reconnectBackOffMs: Int = 60 * 60 * 1000 // time in ms before retrying connection (1 hour) + var enableIdempotence: Boolean = true // ensure we don't push duplicates override fun getConfig(): HashMap<String, Any> { val configProps = super.getConfig() configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = ByteArraySerializer::class.java configProps[ProducerConfig.ACKS_CONFIG] = acks + configProps[ProducerConfig.MAX_BLOCK_MS_CONFIG] = maxBlockMs + configProps[ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG] = reconnectBackOffMs configProps[ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] = enableIdempotence if (clientId != null) { configProps[ProducerConfig.CLIENT_ID_CONFIG] = clientId!! diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt index c659fdb8b..e9bc5d8ad 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/main/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSL.kt @@ -159,9 +159,13 @@ open class KafkaBasicAuthMessageProducerPropertiesAssignmentBuilder : MessagePro fun acks(acks: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::acks, acks) - fun retries(retries: Int) = retries(retries.asJsonPrimitive()) + fun maxBlockMs(maxBlockMs: Int) = maxBlockMs(maxBlockMs.asJsonPrimitive()) - fun retries(retries: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::retries, retries) + fun maxBlockMs(maxBlockMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::maxBlockMs, maxBlockMs) + + fun reconnectBackOffMs(reconnectBackOffMs: Int) = reconnectBackOffMs(reconnectBackOffMs.asJsonPrimitive()) + + fun reconnectBackOffMs(reconnectBackOffMs: JsonNode) = property(KafkaBasicAuthMessageProducerProperties::reconnectBackOffMs, reconnectBackOffMs) fun enableIdempotence(enableIdempotence: Boolean) = enableIdempotence(enableIdempotence.asJsonPrimitive()) diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt index 612a57d23..b1af230b9 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/MessagePropertiesDSLTest.kt @@ -35,7 +35,8 @@ class MessagePropertiesDSLTest { bootstrapServers("sample-bootstrapServers") clientId("sample-client-id") acks("all") - retries(3) + maxBlockMs(0) + reconnectBackOffMs(60 * 60 * 1000) enableIdempotence(true) topic("sample-topic") truststore("/path/to/truststore.jks") diff --git a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt index da7394998..537dab1ba 100644 --- a/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt +++ b/ms/blueprintsprocessor/modules/commons/message-lib/src/test/kotlin/org/onap/ccsdk/cds/blueprintsprocessor/message/service/BlueprintMessageProducerServiceTest.kt @@ -100,6 +100,8 @@ open class BlueprintMessageProducerServiceTest { ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to ByteArraySerializer::class.java, ProducerConfig.ACKS_CONFIG to "all", + ProducerConfig.MAX_BLOCK_MS_CONFIG to 250, + ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG to 60 * 60 * 1000, ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG to true, ConsumerConfig.CLIENT_ID_CONFIG to "default-client-id", CommonClientConfigs.SECURITY_PROTOCOL_CONFIG to SecurityProtocol.SASL_SSL.toString(), |