aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt14
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt10
5 files changed, 16 insertions, 30 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ff997173..6c256b72 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.boundary
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -34,8 +35,3 @@ typealias CollectorProvider = () -> Collector
interface Server {
fun start(): IO<ServerHandle>
}
-
-abstract class ServerHandle(val host: String, val port: Int) {
- abstract fun shutdown(): IO<Unit>
- abstract fun await(): IO<Unit>
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 3e652b92..a400ff32 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,8 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
+ private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
@@ -50,11 +50,11 @@ class CollectorFactory(val configuration: ConfigurationProvider,
.map(this::createVesHvCollector)
.doOnNext {
logger.info("Using updated configuration for new connections")
- healthStateProvider.changeState(HealthState.HEALTHY)
+ healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
logger.error("Failed to acquire configuration from consul")
- healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
return collector::get
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 81463039..7de28306 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -20,8 +20,8 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -46,7 +46,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val url: String,
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
- private val healthStateProvider: HealthStateProvider,
+ private val healthState: HealthState,
retrySpec: Retry<Any>
) : ConfigurationProvider {
@@ -55,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val retry = retrySpec
.doOnRetry {
logger.warn("Could not get fresh configuration", it.exception())
- healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
constructor(http: HttpAdapter,
@@ -64,7 +64,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
params.configurationUrl,
params.firstRequestDelay,
params.requestInterval,
- HealthStateProvider.INSTANCE,
+ HealthState.INSTANCE,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index c28b1510..f858d959 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -24,15 +24,15 @@ import arrow.effects.IO
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.BlockingNettyContext
import reactor.ipc.netty.tcp.TcpServer
import java.time.Duration
import java.util.function.BiFunction
@@ -94,16 +94,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
return this
}
- private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
- override fun shutdown() = IO {
- ctx.shutdown()
- }
-
- override fun await() = IO<Unit> {
- ctx.context.channel().closeFuture().sync()
- }
- }
-
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index f9a9ba60..78858921 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -28,8 +28,8 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -47,7 +47,7 @@ internal object ConsulConfigurationProviderTest : Spek({
describe("Consul configuration provider") {
val httpAdapterMock: HttpAdapter = mock()
- val healthStateProvider = HealthStateProvider.INSTANCE
+ val healthStateProvider = HealthState.INSTANCE
given("valid resource url") {
val validUrl = "http://valid-url/"
@@ -98,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({
it("should update the health state"){
StepVerifier.create(healthStateProvider().take(iterationCount))
.expectNextCount(iterationCount - 1)
- .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
.verifyComplete()
}
}
@@ -109,7 +109,7 @@ internal object ConsulConfigurationProviderTest : Spek({
private fun constructConsulConfigProvider(url: String,
httpAdapter: HttpAdapter,
- healthStateProvider: HealthStateProvider,
+ healthState: HealthState,
iterationCount: Long = 1
): ConsulConfigurationProvider {
@@ -122,7 +122,7 @@ private fun constructConsulConfigProvider(url: String,
url,
firstRequestDelay,
requestInterval,
- healthStateProvider,
+ healthState,
retry
)
}