aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt19
1 files changed, 10 insertions, 9 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index a0c22418..c4d6c87e 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -25,6 +25,7 @@ import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult
@@ -40,8 +41,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
val result = sender.send(records)
- .doOnNext(::logException)
- .filter(::isSuccessful)
+ .doOnNext {
+ if (it.isSuccessful()) {
+ Mono.just(it)
+ } else {
+ logger.warn(it.exception()) { "Failed to send message to Kafka" }
+ Mono.empty<SenderResult<RoutedMessage>>()
+ }
+ }
.map { it.correlationMetadata() }
return if (logger.traceEnabled) {
@@ -61,12 +68,6 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
msg)
}
- private fun logException(senderResult: SenderResult<out Any>) {
- if (senderResult.exception() != null) {
- logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
- }
- }
-
private fun logSentMessage(sentMsg: RoutedMessage) {
logger.trace {
val msgNum = sentMessages.incrementAndGet()
@@ -74,7 +75,7 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
}
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+ private fun SenderResult<out Any>.isSuccessful() = exception() == null
companion object {
val logger = Logger(KafkaSink::class)