diff options
Diffstat (limited to 'sources/hv-collector-main')
5 files changed, 31 insertions, 44 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 99ec5e1e..bb484cfe 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,9 +27,8 @@ import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_CONFIG_URL -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC @@ -64,9 +63,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration KAFKA_SERVERS, HEALTH_CHECK_API_PORT, LISTEN_PORT, - CONSUL_CONFIG_URL, - CONSUL_FIRST_REQUEST_DELAY, - CONSUL_REQUEST_INTERVAL, + CONFIGURATION_FIRST_REQUEST_DELAY, + CONFIGURATION_REQUEST_INTERVAL, SSL_DISABLE, KEY_STORE_FILE, KEY_STORE_PASSWORD, @@ -117,17 +115,15 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> = Option.monad().binding { - val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind() val firstRequestDelay = cmdLine.longValue( - CONSUL_FIRST_REQUEST_DELAY, - DefaultValues.CONSUL_FIRST_REQUEST_DELAY + CONFIGURATION_FIRST_REQUEST_DELAY, + DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY ) val requestInterval = cmdLine.longValue( - CONSUL_REQUEST_INTERVAL, - DefaultValues.CONSUL_REQUEST_INTERVAL + CONFIGURATION_REQUEST_INTERVAL, + DefaultValues.CONFIGURATION_REQUEST_INTERVAL ) ConfigurationProviderParams( - configUrl, Duration.ofSeconds(firstRequestDelay), Duration.ofSeconds(requestInterval) ) @@ -145,8 +141,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration internal object DefaultValues { const val HEALTH_CHECK_API_PORT = 6060 - const val CONSUL_FIRST_REQUEST_DELAY = 10L - const val CONSUL_REQUEST_INTERVAL = 5L + const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L + const val CONFIGURATION_REQUEST_INTERVAL = 5L const val IDLE_TIMEOUT_SEC = 60L const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES val LOG_LEVEL = LogLevel.INFO.name diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index f00b9ee4..d21b490c 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.main import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monad.monad -import arrow.effects.instances.io.monadError.monadError import arrow.typeclasses.binding import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription @@ -51,23 +50,25 @@ fun main(args: Array<String>) = logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } ExitFailure(1) }, - { logger.debug(ServiceContext::mdc) { "Finished" } } + { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } } ) private fun startAndAwaitServers(config: ServerConfiguration) = IO.monad().binding { Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) logger.info { "Using configuration: $config" } + val healthCheckServerHandle = HealthCheckServer.start(config).bind() - VesServer.start(config).bind().let { handle -> - registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind() - handle.await().bind() - } - }.fix() + val hvVesHandle = VesServer.start(config).bind() -internal fun closeServers(vararg handles: ServerHandle, healthState: HealthState = HealthState.INSTANCE): IO<Unit> = - IO.monadError().binding { - healthState.changeState(HealthDescription.SHUTTING_DOWN) - Closeable.closeAll(handles.asIterable()).bind() - logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } + registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle)) + hvVesHandle.await().bind() }.fix() + +internal fun closeServers(vararg handles: ServerHandle, + healthState: HealthState = HealthState.INSTANCE) = { + logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } + healthState.changeState(HealthDescription.SHUTTING_DOWN) + Closeable.closeAll(handles.asIterable()).unsafeRunSync() + logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } +} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index 4e2e6d86..62c24308 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ object VesServer : ServerStarter() { private fun createVesServer(config: ServerConfiguration): Server { val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + AdapterFactory.configurationProvider(config.configurationProviderParams), AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt index da2bfb48..6d219106 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,6 @@ object ArgVesHvConfigurationTest : Spek({ lateinit var cut: ArgVesHvConfiguration val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666" val healthCheckApiPort = "6070" - val configurationUrl = "http://test-address/test" val firstRequestDelay = "10" val requestInterval = "5" val listenPort = "6969" @@ -63,7 +62,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", @@ -95,21 +93,16 @@ object ArgVesHvConfigurationTest : Spek({ assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0") } - it("should set proper first consul request delay") { + it("should set proper first request delay") { assertThat(result.configurationProviderParams.firstRequestDelay) .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong())) } - it("should set proper consul request interval") { + it("should set proper request interval") { assertThat(result.configurationProviderParams.requestInterval) .isEqualTo(Duration.ofSeconds(requestInterval.toLong())) } - it("should set proper config url") { - assertThat(result.configurationProviderParams.configurationUrl) - .isEqualTo(configurationUrl) - } - it("should set proper security configuration") { assertThat(result.securityConfiguration.keys.isEmpty()).isFalse() @@ -135,7 +128,6 @@ object ArgVesHvConfigurationTest : Spek({ it("should throw exception") { assertThat( cut.parseExpectingFailure( - "--config-url", configurationUrl, "--ssl-disable", "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval @@ -164,7 +156,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", @@ -183,7 +174,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index e032f00e..e18b0b10 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -51,7 +51,7 @@ internal object MainTest : Spek({ val healthState: HealthState = mock() on("closeServers") { - closeServers(handle, healthState = healthState).unsafeRunSync() + closeServers(handle, healthState = healthState).invoke() it("should close all handles") { assertThat(closed).isTrue() |