aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt8
1 files changed, 3 insertions, 5 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 433e4d57..618b818f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -57,8 +57,6 @@ internal class VesHvCollector(
.transform(::filterInvalidWireFrame)
.transform(::decodeProtobufPayload)
.transform(::filterInvalidProtobufMessages)
- // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here
- .handleErrors()
.transform(::route)
.handleErrors()
.doFinally { releaseBuffersMemory() }
@@ -106,14 +104,14 @@ internal class VesHvCollector(
}
}
+ private fun releaseBuffersMemory() = wireChunkDecoder.release()
+ .also { logger.debug { "Released buffer memory after handling message stream" } }
+
private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
logger.handleReactiveStreamError(clientContext, it)
}
- private fun releaseBuffersMemory() = wireChunkDecoder.release()
- .also { logger.debug { "Released buffer memory after handling message stream" } }
-
private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
filterFailedWithLog(logger, clientContext::fullMdc, predicate)