summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-06-17 13:52:06 +0200
committerFilip Krzywka <filip.krzywka@nokia.com>2019-06-18 07:57:24 +0200
commit3449b38b23fe1952a534ab35c5a23105c13e8262 (patch)
tree72441fa968d8d24bf12812ff285e8fd259e2a6da /sources
parentb11af902698a4e0bbf11b443f5f0fa6a13e1c085 (diff)
Fix request interval
In previous implementation DistinctUntilChangedSubscriber always requested from upstream 256 events, which resulted in immediate 256 requests to CBS. Request amount is not configurable in other way than hard-limiting using `limitRequest`, which limits request amount for single subscriber. (At least in our pipeline) To avoid multiple manual subscribes, this commit changed CbsClientAdapter to use Mono instead of Flux for CbsRequests and repeat this Mono conditionally. Flux inside of repeatWhen is emitting event after each onComplete received from upstream Mono and resubscribes to it if condition is met. This seemed like good place to put our interval mechanism, which is always-pass condition, but condition resolving blocks for variable duration. Change-Id: I04d1e657ec4d82185f6f07422c25c2d2ff23e60d Issue-ID: DCAEGEN2-1557 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt13
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt6
2 files changed, 8 insertions, 11 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
index 8b7ed67f..905c737e 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsClientAdapter.kt
@@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.config.impl
import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
-import org.onap.dcae.collectors.veshv.utils.rx.delayElements
+import org.onap.dcae.collectors.veshv.utils.rx.nextWithVariableInterval
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.CbsRequest
@@ -63,15 +63,12 @@ internal class CbsClientAdapter(private val cbsClientMono: Mono<CbsClient>,
}
private fun toPeriodicalConfigurations(cbsClient: CbsClient) =
- Mono.just(configurationRequest())
- .repeat()
- .map(CbsRequest::withNewInvocationId)
- .flatMap(cbsClient::get)
- .transform(delayElements(requestInterval::get))
-
- private fun configurationRequest() = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
+ Mono.defer { cbsClient.get(configurationRequest.withNewInvocationId()) }
+ .repeatWhen { it.nextWithVariableInterval(requestInterval::get) }
companion object {
private val logger = Logger(CbsClientAdapter::class)
+
+ private val configurationRequest: CbsRequest = CbsRequests.getConfiguration(RequestDiagnosticContext.create())
}
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
index e1886055..d68ca5c1 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/rx/rx.kt
@@ -34,6 +34,6 @@ import java.time.Duration
fun <T> Publisher<T>.then(callback: () -> Unit): Mono<Unit> =
toMono().then(Mono.fromCallable(callback))
-fun <T> delayElements(intervalSupplier: () -> Duration): (Flux<T>) -> Flux<T> = { flux ->
- flux.concatMap { Mono.just(it).delayElement(intervalSupplier()) }
-}
+fun <T> Flux<T>.nextWithVariableInterval(intervalSupplier: () -> Duration): Flux<T> =
+ concatMap { Mono.just(it).delayElement(intervalSupplier()) }
+