aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-main/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-14 12:52:28 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-16 10:09:12 +0200
commit5e93c1ec9d690d7da15b7c0db0052121d8879471 (patch)
tree5bb91c52a28fa66bdb259748ee1f61ff37963760 /hv-collector-main/src
parent8a2552ac94981cfa18cce551066d9ca4ec668558 (diff)
Remove Ratpack dependency for HV-VES health checks
In order to minimize complexity and possibly improve performance (thread count) reactor-netty should be used instead of Ratpack. Also reorganize code to be more consistent and differentiated readiness and liveness endpoints (for future use in K8s Pod definition). As an example I've defined health check probe in docker-compose YAML. Change-Id: I1b5ce3d685e7ae5b0515b2146ae4fa88b3b41186 Issue-ID: DCAEGEN2-705 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-main/src')
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt48
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt39
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt42
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt51
4 files changed, 145 insertions, 35 deletions
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index dc92228f..a84a39a5 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -19,13 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.main
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.boundary.ServerHandle
-import org.onap.dcae.collectors.veshv.factory.CollectorFactory
-import org.onap.dcae.collectors.veshv.factory.ServerFactory
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthCheckApiServer
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthStateProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import arrow.effects.IO
+import arrow.effects.fix
+import arrow.effects.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
+import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
@@ -38,13 +37,7 @@ private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainK
fun main(args: Array<String>) =
ArgVesHvConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startHealthCheckApiServer)
- .map(::createServer)
- .map {
- it.start()
- .map(::logServerStarted)
- .flatMap(ServerHandle::await)
- }
+ .map(::startAndAwaitServers)
.unsafeRunEitherSync(
{ ex ->
logger.error("Failed to start a server", ex)
@@ -53,24 +46,9 @@ fun main(args: Array<String>) =
{ logger.info("Gentle shutdown") }
)
-private fun createServer(config: ServerConfiguration): Server {
- val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
- val collectorProvider = CollectorFactory(
- AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
- sink,
- MicrometerMetrics()
- ).createVesHvCollectorProvider()
-
- return ServerFactory.createNettyTcpServer(config, collectorProvider)
-}
-
-private fun logServerStarted(handle: ServerHandle): ServerHandle = handle.also {
- logger.info("HighVolume VES Collector is up and listening on ${it.host}:${it.port}")
-}
-
-private fun startHealthCheckApiServer(config: ServerConfiguration): ServerConfiguration = config.apply {
- HealthCheckApiServer(HealthStateProvider.INSTANCE)
- .start(healthCheckApiPort)
- .unsafeRunSync()
- .also { logger.info("Health check api server started on port ${it.bindPort}") }
-}
+private fun startAndAwaitServers(config: ServerConfiguration) =
+ IO.monad().binding {
+ HealthCheckServer.start(config).bind()
+ VesServer.start(config).bind()
+ .await().bind()
+ }.fix()
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
new file mode 100644
index 00000000..04fc021d
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object HealthCheckServer : ServerStarter() {
+ override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
+
+ private fun createHealthCheckServer(config: ServerConfiguration) =
+ HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort)
+
+ override fun serverStartedMessage(handle: ServerHandle) =
+ "Health check server is up and listening on ${handle.host}:${handle.port}"
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
new file mode 100644
index 00000000..5c6f1277
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
@@ -0,0 +1,42 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+abstract class ServerStarter {
+ fun start(config: ServerConfiguration): IO<ServerHandle> =
+ startServer(config)
+ .map { logger.info(serverStartedMessage(it)); it }
+
+ protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
+ protected abstract fun serverStartedMessage(handle: ServerHandle): String
+
+ companion object {
+ private val logger = Logger(ServerStarter::class)
+ }
+}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
new file mode 100644
index 00000000..fbf8936f
--- /dev/null
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main.servers
+
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.factory.CollectorFactory
+import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.main.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+object VesServer : ServerStarter() {
+ override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
+
+ private fun createVesServer(config: ServerConfiguration): Server {
+ val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
+ val collectorProvider = CollectorFactory(
+ AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
+ sink,
+ MicrometerMetrics()
+ ).createVesHvCollectorProvider()
+
+ return ServerFactory.createNettyTcpServer(config, collectorProvider)
+ }
+
+ override fun serverStartedMessage(handle: ServerHandle) =
+ "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+} \ No newline at end of file