aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt13
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt6
2 files changed, 8 insertions, 11 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
index 8b7ed67f..905c737e 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
@@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.config.impl
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
-import org.onap.dcae.collectors.veshv.utils.rx.delayElements
+import org.onap.dcae.collectors.veshv.utils.rx.nextWithVariableInterval
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest
@@ -63,15 +63,12 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
}
private fun toPeriodicalConfigurations(cbsClient: CbsClient) =
- Mono.just(configurationRequest())
- .repeat()
- .map(CbsRequest::withNewInvocationId)
- .flatMap(cbsClient::get)
- .transform(delayElements(requestInterval::get))
-
- private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
+ Mono.defer { cbsClient.get(configurationRequest.withNewInvocationId()) }
+ .repeatWhen { it.nextWithVariableInterval(requestInterval::get) }
companion object {
private val logger = Logger(CbsClientAdapter::class)
+
+ private val configurationRequest: CbsRequest = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
}
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
index e1886055..d68ca5c1 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
@@ -34,6 +34,6 @@ import java.time.Duration
fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
toMono().then(Mono.fromCallable(callback))
-fun <T> delayElements(intervalSupplier: () -> Duration): (Flux<T>) -> Flux<T> = { flux ->
- flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) }
-}
+fun <T> Flux<T>.nextWithVariableInterval(intervalSupplier: () -> Duration): Flux<T> =
+ concatMap { Mono.just(it).delayElement(intervalSupplier()) }
+