diff options
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src')
8 files changed, 117 insertions, 86 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index ca6d169a..75ff8955 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -43,8 +43,7 @@ import reactor.netty.tcp.TcpClient class VesHvClient(private val configuration: SimulatorConfiguration) { private val client: TcpClient = TcpClient.create() - .host(configuration.vesHost) - .port(configuration.vesPort) + .addressSupplier { configuration.hvVesAddress } .configureSsl() private fun TcpClient.configureSsl() = @@ -61,14 +60,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { .handle { _, output -> handler(complete, messages, output) } .connect() .doOnError { - logger.info { - "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" - } + logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" } } .subscribe { - logger.info { - "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}" - } + logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" } } return complete.then() } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index a0785620..654f16a6 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -23,10 +23,6 @@ import arrow.core.Either import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.BUSY -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.DETAILED_STATUS_NODE -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfStatus.IDLE -import org.onap.dcae.collectors.veshv.utils.http.HttpConstants import org.onap.dcae.collectors.veshv.utils.http.Response import org.onap.dcae.collectors.veshv.utils.http.Responses import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors @@ -38,8 +34,8 @@ import ratpack.handling.Context import ratpack.http.TypedData import ratpack.server.RatpackServer import ratpack.server.ServerConfig +import java.net.InetSocketAddress import java.util.* -import javax.json.Json /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -49,9 +45,10 @@ internal class XnfApiServer( private val xnfSimulator: XnfSimulator, private val ongoingSimulations: OngoingSimulations) { - fun start(port: Int): IO<RatpackServer> = IO { + fun start(socketAddress: InetSocketAddress): IO<RatpackServer> = IO { RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) + server.serverConfig(ServerConfig.embedded() + .port(socketAddress.port)) .handlers(this::configureHandlers) } } @@ -61,7 +58,6 @@ internal class XnfApiServer( .post("simulator", ::startSimulationHandler) .post("simulator/async", ::startSimulationHandler) .get("simulator/:id", ::simulatorStatusHandler) - .get("healthcheck", ::healthcheckHandler) } private fun startSimulationHandler(ctx: Context) { @@ -93,21 +89,6 @@ internal class XnfApiServer( ctx.response.sendAndHandleErrors(IO.just(response)) } - private fun healthcheckHandler(ctx: Context) { - val healthCheckDetailedMessage = createHealthCheckDetailedMessage() - val simulatorStatus = HttpConstants.STATUS_OK - logger.info { "Returning simulator status: ${simulatorStatus} ${healthCheckDetailedMessage}" } - ctx.response.status(simulatorStatus).send(healthCheckDetailedMessage) - } - - private fun createHealthCheckDetailedMessage() = - Json.createObjectBuilder() - .add(DETAILED_STATUS_NODE, when { - ongoingSimulations.isAnySimulationPending() -> BUSY - else -> IDLE - }) - .build().toString() - companion object { private val logger = Logger(XnfApiServer::class) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt new file mode 100644 index 00000000..5e1c979c --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfHealthCheckServer.kt @@ -0,0 +1,53 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + + +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer +import org.onap.dcae.collectors.veshv.healthcheck.ports.PrometheusMetricsProvider +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Mono + + +internal class XnfHealthCheckServer { + fun startServer(config: SimulatorConfiguration) = createHealthCheckServer(config) + .start() + .map { logger.info(serverStartedMessage(it)); it } + + private fun createHealthCheckServer(config: SimulatorConfiguration): HealthCheckApiServer { + val monitoring = object : PrometheusMetricsProvider { + override fun lastStatus(): Mono<String> = Mono.just("not implemented") + } + return HealthCheckApiServer( + HealthState.INSTANCE, + monitoring, + config.healthCheckApiListenAddress) + } + + private fun serverStartedMessage(handle: ServerHandle) = + { "Health check server is up and listening on ${handle.host}:${handle.port}" } + + companion object { + private val logger = Logger(XnfHealthCheckServer::class) + } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index 0b321362..7885514b 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -28,17 +28,19 @@ import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue +import java.net.InetSocketAddress /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -49,6 +51,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon VES_HV_PORT, VES_HV_HOST, LISTEN_PORT, + HEALTH_CHECK_API_PORT, MAXIMUM_PAYLOAD_SIZE_BYTES, SSL_DISABLE, KEY_STORE_FILE, @@ -61,14 +64,20 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val vesHost = cmdLine.stringValue(VES_HV_HOST).bind() val vesPort = cmdLine.intValue(VES_HV_PORT).bind() + val healthCheckApiListenAddress = cmdLine.intValue(HEALTH_CHECK_API_PORT, + DefaultValues.HEALTH_CHECK_API_PORT) val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) SimulatorConfiguration( - listenPort, - vesHost, - vesPort, + InetSocketAddress(listenPort), + InetSocketAddress(healthCheckApiListenAddress), + InetSocketAddress(vesHost, vesPort), maxPayloadSizeBytes, createSecurityConfiguration(cmdLine).bind()) }.fix() + + internal object DefaultValues { + const val HEALTH_CHECK_API_PORT = 6063 + } } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt index 3395d282..5a0e73c7 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -20,14 +20,15 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ data class SimulatorConfiguration( - val listenPort: Int, - val vesHost: String, - val vesPort: Int, + val listenAddress: InetSocketAddress, + val healthCheckApiListenAddress: InetSocketAddress, + val hvVesAddress: InetSocketAddress, val maxPayloadSizeBytes: Int, val security: SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt index bd58dd9c..fb71b2cd 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.effects.IO import kotlinx.coroutines.asCoroutineDispatcher +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.BUSY +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription.IDLE +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.util.* @@ -32,13 +35,15 @@ import java.util.concurrent.Executors * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { +class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool(), + private val healthState: HealthState = HealthState.INSTANCE) { private val asyncSimulationContext = executor.asCoroutineDispatcher() private val simulations = ConcurrentHashMap<UUID, Status>() fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID { val id = UUID.randomUUID() simulations[id] = StatusOngoing + updateHealthState() simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> result.fold( @@ -50,20 +55,22 @@ class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { logger.info { "Finished sending messages" } simulations[id] = StatusSuccess } - ) + ).also { updateHealthState() } } return id } - fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + private fun updateHealthState() = healthState.changeState(currentState()) + + private fun currentState() = if (isAnySimulationPending()) BUSY else IDLE - fun isAnySimulationPending() = simulations.any { + internal fun isAnySimulationPending() = simulations.any { status(it.key) is StatusOngoing } - internal fun clear() { - simulations.clear() - } + fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + + internal fun clear() = simulations.clear() companion object { private val logger = Logger(XnfApiServer::class) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt deleted file mode 100644 index a86e3d50..00000000 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/status.kt +++ /dev/null @@ -1,28 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl - -// TODO: probably should be merged with HealthDescription or made similiar to it -internal object XnfStatus { - - const val BUSY = "Busy" - const val IDLE = "Idle" - const val DETAILED_STATUS_NODE = "Detailed status" -} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 91070d35..308c6864 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,17 +19,25 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf +import arrow.effects.IO +import arrow.effects.fix +import arrow.effects.instances.io.monad.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync -import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import ratpack.server.RatpackServer private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf" private val logger = Logger(PACKAGE_NAME) @@ -41,15 +49,7 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" */ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map { config -> - logger.info { "Using configuration: $config" } - val xnfSimulator = XnfSimulator( - VesHvClient(config), - MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) - XnfApiServer(xnfSimulator, OngoingSimulations()) - .start(config.listenPort) - .unit() - } + .map(::startServers) .unsafeRunEitherSync( { ex -> logger.withError { log("Failed to start a server", ex) } @@ -57,5 +57,18 @@ fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) }, { logger.info { "Started xNF Simulator API server" } + HealthState.INSTANCE.changeState(HealthDescription.IDLE) } ) + +private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> = + IO.monad().binding { + logger.info { "Using configuration: $config" } + XnfHealthCheckServer().startServer(config).bind() + val xnfSimulator = XnfSimulator( + VesHvClient(config), + MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + XnfApiServer(xnfSimulator, OngoingSimulations()) + .start(config.listenAddress) + .bind() + }.fix() |