From 1422bed4c1a002e663fd7c2c4c204831e5f7aa9a Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Thu, 28 Feb 2019 17:33:02 +0100 Subject: Use CBS by means of SDK in place of Consul - changed IO creation in main to fix error with too early calling changeState method on collector HealthState - increased coverage a little with few tests - corrected coverage-report pom file to include new modules - temporarily changed to 1.1.4-SNAPSHOT version of sdk due to need of new version of CBSLookup Change-Id: Ic73b46cf881ab4fabf52bef0327b09082aa90dc6 Issue-ID: DCAEGEN2-1302 Signed-off-by: Filip Krzywka --- .../collectors/veshv/main/ArgVesHvConfiguration.kt | 26 +++++++++------------ .../org/onap/dcae/collectors/veshv/main/main.kt | 27 +++++++++++----------- .../collectors/veshv/main/servers/VesServer.kt | 4 ++-- .../veshv/main/ArgVesHvConfigurationTest.kt | 16 +++---------- .../onap/dcae/collectors/veshv/main/MainTest.kt | 2 +- 5 files changed, 31 insertions(+), 44 deletions(-) (limited to 'sources/hv-collector-main') diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 99ec5e1e..bb484cfe 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,9 +27,8 @@ import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_CONFIG_URL -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY -import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL +import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC @@ -64,9 +63,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration = Option.monad().binding { - val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind() val firstRequestDelay = cmdLine.longValue( - CONSUL_FIRST_REQUEST_DELAY, - DefaultValues.CONSUL_FIRST_REQUEST_DELAY + CONFIGURATION_FIRST_REQUEST_DELAY, + DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY ) val requestInterval = cmdLine.longValue( - CONSUL_REQUEST_INTERVAL, - DefaultValues.CONSUL_REQUEST_INTERVAL + CONFIGURATION_REQUEST_INTERVAL, + DefaultValues.CONFIGURATION_REQUEST_INTERVAL ) ConfigurationProviderParams( - configUrl, Duration.ofSeconds(firstRequestDelay), Duration.ofSeconds(requestInterval) ) @@ -145,8 +141,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration) = logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) } ExitFailure(1) }, - { logger.debug(ServiceContext::mdc) { "Finished" } } + { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } } ) private fun startAndAwaitServers(config: ServerConfiguration) = IO.monad().binding { Logger.setLogLevel(VESHV_PACKAGE, config.logLevel) logger.info { "Using configuration: $config" } + val healthCheckServerHandle = HealthCheckServer.start(config).bind() - VesServer.start(config).bind().let { handle -> - registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind() - handle.await().bind() - } - }.fix() + val hvVesHandle = VesServer.start(config).bind() -internal fun closeServers(vararg handles: ServerHandle, healthState: HealthState = HealthState.INSTANCE): IO = - IO.monadError().binding { - healthState.changeState(HealthDescription.SHUTTING_DOWN) - Closeable.closeAll(handles.asIterable()).bind() - logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } + registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle)) + hvVesHandle.await().bind() }.fix() + +internal fun closeServers(vararg handles: ServerHandle, + healthState: HealthState = HealthState.INSTANCE) = { + logger.debug(ServiceContext::mdc) { "Graceful shutdown started" } + healthState.changeState(HealthDescription.SHUTTING_DOWN) + Closeable.closeAll(handles.asIterable()).unsafeRunSync() + logger.info(ServiceContext::mdc) { "Graceful shutdown completed" } +} diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index 4e2e6d86..62c24308 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -37,7 +37,7 @@ object VesServer : ServerStarter() { private fun createVesServer(config: ServerConfiguration): Server { val collectorProvider = CollectorFactory( - AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), + AdapterFactory.configurationProvider(config.configurationProviderParams), AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration), MicrometerMetrics.INSTANCE, config.maximumPayloadSizeBytes diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt index da2bfb48..6d219106 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -42,7 +42,6 @@ object ArgVesHvConfigurationTest : Spek({ lateinit var cut: ArgVesHvConfiguration val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666" val healthCheckApiPort = "6070" - val configurationUrl = "http://test-address/test" val firstRequestDelay = "10" val requestInterval = "5" val listenPort = "6969" @@ -63,7 +62,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", @@ -95,21 +93,16 @@ object ArgVesHvConfigurationTest : Spek({ assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0") } - it("should set proper first consul request delay") { + it("should set proper first request delay") { assertThat(result.configurationProviderParams.firstRequestDelay) .isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong())) } - it("should set proper consul request interval") { + it("should set proper request interval") { assertThat(result.configurationProviderParams.requestInterval) .isEqualTo(Duration.ofSeconds(requestInterval.toLong())) } - it("should set proper config url") { - assertThat(result.configurationProviderParams.configurationUrl) - .isEqualTo(configurationUrl) - } - it("should set proper security configuration") { assertThat(result.securityConfiguration.keys.isEmpty()).isFalse() @@ -135,7 +128,6 @@ object ArgVesHvConfigurationTest : Spek({ it("should throw exception") { assertThat( cut.parseExpectingFailure( - "--config-url", configurationUrl, "--ssl-disable", "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval @@ -164,7 +156,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", @@ -183,7 +174,6 @@ object ArgVesHvConfigurationTest : Spek({ "--kafka-bootstrap-servers", kafkaBootstrapServers, "--health-check-api-port", healthCheckApiPort, "--listen-port", listenPort, - "--config-url", configurationUrl, "--first-request-delay", firstRequestDelay, "--request-interval", requestInterval, "--key-store", "/tmp/keys.p12", diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt index e032f00e..e18b0b10 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt @@ -51,7 +51,7 @@ internal object MainTest : Spek({ val healthState: HealthState = mock() on("closeServers") { - closeServers(handle, healthState = healthState).unsafeRunSync() + closeServers(handle, healthState = healthState).invoke() it("should close all handles") { assertThat(closed).isTrue() -- cgit 1.2.3-korg