summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-14 12:52:28 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-16 10:09:12 +0200
commit5e93c1ec9d690d7da15b7c0db0052121d8879471 (patch)
tree5bb91c52a28fa66bdb259748ee1f61ff37963760
parent8a2552ac94981cfa18cce551066d9ca4ec668558 (diff)
Remove Ratpack dependency for HV-VES health checks
In order to minimize complexity and possibly improve performance (thread count) reactor-netty should be used instead of Ratpack. Also reorganize code to be more consistent and differentiated readiness and liveness endpoints (for future use in K8s Pod definition). As an example I've defined health check probe in docker-compose YAML. Change-Id: I1b5ce3d685e7ae5b0515b2146ae4fa88b3b41186 Issue-ID: DCAEGEN2-705 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
-rw-r--r--docker-compose.yml19
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt14
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt10
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt6
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt (renamed from hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt)12
-rw-r--r--hv-collector-health-check/pom.xml4
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt54
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt (renamed from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt)17
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt19
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt34
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt60
-rw-r--r--hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt (renamed from hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt)10
-rw-r--r--hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt21
-rw-r--r--hv-collector-main/pom.xml4
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt48
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt39
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt42
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt51
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt2
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt46
-rw-r--r--pom.xml2
25 files changed, 362 insertions, 178 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index 7bd84f53..33aedeca 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,6 +1,5 @@
-version: "2"
+version: "3.4"
services:
-
zookeeper:
image: wurstmeister/zookeeper
ports:
@@ -29,14 +28,22 @@ services:
command: ["-server", "-bootstrap", "-ui-dir", "/ui"]
ves-hv-collector:
- image: onap/ves-hv-collector
+ image: nexus3.onap.org:10003/onap/ves-hv-collector:latest
# build:
# context: hv-collector-main
# dockerfile: Dockerfile
ports:
- "6060:6060"
- "6061:6061/tcp"
- command: ["--listen-port", "6061","--config-url", "http://consul:8500/v1/kv/veshv-config"]
+ command: ["--listen-port", "6061",
+ "--health-check-api-port", "6060",
+ "--config-url", "http://consul:8500/v1/kv/veshv-config"]
+ healthcheck:
+ test: curl -f http://localhost:6060/health/ready || exit 1
+ interval: 10s
+ timeout: 3s
+ retries: 3
+ start_period: 20s
depends_on:
- kafka
- consul
@@ -44,7 +51,7 @@ services:
- ./ssl/:/etc/ves-hv/
xnf-simulator:
- image: onap/ves-hv-collector-xnf-simulator
+ image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator
# build:
# context: hv-collector-xnf-simulator
# dockerfile: Dockerfile
@@ -57,7 +64,7 @@ services:
- ./ssl/:/etc/ves-hv/
dcae-app-simulator:
- image: onap/ves-hv-collector-dcae-simulator
+ image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator
# build:
# context: hv-collector-dcae-app-simulator
# dockerfile: Dockerfile
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ff997173..6c256b72 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.boundary
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -34,8 +35,3 @@ typealias CollectorProvider = () -> Collector
interface Server {
fun start(): IO<ServerHandle>
}
-
-abstract class ServerHandle(val host: String, val port: Int) {
- abstract fun shutdown(): IO<Unit>
- abstract fun await(): IO<Unit>
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 3e652b92..a400ff32 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,8 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
@@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) {
+ private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
@@ -50,11 +50,11 @@ class CollectorFactory(val configuration: ConfigurationProvider,
.map(this::createVesHvCollector)
.doOnNext {
logger.info("Using updated configuration for new connections")
- healthStateProvider.changeState(HealthState.HEALTHY)
+ healthState.changeState(HealthDescription.HEALTHY)
}
.doOnError {
logger.error("Failed to acquire configuration from consul")
- healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
return collector::get
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index 81463039..7de28306 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -20,8 +20,8 @@
package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -46,7 +46,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val url: String,
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
- private val healthStateProvider: HealthStateProvider,
+ private val healthState: HealthState,
retrySpec: Retry<Any>
) : ConfigurationProvider {
@@ -55,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
private val retry = retrySpec
.doOnRetry {
logger.warn("Could not get fresh configuration", it.exception())
- healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
}
constructor(http: HttpAdapter,
@@ -64,7 +64,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
params.configurationUrl,
params.firstRequestDelay,
params.requestInterval,
- HealthStateProvider.INSTANCE,
+ HealthState.INSTANCE,
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index c28b1510..f858d959 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -24,15 +24,15 @@ import arrow.effects.IO
import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.BlockingNettyContext
import reactor.ipc.netty.tcp.TcpServer
import java.time.Duration
import java.util.function.BiFunction
@@ -94,16 +94,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
return this
}
- private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
- override fun shutdown() = IO {
- ctx.shutdown()
- }
-
- override fun await() = IO<Unit> {
- ctx.context.channel().closeFuture().sync()
- }
- }
-
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index f9a9ba60..78858921 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -28,8 +28,8 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import reactor.core.publisher.Mono
import reactor.retry.Retry
@@ -47,7 +47,7 @@ internal object ConsulConfigurationProviderTest : Spek({
describe("Consul configuration provider") {
val httpAdapterMock: HttpAdapter = mock()
- val healthStateProvider = HealthStateProvider.INSTANCE
+ val healthStateProvider = HealthState.INSTANCE
given("valid resource url") {
val validUrl = "http://valid-url/"
@@ -98,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({
it("should update the health state"){
StepVerifier.create(healthStateProvider().take(iterationCount))
.expectNextCount(iterationCount - 1)
- .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
.verifyComplete()
}
}
@@ -109,7 +109,7 @@ internal object ConsulConfigurationProviderTest : Spek({
private fun constructConsulConfigProvider(url: String,
httpAdapter: HttpAdapter,
- healthStateProvider: HealthStateProvider,
+ healthState: HealthState,
iterationCount: Long = 1
): ConsulConfigurationProvider {
@@ -122,7 +122,7 @@ private fun constructConsulConfigProvider(url: String,
url,
firstRequestDelay,
requestInterval,
- healthStateProvider,
+ healthState,
retry
)
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index e7b7770d..e9b70578 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
-import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider
+import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import reactor.core.publisher.Flux
@@ -40,7 +40,7 @@ import java.time.Duration
*/
class Sut(sink: Sink = StoringSink()) {
val configurationProvider = FakeConfigurationProvider()
- val healthStateProvider = FakeHealthStateProvider()
+ val healthStateProvider = FakeHealthState()
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
private val metrics = FakeMetrics()
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 493517ba..a9f3e9a4 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -24,7 +24,7 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC
@@ -351,7 +351,7 @@ object VesHvSpecification : Spek({
it("should mark the application healthy") {
assertThat(sut.healthStateProvider.currentHealth)
.describedAs("application health state")
- .isEqualTo(HealthState.HEALTHY)
+ .isEqualTo(HealthDescription.HEALTHY)
}
}
@@ -363,7 +363,7 @@ object VesHvSpecification : Spek({
it("should mark the application unhealthy ") {
assertThat(sut.healthStateProvider.currentHealth)
.describedAs("application health state")
- .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
}
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
index 230a728e..c25771b7 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt
@@ -19,19 +19,19 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import reactor.core.publisher.Flux
-class FakeHealthStateProvider : HealthStateProvider {
+class FakeHealthState : HealthState {
- lateinit var currentHealth: HealthState
+ lateinit var currentHealth: HealthDescription
- override fun changeState(healthState: HealthState) {
- currentHealth = healthState
+ override fun changeState(healthDescription: HealthDescription) {
+ currentHealth = healthDescription
}
- override fun invoke(): Flux<HealthState> {
+ override fun invoke(): Flux<HealthDescription> {
throw NotImplementedError()
}
}
diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml
index 1e77adb0..09515879 100644
--- a/hv-collector-health-check/pom.xml
+++ b/hv-collector-health-check/pom.xml
@@ -50,8 +50,8 @@
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
- <groupId>io.ratpack</groupId>
- <artifactId>ratpack-core</artifactId>
+ <groupId>io.projectreactor.ipc</groupId>
+ <artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt
deleted file mode 100644
index b21d1871..00000000
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.healthcheck.api
-
-import arrow.effects.IO
-import ratpack.handling.Chain
-import ratpack.server.RatpackServer
-import ratpack.server.ServerConfig
-import java.util.concurrent.atomic.AtomicReference
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since August 2018
- */
-class HealthCheckApiServer(private val healthStateProvider: HealthStateProvider) {
-
- private val healthState: AtomicReference<HealthState> = AtomicReference(HealthState.STARTING)
-
- fun start(port: Int): IO<RatpackServer> = IO {
- healthStateProvider().subscribe(healthState::set)
- RatpackServer
- .start {
- it
- .serverConfig(ServerConfig.embedded().port(port).development(false))
- .handlers(this::configureHandlers)
- }
- }
-
- private fun configureHandlers(chain: Chain) {
- chain
- .get("healthcheck") { ctx ->
- healthState.get().run {
- ctx.response.status(responseCode).send(message)
- }
- }
- }
-} \ No newline at end of file
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt
index 5cc09ccc..8c69406c 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt
@@ -19,21 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.api
-import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl
-import reactor.core.publisher.Flux
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-interface HealthStateProvider {
-
- operator fun invoke(): Flux<HealthState>
- fun changeState(healthState: HealthState)
-
- companion object {
- val INSTANCE: HealthStateProvider by lazy {
- HealthStateProviderImpl()
- }
- }
+enum class HealthDescription(val message: String, val status: HealthStatus) {
+ HEALTHY("Healthy", HealthStatus.UP),
+ STARTING("Collector is starting", HealthStatus.OUT_OF_SERVICE),
+ RETRYING_FOR_CONSUL_CONFIGURATION("Consul configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE),
+ CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", HealthStatus.DOWN)
}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
index 3dddf1e7..853cc00f 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt
@@ -19,16 +19,21 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.api
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
-import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateImpl
+import reactor.core.publisher.Flux
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-enum class HealthState(val message: String, val responseCode: Int) {
- HEALTHY("Healthy", OK),
- STARTING("Collector is starting", SERVICE_UNAVAILABLE),
- WAITING_FOR_CONSUL_CONFIGURATION("Waiting for consul configuration", SERVICE_UNAVAILABLE),
- CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", SERVICE_UNAVAILABLE)
+interface HealthState {
+
+ operator fun invoke(): Flux<HealthDescription>
+ fun changeState(healthDescription: HealthDescription)
+
+ companion object {
+ val INSTANCE: HealthState by lazy {
+ HealthStateImpl()
+ }
+ }
}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
new file mode 100644
index 00000000..79fc9321
--- /dev/null
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt
@@ -0,0 +1,34 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.api
+
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK
+import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+enum class HealthStatus(val httpResponseStatus: Int) {
+ UP(OK),
+ DOWN(SERVICE_UNAVAILABLE),
+ OUT_OF_SERVICE(SERVICE_UNAVAILABLE),
+ UNKNOWN(SERVICE_UNAVAILABLE)
+}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
new file mode 100644
index 00000000..7e9efac7
--- /dev/null
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt
@@ -0,0 +1,60 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.healthcheck.factory
+
+import arrow.effects.IO
+import io.netty.handler.codec.http.HttpResponseStatus
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.ipc.netty.http.server.HttpServer
+import reactor.ipc.netty.http.server.HttpServerRequest
+import reactor.ipc.netty.http.server.HttpServerResponse
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since August 2018
+ */
+class HealthCheckApiServer(private val healthState: HealthState, private val port: Int) {
+
+ private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING)
+
+ fun start(): IO<ServerHandle> = IO {
+ healthState().subscribe(healthDescription::set)
+ val ctx = HttpServer.create(port).startRouter { routes ->
+ routes.get("/health/ready", ::readinessHandler)
+ routes.get("/health/alive", ::livenessHandler)
+ }
+ NettyServerHandle(ctx)
+ }
+
+ private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
+ healthDescription.get().run {
+ resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message))
+ }
+
+ private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) =
+ resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet"))
+
+}
diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt
index 5056d2da..c273f0a0 100644
--- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt
+++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt
@@ -19,8 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.healthcheck.impl
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxProcessor
import reactor.core.publisher.UnicastProcessor
@@ -29,11 +29,11 @@ import reactor.core.publisher.UnicastProcessor
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-internal class HealthStateProviderImpl : HealthStateProvider {
+internal class HealthStateImpl : HealthState {
- private val healthStateStream: FluxProcessor<HealthState, HealthState> = UnicastProcessor.create()
+ private val healthDescriptionStream: FluxProcessor<HealthDescription, HealthDescription> = UnicastProcessor.create()
- override fun invoke(): Flux<HealthState> = healthStateStream
+ override fun invoke(): Flux<HealthDescription> = healthDescriptionStream
- override fun changeState(healthState: HealthState) = healthStateStream.onNext(healthState)
+ override fun changeState(healthDescription: HealthDescription) = healthDescriptionStream.onNext(healthDescription)
}
diff --git a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
index e9c487bf..e3fced2d 100644
--- a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
+++ b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt
@@ -21,10 +21,9 @@ package org.onap.dcae.collectors.veshv.healthcheck.impl
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import reactor.test.StepVerifier
/**
@@ -33,20 +32,20 @@ import reactor.test.StepVerifier
*/
object HealthStateProviderImplTest : Spek({
describe("Health state provider") {
- val healthStateProviderImpl = HealthStateProviderImpl()
+ val healthStateProviderImpl = HealthStateImpl()
on("health state update") {
- healthStateProviderImpl.changeState(HealthState.HEALTHY)
- healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
- healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
- healthStateProviderImpl.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ healthStateProviderImpl.changeState(HealthDescription.HEALTHY)
+ healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ healthStateProviderImpl.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
it("should push new health state to the subscriber") {
StepVerifier
.create(healthStateProviderImpl().take(4))
- .expectNext(HealthState.HEALTHY)
- .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
- .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION)
- .expectNext(HealthState.CONSUL_CONFIGURATION_NOT_FOUND)
+ .expectNext(HealthDescription.HEALTHY)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ .expectNext(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
.verifyComplete()
}
}
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml
index 0e956288..af64cedd 100644
--- a/hv-collector-main/pom.xml
+++ b/hv-collector-main/pom.xml
@@ -99,10 +99,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>io.ratpack</groupId>
- <artifactId>ratpack-core</artifactId>
- </dependency>
- <dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
</dependency>
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index dc92228f..a84a39a5 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -19,13 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.main
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
-import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
+import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
@@ -38,13 +37,7 @@ private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainK
fun main(args: Array<String>) =
ArgVesHvConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startHealthCheckApiServer)
- .map(::createServer)
- .map {
- it.start()
- .map(::logServerStarted)
- .flatMap(ServerHandle::await)
- }
+ .map(::startAndAwaitServers)
.unsafeRunEitherSync(
{ ex ->
logger.error("Failed to start a server", ex)
@@ -53,24 +46,9 @@ fun main(args: Array<String>) =
{ logger.info("Gentle shutdown") }
)
-private fun createServer(config: ServerConfiguration): Server {
- val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
- val collectorProvider = CollectorFactory(
- AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
- sink,
- MicrometerMetrics()
- ).createVesHvCollectorProvider()
-
- return ServerFactory.createNettyTcpServer(config, collectorProvider)
-}
-
-private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also {
- logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}")
-}
-
-private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply {
- HealthCheckApiServer(HealthStateProvider.INSTANCE)
- .start(healthCheckApiPort)
- .unsafeRunSync()
- .also { logger.info("Health check api server started on port ${it.bindPort}") }
-}
+private fun startAndAwaitServers(config: ServerConfiguration) =
+ IO.monad().binding {
+ HealthCheckServer.start(config).bind()
+ VesServer.start(config).bind()
+ .await().bind()
+ }.fix()
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
new file mode 100644
index 00000000..04fc021d
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object HealthCheckServer : ServerStarter() {
+ override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
+
+ private fun createHealthCheckServer(config: ServerConfiguration) =
+ HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort)
+
+ override fun serverStartedMessage(handle: ServerHandle) =
+ "Health check server is up and listening on ${handle.host}:${handle.port}"
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
new file mode 100644
index 00000000..5c6f1277
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+abstract class ServerStarter {
+ fun start(config: ServerConfiguration): IO<ServerHandle> =
+ startServer(config)
+ .map { logger.info(serverStartedMessage(it)); it }
+
+ protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
+ protected abstract fun serverStartedMessage(handle: ServerHandle): String
+
+ companion object {
+ private val logger = Logger(ServerStarter::class)
+ }
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
new file mode 100644
index 00000000..fbf8936f
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.main.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object VesServer : ServerStarter() {
+ override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
+
+ private fun createVesServer(config: ServerConfiguration): Server {
+ val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
+ val collectorProvider = CollectorFactory(
+ AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+ sink,
+ MicrometerMetrics()
+ ).createVesHvCollectorProvider()
+
+ return ServerFactory.createNettyTcpServer(config, collectorProvider)
+ }
+
+ override fun serverStartedMessage(handle: ServerHandle) =
+ "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+} \ No newline at end of file
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
index d20ffaca..081dd0da 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt
@@ -23,7 +23,7 @@ package org.onap.dcae.collectors.veshv.utils.http
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since August 2018
*/
-class Status{
+class Status {
companion object {
const val OK = 200
const val SERVICE_UNAVAILABLE = 503
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
new file mode 100644
index 00000000..bb924f27
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils
+
+import arrow.effects.IO
+import reactor.ipc.netty.tcp.BlockingNettyContext
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+abstract class ServerHandle(val host: String, val port: Int) {
+ abstract fun shutdown(): IO<Unit>
+ abstract fun await(): IO<Unit>
+}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class NettyServerHandle(private val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+ override fun shutdown() = IO {
+ ctx.shutdown()
+ }
+
+ override fun await() = IO<Unit> {
+ ctx.context.channel().closeFuture().sync()
+ }
+}
diff --git a/pom.xml b/pom.xml
index 7504d4f2..3f38d520 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,7 @@
<docker-image.namespace>onap</docker-image.namespace>
<docker-image.name>${project.groupId}.${project.artifactId}</docker-image.name>
<docker.http_proxy></docker.http_proxy>
+
</properties>
@@ -121,6 +122,7 @@
<version>${kotlin.version}</version>
<configuration>
<jvmTarget>1.8</jvmTarget>
+ <experimentalCoroutines>enable</experimentalCoroutines>
</configuration>
<executions>
<execution>