diff options
Diffstat (limited to 'hv-collector-core')
5 files changed, 16 insertions, 30 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ff997173..6c256b72 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.boundary import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -34,8 +35,3 @@ typealias CollectorProvider = () -> Collector interface Server { fun start(): IO<ServerHandle> } - -abstract class ServerHandle(val host: String, val port: Int) { - abstract fun shutdown(): IO<Unit> - abstract fun await(): IO<Unit> -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 3e652b92..a400ff32 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,8 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector @@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) { + private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference<Collector> = AtomicReference() @@ -50,11 +50,11 @@ class CollectorFactory(val configuration: ConfigurationProvider, .map(this::createVesHvCollector) .doOnNext { logger.info("Using updated configuration for new connections") - healthStateProvider.changeState(HealthState.HEALTHY) + healthState.changeState(HealthDescription.HEALTHY) } .doOnError { logger.error("Failed to acquire configuration from consul") - healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) return collector::get diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 81463039..7de28306 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -20,8 +20,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -46,7 +46,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val url: String, private val firstRequestDelay: Duration, private val requestInterval: Duration, - private val healthStateProvider: HealthStateProvider, + private val healthState: HealthState, retrySpec: Retry<Any> ) : ConfigurationProvider { @@ -55,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val retry = retrySpec .doOnRetry { logger.warn("Could not get fresh configuration", it.exception()) - healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } constructor(http: HttpAdapter, @@ -64,7 +64,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, params.configurationUrl, params.firstRequestDelay, params.requestInterval, - HealthStateProvider.INSTANCE, + HealthState.INSTANCE, Retry.any<Any>() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index c28b1510..f858d959 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -24,15 +24,15 @@ import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.boundary.ServerHandle import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions -import reactor.ipc.netty.tcp.BlockingNettyContext import reactor.ipc.netty.tcp.TcpServer import java.time.Duration import java.util.function.BiFunction @@ -94,16 +94,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, return this } - private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { - override fun shutdown() = IO { - ctx.shutdown() - } - - override fun await() = IO<Unit> { - ctx.context.channel().closeFuture().sync() - } - } - companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index f9a9ba60..78858921 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -28,8 +28,8 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono import reactor.retry.Retry @@ -47,7 +47,7 @@ internal object ConsulConfigurationProviderTest : Spek({ describe("Consul configuration provider") { val httpAdapterMock: HttpAdapter = mock() - val healthStateProvider = HealthStateProvider.INSTANCE + val healthStateProvider = HealthState.INSTANCE given("valid resource url") { val validUrl = "http://valid-url/" @@ -98,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({ it("should update the health state"){ StepVerifier.create(healthStateProvider().take(iterationCount)) .expectNextCount(iterationCount - 1) - .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) .verifyComplete() } } @@ -109,7 +109,7 @@ internal object ConsulConfigurationProviderTest : Spek({ private fun constructConsulConfigProvider(url: String, httpAdapter: HttpAdapter, - healthStateProvider: HealthStateProvider, + healthState: HealthState, iterationCount: Long = 1 ): ConsulConfigurationProvider { @@ -122,7 +122,7 @@ private fun constructConsulConfigProvider(url: String, url, firstRequestDelay, requestInterval, - healthStateProvider, + healthState, retry ) } |