From 1ddd723f22c64dfb8c414fc8573ebe993ed00578 Mon Sep 17 00:00:00 2001 From: kjaniak Date: Wed, 22 May 2019 22:19:49 +0200 Subject: Support CBS request interval reconfiguration Change-Id: Ie8892e33b2f6a58d6076f66e6cc6a2df830dfa48 Issue-ID: DCAEGEN2-1525 Signed-off-by: kjaniak --- .../src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'sources/hv-collector-utils/src/main/kotlin/org') diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt index ceccbcba..e1886055 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt @@ -26,8 +26,14 @@ package org.onap.dcae.collectors.veshv.utils.rx import org.reactivestreams.Publisher +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.toMono +import java.time.Duration fun Publisher.then(callback: () -> Unit): Mono = toMono().then(Mono.fromCallable(callback)) + +fun delayElements(intervalSupplier: () -> Duration): (Flux) -> Flux = { flux -> + flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) } +} -- cgit 1.2.3-korg