summaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-14 12:52:28 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-16 10:09:12 +0200
commit5e93c1ec9d690d7da15b7c0db0052121d8879471 (patch)
tree5bb91c52a28fa66bdb259748ee1f61ff37963760 /hv-collector-core
parent8a2552ac94981cfa18cce551066d9ca4ec668558 (diff)
Remove Ratpack dependency for HV-VES health checks
In order to minimize complexity and possibly improve performance (thread count) reactor-netty should be used instead of Ratpack. Also reorganize code to be more consistent and differentiated readiness and liveness endpoints (for future use in K8s Pod definition). As an example I've defined health check probe in docker-compose YAML. Change-Id: I1b5ce3d685e7ae5b0515b2146ae4fa88b3b41186 Issue-ID: DCAEGEN2-705 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt14
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt10
5 files changed, 16 insertions, 30 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ff997173..6c256b72 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.boundary
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -34,8 +35,3 @@ typealias CollectorProvider = () -> Collector
interface Server {
fun start(): IO<ServerHandle>
}
-
-abstract class ServerHandle(val host: String, val port: Int) {
- abstract fun shutdown(): IO<Unit>
- abstract fun await(): IO<Unit>
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 3e652b92..a400ff32 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,8 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
+ private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
@@ -50,11 +50,11 @@ class CollectorFactory(val configuration: ConfigurationProvider,
.map(this::createVesHvCollector)
.doOnNext {
logger.info("Using updated configuration for new connections")
- healthStateProvider.changeState(HealthState.HEALTHY)
+ healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
logger.error("Failed to acquire configuration from consul")
- healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
return collector::get
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 81463039..7de28306 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -20,8 +20,8 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -46,7 +46,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val url: String,
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
- private val healthStateProvider: HealthStateProvider,
+ private val healthState: HealthState,
retrySpec: Retry<Any>
) : ConfigurationProvider {
@@ -55,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val retry = retrySpec
.doOnRetry {
logger.warn("Could not get fresh configuration", it.exception())
- healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
constructor(http: HttpAdapter,
@@ -64,7 +64,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
params.configurationUrl,
params.firstRequestDelay,
params.requestInterval,
- HealthStateProvider.INSTANCE,
+ HealthState.INSTANCE,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index c28b1510..f858d959 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -24,15 +24,15 @@ import arrow.effects.IO
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.BlockingNettyContext
import reactor.ipc.netty.tcp.TcpServer
import java.time.Duration
import java.util.function.BiFunction
@@ -94,16 +94,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
return this
}
- private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
- override fun shutdown() = IO {
- ctx.shutdown()
- }
-
- override fun await() = IO<Unit> {
- ctx.context.channel().closeFuture().sync()
- }
- }
-
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index f9a9ba60..78858921 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -28,8 +28,8 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -47,7 +47,7 @@ internal object ConsulConfigurationProviderTest : Spek({
describe("Consul configuration provider") {
val httpAdapterMock: HttpAdapter = mock()
- val healthStateProvider = HealthStateProvider.INSTANCE
+ val healthStateProvider = HealthState.INSTANCE
given("valid resource url") {
val validUrl = "http://valid-url/"
@@ -98,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({
it("should update the health state"){
StepVerifier.create(healthStateProvider().take(iterationCount))
.expectNextCount(iterationCount - 1)
- .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
.verifyComplete()
}
}
@@ -109,7 +109,7 @@ internal object ConsulConfigurationProviderTest : Spek({
private fun constructConsulConfigProvider(url: String,
httpAdapter: HttpAdapter,
- healthStateProvider: HealthStateProvider,
+ healthState: HealthState,
iterationCount: Long = 1
): ConsulConfigurationProvider {
@@ -122,7 +122,7 @@ private fun constructConsulConfigProvider(url: String,
url,
firstRequestDelay,
requestInterval,
- healthStateProvider,
+ healthState,
retry
)
}