diff options
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt')
-rw-r--r-- | sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt | 33 |
1 files changed, 7 insertions, 26 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 2b29acd9..1c79abd2 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -19,64 +19,45 @@ */ package org.onap.dcae.collectors.veshv.factory -import arrow.core.Option import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.config.api.model.Routing +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext -import org.onap.dcae.collectors.veshv.utils.arrow.getOption import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.util.concurrent.atomic.AtomicReference /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class CollectorFactory(private val configuration: ConfigurationProvider, +class CollectorFactory(private val configuration: CollectorConfiguration, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val maxPayloadSizeBytes: Int, - private val healthState: HealthState = HealthState.INSTANCE) { + private val maxPayloadSizeBytes: Int) { fun createVesHvCollectorProvider(): CollectorProvider { - val config = AtomicReference<Routing>() - configuration() - .doOnNext { - logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" } - healthState.changeState(HealthDescription.HEALTHY) - } - .doOnError { - logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } - logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } - healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) - } - .subscribe(config::set) return object : CollectorProvider { - override fun invoke(ctx: ClientContext): Option<Collector> = - config.getOption().map { createVesHvCollector(it, ctx) } + override fun invoke(ctx: ClientContext): Collector = + createVesHvCollector(ctx) override fun close() = sinkProvider.close() } } - private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector = + private fun createVesHvCollector(ctx: ClientContext): Collector = VesHvCollector( clientContext = ctx, wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), protobufDecoder = VesDecoder(), - router = Router(routing, sinkProvider, ctx, metrics), + router = Router(configuration.routing, sinkProvider, ctx, metrics), metrics = metrics) companion object { |