diff options
25 files changed, 362 insertions, 178 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 7bd84f53..33aedeca 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,5 @@ -version: "2" +version: "3.4" services: - zookeeper: image: wurstmeister/zookeeper ports: @@ -29,14 +28,22 @@ services: command: ["-server", "-bootstrap", "-ui-dir", "/ui"] ves-hv-collector: - image: onap/ves-hv-collector + image: nexus3.onap.org:10003/onap/ves-hv-collector:latest # build: # context: hv-collector-main # dockerfile: Dockerfile ports: - "6060:6060" - "6061:6061/tcp" - command: ["--listen-port", "6061","--config-url", "http://consul:8500/v1/kv/veshv-config"] + command: ["--listen-port", "6061", + "--health-check-api-port", "6060", + "--config-url", "http://consul:8500/v1/kv/veshv-config"] + healthcheck: + test: curl -f http://localhost:6060/health/ready || exit 1 + interval: 10s + timeout: 3s + retries: 3 + start_period: 20s depends_on: - kafka - consul @@ -44,7 +51,7 @@ services: - ./ssl/:/etc/ves-hv/ xnf-simulator: - image: onap/ves-hv-collector-xnf-simulator + image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator # build: # context: hv-collector-xnf-simulator # dockerfile: Dockerfile @@ -57,7 +64,7 @@ services: - ./ssl/:/etc/ves-hv/ dcae-app-simulator: - image: onap/ves-hv-collector-dcae-simulator + image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator # build: # context: hv-collector-dcae-app-simulator # dockerfile: Dockerfile diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ff997173..6c256b72 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.boundary import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -34,8 +35,3 @@ typealias CollectorProvider = () -> Collector interface Server { fun start(): IO<ServerHandle> } - -abstract class ServerHandle(val host: String, val port: Int) { - abstract fun shutdown(): IO<Unit> - abstract fun await(): IO<Unit> -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 3e652b92..a400ff32 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,8 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector @@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) { + private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference<Collector> = AtomicReference() @@ -50,11 +50,11 @@ class CollectorFactory(val configuration: ConfigurationProvider, .map(this::createVesHvCollector) .doOnNext { logger.info("Using updated configuration for new connections") - healthStateProvider.changeState(HealthState.HEALTHY) + healthState.changeState(HealthDescription.HEALTHY) } .doOnError { logger.error("Failed to acquire configuration from consul") - healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) return collector::get diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 81463039..7de28306 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -20,8 +20,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -46,7 +46,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val url: String, private val firstRequestDelay: Duration, private val requestInterval: Duration, - private val healthStateProvider: HealthStateProvider, + private val healthState: HealthState, retrySpec: Retry<Any> ) : ConfigurationProvider { @@ -55,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val retry = retrySpec .doOnRetry { logger.warn("Could not get fresh configuration", it.exception()) - healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } constructor(http: HttpAdapter, @@ -64,7 +64,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, params.configurationUrl, params.firstRequestDelay, params.requestInterval, - HealthStateProvider.INSTANCE, + HealthState.INSTANCE, Retry.any<Any>() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index c28b1510..f858d959 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -24,15 +24,15 @@ import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.boundary.ServerHandle import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions -import reactor.ipc.netty.tcp.BlockingNettyContext import reactor.ipc.netty.tcp.TcpServer import java.time.Duration import java.util.function.BiFunction @@ -94,16 +94,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, return this } - private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { - override fun shutdown() = IO { - ctx.shutdown() - } - - override fun await() = IO<Unit> { - ctx.context.channel().closeFuture().sync() - } - } - companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index f9a9ba60..78858921 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -28,8 +28,8 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono import reactor.retry.Retry @@ -47,7 +47,7 @@ internal object ConsulConfigurationProviderTest : Spek({ describe("Consul configuration provider") { val httpAdapterMock: HttpAdapter = mock() - val healthStateProvider = HealthStateProvider.INSTANCE + val healthStateProvider = HealthState.INSTANCE given("valid resource url") { val validUrl = "http://valid-url/" @@ -98,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({ it("should update the health state"){ StepVerifier.create(healthStateProvider().take(iterationCount)) .expectNextCount(iterationCount - 1) - .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) .verifyComplete() } } @@ -109,7 +109,7 @@ internal object ConsulConfigurationProviderTest : Spek({ private fun constructConsulConfigProvider(url: String, httpAdapter: HttpAdapter, - healthStateProvider: HealthStateProvider, + healthState: HealthState, iterationCount: Long = 1 ): ConsulConfigurationProvider { @@ -122,7 +122,7 @@ private fun constructConsulConfigProvider(url: String, url, firstRequestDelay, requestInterval, - healthStateProvider, + healthState, retry ) } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index e7b7770d..e9b70578 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider -import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import reactor.core.publisher.Flux @@ -40,7 +40,7 @@ import java.time.Duration */ class Sut(sink: Sink = StoringSink()) { val configurationProvider = FakeConfigurationProvider() - val healthStateProvider = FakeHealthStateProvider() + val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT private val metrics = FakeMetrics() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 493517ba..a9f3e9a4 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,7 +24,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC @@ -351,7 +351,7 @@ object VesHvSpecification : Spek({ it("should mark the application healthy") { assertThat(sut.healthStateProvider.currentHealth) .describedAs("application health state") - .isEqualTo(HealthState.HEALTHY) + .isEqualTo(HealthDescription.HEALTHY) } } @@ -363,7 +363,7 @@ object VesHvSpecification : Spek({ it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) .describedAs("application health state") - .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } } } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt index 230a728e..c25771b7 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt @@ -19,19 +19,19 @@ */ package org.onap.dcae.collectors.veshv.tests.fakes +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import reactor.core.publisher.Flux -class FakeHealthStateProvider : HealthStateProvider { +class FakeHealthState : HealthState { - lateinit var currentHealth: HealthState + lateinit var currentHealth: HealthDescription - override fun changeState(healthState: HealthState) { - currentHealth = healthState + override fun changeState(healthDescription: HealthDescription) { + currentHealth = healthDescription } - override fun invoke(): Flux<HealthState> { + override fun invoke(): Flux<HealthDescription> { throw NotImplementedError() } } diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml index 1e77adb0..09515879 100644 --- a/hv-collector-health-check/pom.xml +++ b/hv-collector-health-check/pom.xml @@ -50,8 +50,8 @@ <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> - <groupId>io.ratpack</groupId> - <artifactId>ratpack-core</artifactId> + <groupId>io.projectreactor.ipc</groupId> + <artifactId>reactor-netty</artifactId> </dependency> <dependency> <groupId>io.arrow-kt</groupId> diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt deleted file mode 100644 index b21d1871..00000000 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.healthcheck.api - -import arrow.effects.IO -import ratpack.handling.Chain -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig -import java.util.concurrent.atomic.AtomicReference - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since August 2018 - */ -class HealthCheckApiServer(private val healthStateProvider: HealthStateProvider) { - - private val healthState: AtomicReference<HealthState> = AtomicReference(HealthState.STARTING) - - fun start(port: Int): IO<RatpackServer> = IO { - healthStateProvider().subscribe(healthState::set) - RatpackServer - .start { - it - .serverConfig(ServerConfig.embedded().port(port).development(false)) - .handlers(this::configureHandlers) - } - } - - private fun configureHandlers(chain: Chain) { - chain - .get("healthcheck") { ctx -> - healthState.get().run { - ctx.response.status(responseCode).send(message) - } - } - } -}
\ No newline at end of file diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt index 5cc09ccc..8c69406c 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt @@ -19,21 +19,14 @@ */ package org.onap.dcae.collectors.veshv.healthcheck.api -import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl -import reactor.core.publisher.Flux /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since August 2018 */ -interface HealthStateProvider { - - operator fun invoke(): Flux<HealthState> - fun changeState(healthState: HealthState) - - companion object { - val INSTANCE: HealthStateProvider by lazy { - HealthStateProviderImpl() - } - } +enum class HealthDescription(val message: String, val status: HealthStatus) { + HEALTHY("Healthy", HealthStatus.UP), + STARTING("Collector is starting", HealthStatus.OUT_OF_SERVICE), + RETRYING_FOR_CONSUL_CONFIGURATION("Consul configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE), + CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", HealthStatus.DOWN) } diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt index 3dddf1e7..853cc00f 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt @@ -19,16 +19,21 @@ */ package org.onap.dcae.collectors.veshv.healthcheck.api -import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK -import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE +import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateImpl +import reactor.core.publisher.Flux /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since August 2018 */ -enum class HealthState(val message: String, val responseCode: Int) { - HEALTHY("Healthy", OK), - STARTING("Collector is starting", SERVICE_UNAVAILABLE), - WAITING_FOR_CONSUL_CONFIGURATION("Waiting for consul configuration", SERVICE_UNAVAILABLE), - CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", SERVICE_UNAVAILABLE) +interface HealthState { + + operator fun invoke(): Flux<HealthDescription> + fun changeState(healthDescription: HealthDescription) + + companion object { + val INSTANCE: HealthState by lazy { + HealthStateImpl() + } + } } diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt new file mode 100644 index 00000000..79fc9321 --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.api + +import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK +import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +enum class HealthStatus(val httpResponseStatus: Int) { + UP(OK), + DOWN(SERVICE_UNAVAILABLE), + OUT_OF_SERVICE(SERVICE_UNAVAILABLE), + UNKNOWN(SERVICE_UNAVAILABLE) +} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt new file mode 100644 index 00000000..7e9efac7 --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.factory + +import arrow.effects.IO +import io.netty.handler.codec.http.HttpResponseStatus +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.ipc.netty.http.server.HttpServer +import reactor.ipc.netty.http.server.HttpServerRequest +import reactor.ipc.netty.http.server.HttpServerResponse +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since August 2018 + */ +class HealthCheckApiServer(private val healthState: HealthState, private val port: Int) { + + private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING) + + fun start(): IO<ServerHandle> = IO { + healthState().subscribe(healthDescription::set) + val ctx = HttpServer.create(port).startRouter { routes -> + routes.get("/health/ready", ::readinessHandler) + routes.get("/health/alive", ::livenessHandler) + } + NettyServerHandle(ctx) + } + + private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) = + healthDescription.get().run { + resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message)) + } + + private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) = + resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet")) + +} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt index 5056d2da..c273f0a0 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt @@ -19,8 +19,8 @@ */ package org.onap.dcae.collectors.veshv.healthcheck.impl -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import reactor.core.publisher.Flux import reactor.core.publisher.FluxProcessor import reactor.core.publisher.UnicastProcessor @@ -29,11 +29,11 @@ import reactor.core.publisher.UnicastProcessor * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since August 2018 */ -internal class HealthStateProviderImpl : HealthStateProvider { +internal class HealthStateImpl : HealthState { - private val healthStateStream: FluxProcessor<HealthState, HealthState> = UnicastProcessor.create() + private val healthDescriptionStream: FluxProcessor<HealthDescription, HealthDescription> = UnicastProcessor.create() - override fun invoke(): Flux<HealthState> = healthStateStream + override fun invoke(): Flux<HealthDescription> = healthDescriptionStream - override fun changeState(healthState: HealthState) = healthStateStream.onNext(healthState) + override fun changeState(healthDescription: HealthDescription) = healthDescriptionStream.onNext(healthDescription) } diff --git a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt index e9c487bf..e3fced2d 100644 --- a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt +++ b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt @@ -21,10 +21,9 @@ package org.onap.dcae.collectors.veshv.healthcheck.impl import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import reactor.test.StepVerifier /** @@ -33,20 +32,20 @@ import reactor.test.StepVerifier */ object HealthStateProviderImplTest : Spek({ describe("Health state provider") { - val healthStateProviderImpl = HealthStateProviderImpl() + val healthStateProviderImpl = HealthStateImpl() on("health state update") { - healthStateProviderImpl.changeState(HealthState.HEALTHY) - healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) - healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) - healthStateProviderImpl.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + healthStateProviderImpl.changeState(HealthDescription.HEALTHY) + healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + healthStateProviderImpl.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) it("should push new health state to the subscriber") { StepVerifier .create(healthStateProviderImpl().take(4)) - .expectNext(HealthState.HEALTHY) - .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) - .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) - .expectNext(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + .expectNext(HealthDescription.HEALTHY) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) .verifyComplete() } } diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index 0e956288..af64cedd 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -99,10 +99,6 @@ <scope>test</scope> </dependency> <dependency> - <groupId>io.ratpack</groupId> - <artifactId>ratpack-core</artifactId> - </dependency> - <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-core</artifactId> </dependency> diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc92228f..a84a39a5 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,13 +19,12 @@ */ package org.onap.dcae.collectors.veshv.main -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.boundary.ServerHandle -import org.onap.dcae.collectors.veshv.factory.CollectorFactory -import org.onap.dcae.collectors.veshv.factory.ServerFactory -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider -import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer +import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync @@ -38,13 +37,7 @@ private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainK fun main(args: Array<String>) = ArgVesHvConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startHealthCheckApiServer) - .map(::createServer) - .map { - it.start() - .map(::logServerStarted) - .flatMap(ServerHandle::await) - } + .map(::startAndAwaitServers) .unsafeRunEitherSync( { ex -> logger.error("Failed to start a server", ex) @@ -53,24 +46,9 @@ fun main(args: Array<String>) = { logger.info("Gentle shutdown") } ) -private fun createServer(config: ServerConfiguration): Server { - val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() - val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), - sink, - MicrometerMetrics() - ).createVesHvCollectorProvider() - - return ServerFactory.createNettyTcpServer(config, collectorProvider) -} - -private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also { - logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}") -} - -private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply { - HealthCheckApiServer(HealthStateProvider.INSTANCE) - .start(healthCheckApiPort) - .unsafeRunSync() - .also { logger.info("Health check api server started on port ${it.bindPort}") } -} +private fun startAndAwaitServers(config: ServerConfiguration) = + IO.monad().binding { + HealthCheckServer.start(config).bind() + VesServer.start(config).bind() + .await().bind() + }.fix() diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt new file mode 100644 index 00000000..04fc021d --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +object HealthCheckServer : ServerStarter() { + override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start() + + private fun createHealthCheckServer(config: ServerConfiguration) = + HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort) + + override fun serverStartedMessage(handle: ServerHandle) = + "Health check server is up and listening on ${handle.host}:${handle.port}" +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt new file mode 100644 index 00000000..5c6f1277 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +abstract class ServerStarter { + fun start(config: ServerConfiguration): IO<ServerHandle> = + startServer(config) + .map { logger.info(serverStartedMessage(it)); it } + + protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle> + protected abstract fun serverStartedMessage(handle: ServerHandle): String + + companion object { + private val logger = Logger(ServerStarter::class) + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt new file mode 100644 index 00000000..fbf8936f --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.main.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +object VesServer : ServerStarter() { + override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start() + + private fun createVesServer(config: ServerConfiguration): Server { + val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() + val collectorProvider = CollectorFactory( + AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + sink, + MicrometerMetrics() + ).createVesHvCollectorProvider() + + return ServerFactory.createNettyTcpServer(config, collectorProvider) + } + + override fun serverStartedMessage(handle: ServerHandle) = + "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" +}
\ No newline at end of file diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt index d20ffaca..081dd0da 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt @@ -23,7 +23,7 @@ package org.onap.dcae.collectors.veshv.utils.http * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since August 2018 */ -class Status{ +class Status { companion object { const val OK = 200 const val SERVICE_UNAVAILABLE = 503 diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt new file mode 100644 index 00000000..bb924f27 --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils + +import arrow.effects.IO +import reactor.ipc.netty.tcp.BlockingNettyContext + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +abstract class ServerHandle(val host: String, val port: Int) { + abstract fun shutdown(): IO<Unit> + abstract fun await(): IO<Unit> +} + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +class NettyServerHandle(private val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { + override fun shutdown() = IO { + ctx.shutdown() + } + + override fun await() = IO<Unit> { + ctx.context.channel().closeFuture().sync() + } +} @@ -87,6 +87,7 @@ <docker-image.namespace>onap</docker-image.namespace> <docker-image.name>${project.groupId}.${project.artifactId}</docker-image.name> <docker.http_proxy></docker.http_proxy> + </properties> @@ -121,6 +122,7 @@ <version>${kotlin.version}</version> <configuration> <jvmTarget>1.8</jvmTarget> + <experimentalCoroutines>enable</experimentalCoroutines> </configuration> <executions> <execution> |