diff options
Diffstat (limited to 'sources')
2 files changed, 8 insertions, 11 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt index 8b7ed67f..905c737e 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt @@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.config.impl import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext -import org.onap.dcae.collectors.veshv.utils.rx.delayElements +import org.onap.dcae.collectors.veshv.utils.rx.nextWithVariableInterval import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest @@ -63,15 +63,12 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>, } private fun toPeriodicalConfigurations(cbsClient: CbsClient) = - Mono.just(configurationRequest()) - .repeat() - .map(CbsRequest::withNewInvocationId) - .flatMap(cbsClient::get) - .transform(delayElements(requestInterval::get)) - - private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create()) + Mono.defer { cbsClient.get(configurationRequest.withNewInvocationId()) } + .repeatWhen { it.nextWithVariableInterval(requestInterval::get) } companion object { private val logger = Logger(CbsClientAdapter::class) + + private val configurationRequest: CbsRequest = CbsRequests.getConfiguration(RequestDiagnosticContext.create()) } } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt index e1886055..d68ca5c1 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt @@ -34,6 +34,6 @@ import java.time.Duration fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> = toMono().then(Mono.fromCallable(callback)) -fun <T> delayElements(intervalSupplier: () -> Duration): (Flux<T>) -> Flux<T> = { flux -> - flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) } -} +fun <T> Flux<T>.nextWithVariableInterval(intervalSupplier: () -> Duration): Flux<T> = + concatMap { Mono.just(it).delayElement(intervalSupplier()) } + |