summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt26
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt27
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt4
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt16
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt2
5 files changed, 31 insertions, 44 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 99ec5e1e..bb484cfe 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,9 +27,8 @@ import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_CONFIG_URL
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
-import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_REQUEST_INTERVAL
+import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.CONFIGURATION_FIRST_REQUEST_DELAY
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.DUMMY_MODE
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
@@ -64,9 +63,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
KAFKA_SERVERS,
HEALTH_CHECK_API_PORT,
LISTEN_PORT,
- CONSUL_CONFIG_URL,
- CONSUL_FIRST_REQUEST_DELAY,
- CONSUL_REQUEST_INTERVAL,
+ CONFIGURATION_FIRST_REQUEST_DELAY,
+ CONFIGURATION_REQUEST_INTERVAL,
SSL_DISABLE,
KEY_STORE_FILE,
KEY_STORE_PASSWORD,
@@ -117,17 +115,15 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> =
Option.monad().binding {
- val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
val firstRequestDelay = cmdLine.longValue(
- CONSUL_FIRST_REQUEST_DELAY,
- DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+ CONFIGURATION_FIRST_REQUEST_DELAY,
+ DefaultValues.CONFIGURATION_FIRST_REQUEST_DELAY
)
val requestInterval = cmdLine.longValue(
- CONSUL_REQUEST_INTERVAL,
- DefaultValues.CONSUL_REQUEST_INTERVAL
+ CONFIGURATION_REQUEST_INTERVAL,
+ DefaultValues.CONFIGURATION_REQUEST_INTERVAL
)
ConfigurationProviderParams(
- configUrl,
Duration.ofSeconds(firstRequestDelay),
Duration.ofSeconds(requestInterval)
)
@@ -145,8 +141,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
internal object DefaultValues {
const val HEALTH_CHECK_API_PORT = 6060
- const val CONSUL_FIRST_REQUEST_DELAY = 10L
- const val CONSUL_REQUEST_INTERVAL = 5L
+ const val CONFIGURATION_FIRST_REQUEST_DELAY = 10L
+ const val CONFIGURATION_REQUEST_INTERVAL = 5L
const val IDLE_TIMEOUT_SEC = 60L
const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
val LOG_LEVEL = LogLevel.INFO.name
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index f00b9ee4..d21b490c 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,7 +22,6 @@ package org.onap.dcae.collectors.veshv.main
import arrow.effects.IO
import arrow.effects.fix
import arrow.effects.instances.io.monad.monad
-import arrow.effects.instances.io.monadError.monadError
import arrow.typeclasses.binding
import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
@@ -51,23 +50,25 @@ fun main(args: Array<String>) =
logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
ExitFailure(1)
},
- { logger.debug(ServiceContext::mdc) { "Finished" } }
+ { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } }
)
private fun startAndAwaitServers(config: ServerConfiguration) =
IO.monad().binding {
Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
logger.info { "Using configuration: $config" }
+
val healthCheckServerHandle = HealthCheckServer.start(config).bind()
- VesServer.start(config).bind().let { handle ->
- registerShutdownHook(closeServers(handle, healthCheckServerHandle)).bind()
- handle.await().bind()
- }
- }.fix()
+ val hvVesHandle = VesServer.start(config).bind()
-internal fun closeServers(vararg handles: ServerHandle, healthState: HealthState = HealthState.INSTANCE): IO<Unit> =
- IO.monadError().binding {
- healthState.changeState(HealthDescription.SHUTTING_DOWN)
- Closeable.closeAll(handles.asIterable()).bind()
- logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
+ registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle))
+ hvVesHandle.await().bind()
}.fix()
+
+internal fun closeServers(vararg handles: ServerHandle,
+ healthState: HealthState = HealthState.INSTANCE) = {
+ logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
+ healthState.changeState(HealthDescription.SHUTTING_DOWN)
+ Closeable.closeAll(handles.asIterable()).unsafeRunSync()
+ logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
+}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index 4e2e6d86..62c24308 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,7 +37,7 @@ object VesServer : ServerStarter() {
private fun createVesServer(config: ServerConfiguration): Server {
val collectorProvider = CollectorFactory(
- AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+ AdapterFactory.configurationProvider(config.configurationProviderParams),
AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
MicrometerMetrics.INSTANCE,
config.maximumPayloadSizeBytes
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index da2bfb48..6d219106 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -42,7 +42,6 @@ object ArgVesHvConfigurationTest : Spek({
lateinit var cut: ArgVesHvConfiguration
val kafkaBootstrapServers = "dmaap-mr-wro:6666,dmaap-mr-gda:6666"
val healthCheckApiPort = "6070"
- val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
val requestInterval = "5"
val listenPort = "6969"
@@ -63,7 +62,6 @@ object ArgVesHvConfigurationTest : Spek({
"--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
- "--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval,
"--key-store", "/tmp/keys.p12",
@@ -95,21 +93,16 @@ object ArgVesHvConfigurationTest : Spek({
assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0")
}
- it("should set proper first consul request delay") {
+ it("should set proper first request delay") {
assertThat(result.configurationProviderParams.firstRequestDelay)
.isEqualTo(Duration.ofSeconds(firstRequestDelay.toLong()))
}
- it("should set proper consul request interval") {
+ it("should set proper request interval") {
assertThat(result.configurationProviderParams.requestInterval)
.isEqualTo(Duration.ofSeconds(requestInterval.toLong()))
}
- it("should set proper config url") {
- assertThat(result.configurationProviderParams.configurationUrl)
- .isEqualTo(configurationUrl)
- }
-
it("should set proper security configuration") {
assertThat(result.securityConfiguration.keys.isEmpty()).isFalse()
@@ -135,7 +128,6 @@ object ArgVesHvConfigurationTest : Spek({
it("should throw exception") {
assertThat(
cut.parseExpectingFailure(
- "--config-url", configurationUrl,
"--ssl-disable",
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval
@@ -164,7 +156,6 @@ object ArgVesHvConfigurationTest : Spek({
"--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
- "--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval,
"--key-store", "/tmp/keys.p12",
@@ -183,7 +174,6 @@ object ArgVesHvConfigurationTest : Spek({
"--kafka-bootstrap-servers", kafkaBootstrapServers,
"--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
- "--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
"--request-interval", requestInterval,
"--key-store", "/tmp/keys.p12",
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
index e032f00e..e18b0b10 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
@@ -51,7 +51,7 @@ internal object MainTest : Spek({
val healthState: HealthState = mock()
on("closeServers") {
- closeServers(handle, healthState = healthState).unsafeRunSync()
+ closeServers(handle, healthState = healthState).invoke()
it("should close all handles") {
assertThat(closed).isTrue()