From 5e93c1ec9d690d7da15b7c0db0052121d8879471 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Tue, 14 Aug 2018 12:52:28 +0200 Subject: Remove Ratpack dependency for HV-VES health checks In order to minimize complexity and possibly improve performance (thread count) reactor-netty should be used instead of Ratpack. Also reorganize code to be more consistent and differentiated readiness and liveness endpoints (for future use in K8s Pod definition). As an example I've defined health check probe in docker-compose YAML. Change-Id: I1b5ce3d685e7ae5b0515b2146ae4fa88b3b41186 Issue-ID: DCAEGEN2-705 Signed-off-by: Piotr Jaszczyk --- docker-compose.yml | 19 ++++--- .../org/onap/dcae/collectors/veshv/boundary/api.kt | 6 +-- .../collectors/veshv/factory/CollectorFactory.kt | 8 +-- .../impl/adapters/ConsulConfigurationProvider.kt | 8 +-- .../collectors/veshv/impl/socket/NettyTcpServer.kt | 14 +---- .../adapters/ConsulConfigurationProviderTest.kt | 10 ++-- .../dcae/collectors/veshv/tests/component/Sut.kt | 4 +- .../veshv/tests/component/VesHvSpecification.kt | 6 +-- .../veshv/tests/fakes/FakeHealthState.kt | 37 +++++++++++++ .../veshv/tests/fakes/FakeHealthStateProvider.kt | 37 ------------- hv-collector-health-check/pom.xml | 4 +- .../veshv/healthcheck/api/HealthCheckApiServer.kt | 54 ------------------- .../veshv/healthcheck/api/HealthDescription.kt | 32 ++++++++++++ .../veshv/healthcheck/api/HealthState.kt | 19 ++++--- .../veshv/healthcheck/api/HealthStateProvider.kt | 39 -------------- .../veshv/healthcheck/api/HealthStatus.kt | 34 ++++++++++++ .../healthcheck/factory/HealthCheckApiServer.kt | 60 ++++++++++++++++++++++ .../veshv/healthcheck/impl/HealthStateImpl.kt | 39 ++++++++++++++ .../healthcheck/impl/HealthStateProviderImpl.kt | 39 -------------- .../impl/HealthStateProviderImplTest.kt | 21 ++++---- hv-collector-main/pom.xml | 4 -- .../org/onap/dcae/collectors/veshv/main/main.kt | 48 +++++------------ .../veshv/main/servers/HealthCheckServer.kt | 39 ++++++++++++++ .../collectors/veshv/main/servers/ServerStarter.kt | 42 +++++++++++++++ .../collectors/veshv/main/servers/VesServer.kt | 51 ++++++++++++++++++ .../dcae/collectors/veshv/utils/http/Status.kt | 2 +- .../dcae/collectors/veshv/utils/server_handle.kt | 46 +++++++++++++++++ pom.xml | 2 + 28 files changed, 454 insertions(+), 270 deletions(-) create mode 100644 hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt delete mode 100644 hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt delete mode 100644 hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt create mode 100644 hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt delete mode 100644 hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt create mode 100644 hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt create mode 100644 hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt create mode 100644 hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt delete mode 100644 hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt create mode 100644 hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt create mode 100644 hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt diff --git a/docker-compose.yml b/docker-compose.yml index 7bd84f53..33aedeca 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,6 +1,5 @@ -version: "2" +version: "3.4" services: - zookeeper: image: wurstmeister/zookeeper ports: @@ -29,14 +28,22 @@ services: command: ["-server", "-bootstrap", "-ui-dir", "/ui"] ves-hv-collector: - image: onap/ves-hv-collector + image: nexus3.onap.org:10003/onap/ves-hv-collector:latest # build: # context: hv-collector-main # dockerfile: Dockerfile ports: - "6060:6060" - "6061:6061/tcp" - command: ["--listen-port", "6061","--config-url", "http://consul:8500/v1/kv/veshv-config"] + command: ["--listen-port", "6061", + "--health-check-api-port", "6060", + "--config-url", "http://consul:8500/v1/kv/veshv-config"] + healthcheck: + test: curl -f http://localhost:6060/health/ready || exit 1 + interval: 10s + timeout: 3s + retries: 3 + start_period: 20s depends_on: - kafka - consul @@ -44,7 +51,7 @@ services: - ./ssl/:/etc/ves-hv/ xnf-simulator: - image: onap/ves-hv-collector-xnf-simulator + image: nexus3.onap.org:10003/onap/ves-hv-collector-xnf-simulator # build: # context: hv-collector-xnf-simulator # dockerfile: Dockerfile @@ -57,7 +64,7 @@ services: - ./ssl/:/etc/ves-hv/ dcae-app-simulator: - image: onap/ves-hv-collector-dcae-simulator + image: nexus3.onap.org:10003/onap/ves-hv-collector-dcae-simulator # build: # context: hv-collector-dcae-app-simulator # dockerfile: Dockerfile diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ff997173..6c256b72 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.boundary import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -34,8 +35,3 @@ typealias CollectorProvider = () -> Collector interface Server { fun start(): IO } - -abstract class ServerHandle(val host: String, val port: Int) { - abstract fun shutdown(): IO - abstract fun await(): IO -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 3e652b92..a400ff32 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -25,8 +25,8 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector @@ -42,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, - private val healthStateProvider: HealthStateProvider = HealthStateProvider.INSTANCE) { + private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference = AtomicReference() @@ -50,11 +50,11 @@ class CollectorFactory(val configuration: ConfigurationProvider, .map(this::createVesHvCollector) .doOnNext { logger.info("Using updated configuration for new connections") - healthStateProvider.changeState(HealthState.HEALTHY) + healthState.changeState(HealthDescription.HEALTHY) } .doOnError { logger.error("Failed to acquire configuration from consul") - healthStateProvider.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } .subscribe(collector::set) return collector::get diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index 81463039..7de28306 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -20,8 +20,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -46,7 +46,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val url: String, private val firstRequestDelay: Duration, private val requestInterval: Duration, - private val healthStateProvider: HealthStateProvider, + private val healthState: HealthState, retrySpec: Retry ) : ConfigurationProvider { @@ -55,7 +55,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, private val retry = retrySpec .doOnRetry { logger.warn("Could not get fresh configuration", it.exception()) - healthStateProvider.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) } constructor(http: HttpAdapter, @@ -64,7 +64,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, params.configurationUrl, params.firstRequestDelay, params.requestInterval, - HealthStateProvider.INSTANCE, + HealthState.INSTANCE, Retry.any() .retryMax(MAX_RETRIES) .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index c28b1510..f858d959 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -24,15 +24,15 @@ import arrow.effects.IO import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.boundary.ServerHandle import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions -import reactor.ipc.netty.tcp.BlockingNettyContext import reactor.ipc.netty.tcp.TcpServer import java.time.Duration import java.util.function.BiFunction @@ -94,16 +94,6 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, return this } - private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { - override fun shutdown() = IO { - ctx.shutdown() - } - - override fun await() = IO { - ctx.context.channel().closeFuture().sync() - } - } - companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index f9a9ba60..78858921 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -28,8 +28,8 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import reactor.core.publisher.Mono import reactor.retry.Retry @@ -47,7 +47,7 @@ internal object ConsulConfigurationProviderTest : Spek({ describe("Consul configuration provider") { val httpAdapterMock: HttpAdapter = mock() - val healthStateProvider = HealthStateProvider.INSTANCE + val healthStateProvider = HealthState.INSTANCE given("valid resource url") { val validUrl = "http://valid-url/" @@ -98,7 +98,7 @@ internal object ConsulConfigurationProviderTest : Spek({ it("should update the health state"){ StepVerifier.create(healthStateProvider().take(iterationCount)) .expectNextCount(iterationCount - 1) - .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) .verifyComplete() } } @@ -109,7 +109,7 @@ internal object ConsulConfigurationProviderTest : Spek({ private fun constructConsulConfigProvider(url: String, httpAdapter: HttpAdapter, - healthStateProvider: HealthStateProvider, + healthState: HealthState, iterationCount: Long = 1 ): ConsulConfigurationProvider { @@ -122,7 +122,7 @@ private fun constructConsulConfigProvider(url: String, url, firstRequestDelay, requestInterval, - healthStateProvider, + healthState, retry ) } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index e7b7770d..e9b70578 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider -import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthStateProvider +import org.onap.dcae.collectors.veshv.tests.fakes.FakeHealthState import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import reactor.core.publisher.Flux @@ -40,7 +40,7 @@ import java.time.Duration */ class Sut(sink: Sink = StoringSink()) { val configurationProvider = FakeConfigurationProvider() - val healthStateProvider = FakeHealthStateProvider() + val healthStateProvider = FakeHealthState() val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT private val metrics = FakeMetrics() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 493517ba..a9f3e9a4 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,7 +24,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.MEASUREMENTS_FOR_VF_SCALING_TOPIC @@ -351,7 +351,7 @@ object VesHvSpecification : Spek({ it("should mark the application healthy") { assertThat(sut.healthStateProvider.currentHealth) .describedAs("application health state") - .isEqualTo(HealthState.HEALTHY) + .isEqualTo(HealthDescription.HEALTHY) } } @@ -363,7 +363,7 @@ object VesHvSpecification : Spek({ it("should mark the application unhealthy ") { assertThat(sut.healthStateProvider.currentHealth) .describedAs("application health state") - .isEqualTo(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + .isEqualTo(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) } } } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt new file mode 100644 index 00000000..c25771b7 --- /dev/null +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthState.kt @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.tests.fakes + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import reactor.core.publisher.Flux + +class FakeHealthState : HealthState { + + lateinit var currentHealth: HealthDescription + + override fun changeState(healthDescription: HealthDescription) { + currentHealth = healthDescription + } + + override fun invoke(): Flux { + throw NotImplementedError() + } +} diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt deleted file mode 100644 index 230a728e..00000000 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/FakeHealthStateProvider.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.tests.fakes - -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider -import reactor.core.publisher.Flux - -class FakeHealthStateProvider : HealthStateProvider { - - lateinit var currentHealth: HealthState - - override fun changeState(healthState: HealthState) { - currentHealth = healthState - } - - override fun invoke(): Flux { - throw NotImplementedError() - } -} diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml index 1e77adb0..09515879 100644 --- a/hv-collector-health-check/pom.xml +++ b/hv-collector-health-check/pom.xml @@ -50,8 +50,8 @@ kotlin-stdlib-jdk8 - io.ratpack - ratpack-core + io.projectreactor.ipc + reactor-netty io.arrow-kt diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt deleted file mode 100644 index b21d1871..00000000 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthCheckApiServer.kt +++ /dev/null @@ -1,54 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.healthcheck.api - -import arrow.effects.IO -import ratpack.handling.Chain -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig -import java.util.concurrent.atomic.AtomicReference - -/** - * @author Jakub Dudycz - * @since August 2018 - */ -class HealthCheckApiServer(private val healthStateProvider: HealthStateProvider) { - - private val healthState: AtomicReference = AtomicReference(HealthState.STARTING) - - fun start(port: Int): IO = IO { - healthStateProvider().subscribe(healthState::set) - RatpackServer - .start { - it - .serverConfig(ServerConfig.embedded().port(port).development(false)) - .handlers(this::configureHandlers) - } - } - - private fun configureHandlers(chain: Chain) { - chain - .get("healthcheck") { ctx -> - healthState.get().run { - ctx.response.status(responseCode).send(message) - } - } - } -} \ No newline at end of file diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt new file mode 100644 index 00000000..8c69406c --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthDescription.kt @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.api + + +/** + * @author Jakub Dudycz + * @since August 2018 + */ +enum class HealthDescription(val message: String, val status: HealthStatus) { + HEALTHY("Healthy", HealthStatus.UP), + STARTING("Collector is starting", HealthStatus.OUT_OF_SERVICE), + RETRYING_FOR_CONSUL_CONFIGURATION("Consul configuration not available. Retrying.", HealthStatus.OUT_OF_SERVICE), + CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", HealthStatus.DOWN) +} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt index 3dddf1e7..853cc00f 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthState.kt @@ -19,16 +19,21 @@ */ package org.onap.dcae.collectors.veshv.healthcheck.api -import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK -import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE +import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateImpl +import reactor.core.publisher.Flux /** * @author Jakub Dudycz * @since August 2018 */ -enum class HealthState(val message: String, val responseCode: Int) { - HEALTHY("Healthy", OK), - STARTING("Collector is starting", SERVICE_UNAVAILABLE), - WAITING_FOR_CONSUL_CONFIGURATION("Waiting for consul configuration", SERVICE_UNAVAILABLE), - CONSUL_CONFIGURATION_NOT_FOUND("Consul configuration not found", SERVICE_UNAVAILABLE) +interface HealthState { + + operator fun invoke(): Flux + fun changeState(healthDescription: HealthDescription) + + companion object { + val INSTANCE: HealthState by lazy { + HealthStateImpl() + } + } } diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt deleted file mode 100644 index 5cc09ccc..00000000 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStateProvider.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.healthcheck.api - -import org.onap.dcae.collectors.veshv.healthcheck.impl.HealthStateProviderImpl -import reactor.core.publisher.Flux - -/** - * @author Jakub Dudycz - * @since August 2018 - */ -interface HealthStateProvider { - - operator fun invoke(): Flux - fun changeState(healthState: HealthState) - - companion object { - val INSTANCE: HealthStateProvider by lazy { - HealthStateProviderImpl() - } - } -} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt new file mode 100644 index 00000000..79fc9321 --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/api/HealthStatus.kt @@ -0,0 +1,34 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.api + +import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.OK +import org.onap.dcae.collectors.veshv.utils.http.Status.Companion.SERVICE_UNAVAILABLE + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +enum class HealthStatus(val httpResponseStatus: Int) { + UP(OK), + DOWN(SERVICE_UNAVAILABLE), + OUT_OF_SERVICE(SERVICE_UNAVAILABLE), + UNKNOWN(SERVICE_UNAVAILABLE) +} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt new file mode 100644 index 00000000..7e9efac7 --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -0,0 +1,60 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.factory + +import arrow.effects.IO +import io.netty.handler.codec.http.HttpResponseStatus +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.utils.NettyServerHandle +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.ipc.netty.http.server.HttpServer +import reactor.ipc.netty.http.server.HttpServerRequest +import reactor.ipc.netty.http.server.HttpServerResponse +import java.util.concurrent.atomic.AtomicReference + +/** + * @author Jakub Dudycz + * @since August 2018 + */ +class HealthCheckApiServer(private val healthState: HealthState, private val port: Int) { + + private val healthDescription: AtomicReference = AtomicReference(HealthDescription.STARTING) + + fun start(): IO = IO { + healthState().subscribe(healthDescription::set) + val ctx = HttpServer.create(port).startRouter { routes -> + routes.get("/health/ready", ::readinessHandler) + routes.get("/health/alive", ::livenessHandler) + } + NettyServerHandle(ctx) + } + + private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) = + healthDescription.get().run { + resp.status(status.httpResponseStatus).sendString(Flux.just(status.toString(), "\n", message)) + } + + private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) = + resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet")) + +} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt new file mode 100644 index 00000000..c273f0a0 --- /dev/null +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateImpl.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.healthcheck.impl + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import reactor.core.publisher.Flux +import reactor.core.publisher.FluxProcessor +import reactor.core.publisher.UnicastProcessor + +/** + * @author Jakub Dudycz + * @since August 2018 + */ +internal class HealthStateImpl : HealthState { + + private val healthDescriptionStream: FluxProcessor = UnicastProcessor.create() + + override fun invoke(): Flux = healthDescriptionStream + + override fun changeState(healthDescription: HealthDescription) = healthDescriptionStream.onNext(healthDescription) +} diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt deleted file mode 100644 index 5056d2da..00000000 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImpl.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.healthcheck.impl - -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import reactor.core.publisher.Flux -import reactor.core.publisher.FluxProcessor -import reactor.core.publisher.UnicastProcessor - -/** - * @author Jakub Dudycz - * @since August 2018 - */ -internal class HealthStateProviderImpl : HealthStateProvider { - - private val healthStateStream: FluxProcessor = UnicastProcessor.create() - - override fun invoke(): Flux = healthStateStream - - override fun changeState(healthState: HealthState) = healthStateStream.onNext(healthState) -} diff --git a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt index e9c487bf..e3fced2d 100644 --- a/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt +++ b/hv-collector-health-check/src/test/kotlin/org/onap/dcae/collectors/veshv/healthcheck/impl/HealthStateProviderImplTest.kt @@ -21,10 +21,9 @@ package org.onap.dcae.collectors.veshv.healthcheck.impl import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import reactor.test.StepVerifier /** @@ -33,20 +32,20 @@ import reactor.test.StepVerifier */ object HealthStateProviderImplTest : Spek({ describe("Health state provider") { - val healthStateProviderImpl = HealthStateProviderImpl() + val healthStateProviderImpl = HealthStateImpl() on("health state update") { - healthStateProviderImpl.changeState(HealthState.HEALTHY) - healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) - healthStateProviderImpl.changeState(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) - healthStateProviderImpl.changeState(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + healthStateProviderImpl.changeState(HealthDescription.HEALTHY) + healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + healthStateProviderImpl.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + healthStateProviderImpl.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) it("should push new health state to the subscriber") { StepVerifier .create(healthStateProviderImpl().take(4)) - .expectNext(HealthState.HEALTHY) - .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) - .expectNext(HealthState.WAITING_FOR_CONSUL_CONFIGURATION) - .expectNext(HealthState.CONSUL_CONFIGURATION_NOT_FOUND) + .expectNext(HealthDescription.HEALTHY) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + .expectNext(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) .verifyComplete() } } diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index 0e956288..af64cedd 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -98,10 +98,6 @@ ${project.parent.version} test - - io.ratpack - ratpack-core - io.arrow-kt arrow-core diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dc92228f..a84a39a5 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,13 +19,12 @@ */ package org.onap.dcae.collectors.veshv.main -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.boundary.ServerHandle -import org.onap.dcae.collectors.veshv.factory.CollectorFactory -import org.onap.dcae.collectors.veshv.factory.ServerFactory -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider -import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer +import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync @@ -38,13 +37,7 @@ private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainK fun main(args: Array) = ArgVesHvConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map(::startHealthCheckApiServer) - .map(::createServer) - .map { - it.start() - .map(::logServerStarted) - .flatMap(ServerHandle::await) - } + .map(::startAndAwaitServers) .unsafeRunEitherSync( { ex -> logger.error("Failed to start a server", ex) @@ -53,24 +46,9 @@ fun main(args: Array) = { logger.info("Gentle shutdown") } ) -private fun createServer(config: ServerConfiguration): Server { - val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() - val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), - sink, - MicrometerMetrics() - ).createVesHvCollectorProvider() - - return ServerFactory.createNettyTcpServer(config, collectorProvider) -} - -private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also { - logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}") -} - -private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply { - HealthCheckApiServer(HealthStateProvider.INSTANCE) - .start(healthCheckApiPort) - .unsafeRunSync() - .also { logger.info("Health check api server started on port ${it.bindPort}") } -} +private fun startAndAwaitServers(config: ServerConfiguration) = + IO.monad().binding { + HealthCheckServer.start(config).bind() + VesServer.start(config).bind() + .await().bind() + }.fix() diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt new file mode 100644 index 00000000..04fc021d --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -0,0 +1,39 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +object HealthCheckServer : ServerStarter() { + override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start() + + private fun createHealthCheckServer(config: ServerConfiguration) = + HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort) + + override fun serverStartedMessage(handle: ServerHandle) = + "Health check server is up and listening on ${handle.host}:${handle.port}" +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt new file mode 100644 index 00000000..5c6f1277 --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt @@ -0,0 +1,42 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +abstract class ServerStarter { + fun start(config: ServerConfiguration): IO = + startServer(config) + .map { logger.info(serverStartedMessage(it)); it } + + protected abstract fun startServer(config: ServerConfiguration): IO + protected abstract fun serverStartedMessage(handle: ServerHandle): String + + companion object { + private val logger = Logger(ServerStarter::class) + } +} diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt new file mode 100644 index 00000000..fbf8936f --- /dev/null +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main.servers + +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.factory.CollectorFactory +import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.main.MicrometerMetrics +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +object VesServer : ServerStarter() { + override fun startServer(config: ServerConfiguration): IO = createVesServer(config).start() + + private fun createVesServer(config: ServerConfiguration): Server { + val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() + val collectorProvider = CollectorFactory( + AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + sink, + MicrometerMetrics() + ).createVesHvCollectorProvider() + + return ServerFactory.createNettyTcpServer(config, collectorProvider) + } + + override fun serverStartedMessage(handle: ServerHandle) = + "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" +} \ No newline at end of file diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt index d20ffaca..081dd0da 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/http/Status.kt @@ -23,7 +23,7 @@ package org.onap.dcae.collectors.veshv.utils.http * @author Jakub Dudycz * @since August 2018 */ -class Status{ +class Status { companion object { const val OK = 200 const val SERVICE_UNAVAILABLE = 503 diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt new file mode 100644 index 00000000..bb924f27 --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils + +import arrow.effects.IO +import reactor.ipc.netty.tcp.BlockingNettyContext + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +abstract class ServerHandle(val host: String, val port: Int) { + abstract fun shutdown(): IO + abstract fun await(): IO +} + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +class NettyServerHandle(private val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { + override fun shutdown() = IO { + ctx.shutdown() + } + + override fun await() = IO { + ctx.context.channel().closeFuture().sync() + } +} diff --git a/pom.xml b/pom.xml index 7504d4f2..3f38d520 100644 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,7 @@ onap ${project.groupId}.${project.artifactId} + @@ -121,6 +122,7 @@ ${kotlin.version} 1.8 + enable -- cgit 1.2.3-korg