summaryrefslogtreecommitdiffstats
path: root/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-main')
-rw-r--r--hv-collector-main/pom.xml10
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt19
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt15
-rw-r--r--hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt8
4 files changed, 43 insertions, 9 deletions
diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml
index e594aeff..0e956288 100644
--- a/hv-collector-main/pom.xml
+++ b/hv-collector-main/pom.xml
@@ -89,11 +89,19 @@
</dependency>
<dependency>
<groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-health-check</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
<artifactId>hv-collector-test-utils</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
-
+ <dependency>
+ <groupId>io.ratpack</groupId>
+ <artifactId>ratpack-core</artifactId>
+ </dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index 7c958b97..26230cd3 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -35,6 +35,7 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_FIRST_REQUEST_DELAY
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_REQUEST_INTERVAL
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
@@ -44,6 +45,7 @@ import java.time.Duration
internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
override val cmdLineOptionsList = listOf(
+ HEALTH_CHECK_API_PORT,
LISTEN_PORT,
CONSUL_CONFIG_URL,
CONSUL_FIRST_REQUEST_DELAY,
@@ -59,12 +61,17 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
override fun getConfiguration(cmdLine: CommandLine): Option<ServerConfiguration> =
ForOption extensions {
binding {
+ val healthCheckApiPort = cmdLine.intValue(
+ HEALTH_CHECK_API_PORT,
+ DefaultValues.HEALTH_CHECK_API_PORT
+ )
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine)
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
ServerConfiguration(
+ healthCheckApiPort = healthCheckApiPort,
listenPort = listenPort,
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
@@ -77,9 +84,14 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
ForOption extensions {
binding {
val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind()
- val firstRequestDelay = cmdLine.longValue(CONSUL_FIRST_REQUEST_DELAY, DefaultValues.CONSUL_FIRST_REQUEST_DELAY)
- val requestInterval = cmdLine.longValue(CONSUL_REQUEST_INTERVAL, DefaultValues.CONSUL_REQUEST_INTERVAL)
-
+ val firstRequestDelay = cmdLine.longValue(
+ CONSUL_FIRST_REQUEST_DELAY,
+ DefaultValues.CONSUL_FIRST_REQUEST_DELAY
+ )
+ val requestInterval = cmdLine.longValue(
+ CONSUL_REQUEST_INTERVAL,
+ DefaultValues.CONSUL_REQUEST_INTERVAL
+ )
ConfigurationProviderParams(
configUrl,
Duration.ofSeconds(firstRequestDelay),
@@ -103,6 +115,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
}
internal object DefaultValues {
+ const val HEALTH_CHECK_API_PORT = 6060
const val CONSUL_FIRST_REQUEST_DELAY = 10L
const val CONSUL_REQUEST_INTERVAL = 5L
const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 44d09d45..23d7d2e2 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.healthcheck.http.HealthCheckApiServer
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
@@ -36,6 +37,7 @@ private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainK
fun main(args: Array<String>) =
ArgVesHvConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+ .map(::startHealthCheckApiServer)
.map(::createServer)
.map {
it.start()
@@ -50,7 +52,6 @@ fun main(args: Array<String>) =
{ logger.info("Gentle shutdown") }
)
-
private fun createServer(config: ServerConfiguration): Server {
val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
val collectorProvider = CollectorFactory(
@@ -62,7 +63,13 @@ private fun createServer(config: ServerConfiguration): Server {
return ServerFactory.createNettyTcpServer(config, collectorProvider)
}
-private fun logServerStarted(handle: ServerHandle): ServerHandle {
- logger.info("HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}")
- return handle
+private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also {
+ logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}")
+}
+
+private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply {
+ HealthCheckApiServer()
+ .start(healthCheckApiPort)
+ .unsafeRunSync()
+ .also { logger.info("Health check api server started on port ${it.bindPort}") }
}
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
index 14f9be0b..26507197 100644
--- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt
@@ -40,6 +40,7 @@ import java.time.Duration
*/
object ArgVesHvConfigurationTest : Spek({
lateinit var cut: ArgVesHvConfiguration
+ val healthCheckApiPort = "6070"
val configurationUrl = "http://test-address/test"
val firstRequestDelay = "10"
val requestInterval = "5"
@@ -58,6 +59,7 @@ object ArgVesHvConfigurationTest : Spek({
beforeEachTest {
result = cut.parseExpectingSuccess("--ssl-disable",
+ "--health-check-api-port", healthCheckApiPort,
"--listen-port", listenPort,
"--config-url", configurationUrl,
"--first-request-delay", firstRequestDelay,
@@ -67,7 +69,11 @@ object ArgVesHvConfigurationTest : Spek({
"--trust-cert-file", trustCert.toFile().absolutePath)
}
- it("should set proper port") {
+ it("should set proper health check api port") {
+ assertThat(result.healthCheckApiPort).isEqualTo(healthCheckApiPort.toInt())
+ }
+
+ it("should set proper listen port") {
assertThat(result.listenPort).isEqualTo(listenPort.toInt())
}