summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt72
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt27
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt44
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt57
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt6
5 files changed, 98 insertions, 108 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 39fcae21..c8a3c013 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -19,58 +19,58 @@
*/
package org.onap.dcae.collectors.veshv.main
-import arrow.effects.IO
-import arrow.effects.fix
-import arrow.effects.instances.io.monad.monad
-import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer
import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.Closeable
import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
-import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.neverComplete
import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
+import reactor.core.scheduler.Schedulers
+import java.util.concurrent.atomic.AtomicReference
-private const val VESHV_PACKAGE = "org.onap.dcae.collectors.veshv"
-private val logger = Logger("$VESHV_PACKAGE.main")
-private const val PROGRAM_NAME = "java $VESHV_PACKAGE.main.MainKt"
-fun main(args: Array<String>) =
- ConfigurationModule()
- .createConfigurationFromCommandLine(args)
- .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
- .map(::startAndAwaitServers)
- .unsafeRunEitherSync(
- { ex ->
- logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
- ExitFailure(1)
- },
- { logger.debug(ServiceContext::mdc) { "High Volume VES Collector execution finished" } }
- )
+private const val VES_HV_PACKAGE = "org.onap.dcae.collectors.veshv"
+private val logger = Logger("$VES_HV_PACKAGE.main")
-private fun startAndAwaitServers(config: ServerConfiguration) =
- IO.monad().binding {
- Logger.setLogLevel(VESHV_PACKAGE, config.logLevel)
- logger.info { "Using configuration: $config" }
+private val hvVesServer = AtomicReference<ServerHandle>()
- val healthCheckServerHandle = HealthCheckServer.start(config).bind()
- val hvVesHandle = VesServer.start(config).bind()
+fun main(args: Array<String>) {
+ HealthCheckServer.start()
+ ConfigurationModule()
+ .hvVesConfigurationUpdates(args)
+ .publishOn(Schedulers.single(Schedulers.elastic()))
+ .doOnNext(::startServer)
+ .doOnError(::logServerStartFailed)
+ .neverComplete() // TODO: remove after merging configuration stream with cbs
+ .block()
+}
+
+private fun startServer(config: HvVesConfiguration) {
+ stopRunningServer()
+ Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
+ logger.info { "Using configuration: $config" }
+
+ VesServer.start(config).let {
+ registerShutdownHook { shutdownGracefully(it) }
+ hvVesServer.set(it)
+ }
+}
- registerShutdownHook(closeServers(hvVesHandle, healthCheckServerHandle))
- hvVesHandle.await().bind()
- }.fix()
+private fun stopRunningServer() = hvVesServer.get()?.close()?.unsafeRunSync()
-internal fun closeServers(vararg handles: ServerHandle,
- healthState: HealthState = HealthState.INSTANCE) = {
+internal fun shutdownGracefully(serverHandle: ServerHandle,
+ healthState: HealthState = HealthState.INSTANCE) {
logger.debug(ServiceContext::mdc) { "Graceful shutdown started" }
healthState.changeState(HealthDescription.SHUTTING_DOWN)
- Closeable.closeAll(handles.asIterable()).unsafeRunSync()
+ serverHandle.close().unsafeRunSync()
logger.info(ServiceContext::mdc) { "Graceful shutdown completed" }
}
+
+private fun logServerStartFailed(ex: Throwable) =
+ logger.withError(ServiceContext::mdc) { log("Failed to start a server", ex) }
+
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
index 15472b5e..bc284d08 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt
@@ -19,25 +19,38 @@
*/
package org.onap.dcae.collectors.veshv.main.servers
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.net.InetSocketAddress
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-object HealthCheckServer : ServerStarter() {
- override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start()
+object HealthCheckServer {
- private fun createHealthCheckServer(config: ServerConfiguration) =
+ private const val DEFAULT_HEALTHCHECK_PORT = 6060
+ private val logger = Logger(HealthCheckServer::class)
+
+ fun start(port: Int = DEFAULT_HEALTHCHECK_PORT) =
+ createHealthCheckServer(port)
+ .start()
+ .then(::logServerStarted)
+ .unsafeRunSync()
+
+ private fun createHealthCheckServer(listenPort: Int) =
HealthCheckApiServer(
HealthState.INSTANCE,
MicrometerMetrics.INSTANCE.metricsProvider,
- config.healthCheckApiListenAddress)
+ InetSocketAddress(listenPort))
- override fun serverStartedMessage(handle: ServerHandle) =
- "Health check server is up and listening on ${handle.host}:${handle.port}"
+ private fun logServerStarted(handle: ServerHandle) =
+ logger.info(ServiceContext::mdc) {
+ "Health check server is up and listening on ${handle.host}:${handle.port}"
+ }
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
deleted file mode 100644
index 74a66324..00000000
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/ServerStarter.kt
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.main.servers
-
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.arrow.then
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since August 2018
- */
-abstract class ServerStarter {
- fun start(config: ServerConfiguration): IO<ServerHandle> =
- startServer(config)
- .then { logger.info(ServiceContext::mdc) { serverStartedMessage(it) } }
-
- protected abstract fun startServer(config: ServerConfiguration): IO<ServerHandle>
- protected abstract fun serverStartedMessage(handle: ServerHandle): String
-
- companion object {
- private val logger = Logger(ServerStarter::class)
- }
-}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index 0f5e45ec..d15dccef 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -19,33 +19,54 @@
*/
package org.onap.dcae.collectors.veshv.main.servers
-import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
+import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since August 2018
*/
-object VesServer : ServerStarter() {
- override fun startServer(config: ServerConfiguration): IO<ServerHandle> = createVesServer(config).start()
-
- private fun createVesServer(config: ServerConfiguration): Server {
- val collectorProvider = CollectorFactory(
- AdapterFactory.configurationProvider(config.configurationProviderParams),
- AdapterFactory.sinkCreatorFactory(config.dummyMode, config.kafkaConfiguration),
- MicrometerMetrics.INSTANCE,
- config.maximumPayloadSizeBytes
- ).createVesHvCollectorProvider()
-
- return ServerFactory.createNettyTcpServer(config, collectorProvider, MicrometerMetrics.INSTANCE)
- }
-
- override fun serverStartedMessage(handle: ServerHandle) =
- "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+object VesServer {
+
+ private val logger = Logger(VesServer::class)
+
+ fun start(config: HvVesConfiguration): ServerHandle =
+ createVesServer(config)
+ .start()
+ .then(::logServerStarted)
+ .unsafeRunSync()
+
+ private fun createVesServer(config: HvVesConfiguration): Server =
+ initializeCollectorFactory(config)
+ .createVesHvCollectorProvider()
+ .let { collectorProvider ->
+ ServerFactory.createNettyTcpServer(
+ config.server,
+ config.security,
+ collectorProvider,
+ MicrometerMetrics.INSTANCE
+ )
+ }
+
+ private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
+ CollectorFactory(
+ AdapterFactory.configurationProvider(config.cbs),
+ AdapterFactory.sinkCreatorFactory(config.collector),
+ MicrometerMetrics.INSTANCE,
+ config.server.maxPayloadSizeBytes
+ )
+
+ private fun logServerStarted(handle: ServerHandle) =
+ logger.info(ServiceContext::mdc) {
+ "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}"
+ }
+
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
index e18b0b10..d8de9f25 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MainTest.kt
@@ -42,7 +42,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
internal object MainTest : Spek({
describe("closeServer shutdown hook") {
given("server handles and health state") {
- val handle: ServerHandle = mock()
+ val handle = mock<ServerHandle>()
var closed = false
val handleClose = IO {
closed = true
@@ -50,8 +50,8 @@ internal object MainTest : Spek({
whenever(handle.close()).thenReturn(handleClose)
val healthState: HealthState = mock()
- on("closeServers") {
- closeServers(handle, healthState = healthState).invoke()
+ on("shutdownGracefully") {
+ shutdownGracefully(handle, healthState = healthState)
it("should close all handles") {
assertThat(closed).isTrue()