From d6f5bfa934b9aa0571e853fc5432ab84eceb9db1 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 29 Aug 2018 13:24:59 +0200 Subject: Improve coverage of xNF simulator Also refactor to make it possible. Change-Id: I6da6d3f33e57c524a7e353ecebd3e045d8ceed2a Issue-ID: DCAEGEN2-739 Signed-off-by: Piotr Jaszczyk --- .../xnf/config/ArgXnfSimulatorConfiguration.kt | 82 -------------- .../xnf/config/SimulatorConfiguration.kt | 32 ------ .../veshv/simulators/xnf/impl/HttpServer.kt | 102 ----------------- .../veshv/simulators/xnf/impl/XnfSimulator.kt | 122 ++++++--------------- .../simulators/xnf/impl/adapters/VesHvClient.kt | 115 +++++++++++++++++++ .../simulators/xnf/impl/adapters/XnfApiServer.kt | 95 ++++++++++++++++ .../impl/config/ArgXnfSimulatorConfiguration.kt | 82 ++++++++++++++ .../xnf/impl/config/SimulatorConfiguration.kt | 32 ++++++ .../veshv/simulators/xnf/impl/simulations.kt | 76 +++++++++++++ .../dcae/collectors/veshv/simulators/xnf/main.kt | 15 +-- .../veshv/main/OngoingSimulationsTest.kt | 107 ++++++++++++++++++ .../dcae/collectors/veshv/main/XnfSimulatorTest.kt | 114 +++++++++++++++++++ .../main/config/ArgXnfSimulatorConfiurationTest.kt | 6 +- 13 files changed, 664 insertions(+), 316 deletions(-) delete mode 100644 hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt delete mode 100644 hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt delete mode 100644 hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt create mode 100644 hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt create mode 100644 hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt create mode 100644 hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt create mode 100644 hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt create mode 100644 hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt create mode 100644 hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt create mode 100644 hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt (limited to 'hv-collector-xnf-simulator/src') diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt deleted file mode 100644 index 999d0327..00000000 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgXnfSimulatorConfiguration.kt +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.config - -import arrow.core.ForOption -import arrow.core.Option -import arrow.core.fix -import arrow.instances.extensions -import arrow.typeclasses.binding -import org.apache.commons.cli.CommandLine -import org.apache.commons.cli.DefaultParser -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.* - - -/** - * @author Jakub Dudycz - * @since June 2018 - */ -internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration(DefaultParser()) { - override val cmdLineOptionsList = listOf( - VES_HV_PORT, - VES_HV_HOST, - LISTEN_PORT, - SSL_DISABLE, - PRIVATE_KEY_FILE, - CERT_FILE, - TRUST_CERT_FILE - ) - - override fun getConfiguration(cmdLine: CommandLine): Option = - ForOption extensions { - binding { - val listenPort = cmdLine.intValue(LISTEN_PORT).bind() - val vesHost = cmdLine.stringValue(VES_HV_HOST).bind() - val vesPort = cmdLine.intValue(VES_HV_PORT).bind() - - SimulatorConfiguration( - listenPort, - vesHost, - vesPort, - parseSecurityConfig(cmdLine)) - }.fix() - } - - private fun parseSecurityConfig(cmdLine: CommandLine): SecurityConfiguration { - val sslDisable = cmdLine.hasOption(SSL_DISABLE) - val pkFile = cmdLine.stringValue(PRIVATE_KEY_FILE, DefaultValues.PRIVATE_KEY_FILE) - val certFile = cmdLine.stringValue(CERT_FILE, DefaultValues.CERT_FILE) - val trustCertFile = cmdLine.stringValue(TRUST_CERT_FILE, DefaultValues.TRUST_CERT_FILE) - - return SecurityConfiguration( - sslDisable = sslDisable, - privateKey = stringPathToPath(pkFile), - cert = stringPathToPath(certFile), - trustedCert = stringPathToPath(trustCertFile)) - } - - internal object DefaultValues { - const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" - const val CERT_FILE = "/etc/ves-hv/client.crt" - const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" - } -} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt deleted file mode 100644 index 708ffd13..00000000 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/SimulatorConfiguration.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.config - -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration - -/** - * @author Jakub Dudycz - * @since June 2018 - */ -internal data class SimulatorConfiguration( - val listenPort: Int, - val vesHost: String, - val vesPort: Int, - val security: SecurityConfiguration) diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt deleted file mode 100644 index 02e6ee72..00000000 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ /dev/null @@ -1,102 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl - -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser -import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser.Companion.INSTANCE -import ratpack.handling.Chain -import ratpack.handling.Context -import ratpack.server.RatpackServer -import ratpack.server.ServerConfig -import reactor.core.scheduler.Schedulers -import javax.json.Json - -/** - * @author Jakub Dudycz - * @since June 2018 - */ -internal class HttpServer(private val vesClient: XnfSimulator, - private val messageParametersParser: MessageParametersParser = INSTANCE) { - - fun start(port: Int): IO = IO { - RatpackServer.start { server -> - server.serverConfig(ServerConfig.embedded().port(port)) - .handlers(this::configureHandlers) - } - } - - private fun configureHandlers(chain: Chain) { - chain - .post("simulator/sync") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { MessageGenerator.INSTANCE.createMessageFlux(it) } - .map { vesClient.sendIo(it) } - .map { it.unsafeRunSync() } - .onError { handleException(it, ctx) } - .then { sendAcceptedResponse(ctx) } - } - .post("simulator/async") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readArray() } - .map { messageParametersParser.parse(it) } - .map { MessageGenerator.INSTANCE.createMessageFlux(it) } - .map { vesClient.sendRx(it) } - .map { it.subscribeOn(Schedulers.elastic()).subscribe() } - .onError { handleException(it, ctx) } - .then { sendAcceptedResponse(ctx) } - } - .get("healthcheck") { ctx -> - ctx.response.status(STATUS_OK).send() - } - } - - private fun sendAcceptedResponse(ctx: Context) { - ctx.response - .status(STATUS_OK) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request accepted") - .build() - .toString()) - } - - private fun handleException(t: Throwable, ctx: Context) { - logger.warn("Failed to process the request - ${t.localizedMessage}") - logger.debug("Exception thrown when processing the request", t) - ctx.response - .status(STATUS_BAD_REQUEST) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request was not accepted") - .add("exception", t.localizedMessage) - .build() - .toString()) - } - - companion object { - private val logger = Logger(HttpServer::class) - const val STATUS_OK = 200 - const val STATUS_BAD_REQUEST = 400 - const val CONTENT_TYPE_APPLICATION_JSON = "application/json" - } -} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index e8a474d0..558bd1c1 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -19,98 +19,40 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import arrow.core.Either +import arrow.core.Some +import arrow.core.Try +import arrow.core.fix +import arrow.core.flatMap +import arrow.core.monad import arrow.effects.IO -import io.netty.handler.ssl.ClientAuth -import io.netty.handler.ssl.SslContext -import io.netty.handler.ssl.SslContextBuilder -import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage -import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.ReplayProcessor -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.tcp.TcpClient - +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import java.io.InputStream +import javax.json.Json /** - * @author Jakub Dudycz - * @since June 2018 + * @author Piotr Jaszczyk + * @since August 2018 */ -internal class XnfSimulator(private val configuration: SimulatorConfiguration) { - - private val client: TcpClient = TcpClient.builder() - .options { opts -> - opts.host(configuration.vesHost) - .port(configuration.vesPort) - .sslContext(createSslContext(configuration.security)) - } - .build() - - fun sendIo(messages: Flux) = IO { - sendRx(messages).block() - } - - fun sendRx(messages: Flux): Mono { - val complete = ReplayProcessor.create(1) - client - .newHandler { _, output -> handler(complete, messages, output) } - .doOnError { - logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") - } - .subscribe { - logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") - } - return complete.then() - } - - private fun handler(complete: ReplayProcessor, - messages: Flux, - nettyOutbound: NettyOutbound): Publisher { - - val allocator = nettyOutbound.alloc() - val encoder = WireFrameEncoder(allocator) - val frames = messages - .map(encoder::encode) - .window(MAX_BATCH_SIZE) - - return nettyOutbound - .logConnectionClosed() - .options { it.flushOnBoundary() } - .sendGroups(frames) - .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) - .then { - logger.info("Messages have been sent") - complete.onComplete() - } - .then() - } - - private fun createSslContext(config: SecurityConfiguration): SslContext = - SslContextBuilder.forClient() - .keyManager(config.cert.toFile(), config.privateKey.toFile()) - .trustManager(config.trustedCert.toFile()) - .sslProvider(SslProvider.OPENSSL) - .clientAuth(ClientAuth.REQUIRE) - .build() - - private fun NettyOutbound.logConnectionClosed(): NettyOutbound { - context().onClose { - logger.info { "Connection to ${context().address()} has been closed" } - } - return this - } - - companion object { - private val logger = Logger(XnfSimulator::class) - private const val MAX_BATCH_SIZE = 128 - private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE - } +class XnfSimulator( + private val vesClient: VesHvClient, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, + private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + + fun startSimulation(messageParameters: InputStream): Either> = + Either.monad().binding { + val json = parseJsonArray(messageParameters).bind() + val parsed = messageParametersParser.parse(json).bind() + val generatedMessages = messageGenerator.createMessageFlux(parsed) + vesClient.sendIo(generatedMessages) + }.fix() + + private fun parseJsonArray(jsonStream: InputStream) = + Try { + Json.createReader(jsonStream).readArray() + }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) } } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt new file mode 100644 index 00000000..22e47d75 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -0,0 +1,115 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import io.netty.handler.ssl.ClientAuth +import io.netty.handler.ssl.SslContext +import io.netty.handler.ssl.SslContextBuilder +import io.netty.handler.ssl.SslProvider +import org.onap.dcae.collectors.veshv.domain.EndOfTransmissionMessage +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.ReplayProcessor +import reactor.ipc.netty.NettyOutbound +import reactor.ipc.netty.tcp.TcpClient + + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +class VesHvClient(private val configuration: SimulatorConfiguration) { + + private val client: TcpClient = TcpClient.builder() + .options { opts -> + opts.host(configuration.vesHost) + .port(configuration.vesPort) + .sslContext(createSslContext(configuration.security)) + } + .build() + + fun sendIo(messages: Flux) = + sendRx(messages).then(Mono.just(Unit)).asIo() + + private fun sendRx(messages: Flux): Mono { + val complete = ReplayProcessor.create(1) + client + .newHandler { _, output -> handler(complete, messages, output) } + .doOnError { + logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + .subscribe { + logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + return complete.then() + } + + private fun handler(complete: ReplayProcessor, + messages: Flux, + nettyOutbound: NettyOutbound): Publisher { + + val allocator = nettyOutbound.alloc() + val encoder = WireFrameEncoder(allocator) + val frames = messages + .map(encoder::encode) + .window(MAX_BATCH_SIZE) + + return nettyOutbound + .logConnectionClosed() + .options { it.flushOnBoundary() } + .sendGroups(frames) + .send(Mono.just(allocator.buffer().writeByte(eotMessageByte.toInt()))) + .then { + logger.info("Messages have been sent") + complete.onComplete() + } + .then() + } + + private fun createSslContext(config: SecurityConfiguration): SslContext = + SslContextBuilder.forClient() + .keyManager(config.cert.toFile(), config.privateKey.toFile()) + .trustManager(config.trustedCert.toFile()) + .sslProvider(SslProvider.OPENSSL) + .clientAuth(ClientAuth.REQUIRE) + .build() + + private fun NettyOutbound.logConnectionClosed(): NettyOutbound { + context().onClose { + logger.info { "Connection to ${context().address()} has been closed" } + } + return this + } + + companion object { + private val logger = Logger(VesHvClient::class) + private const val MAX_BATCH_SIZE = 128 + private const val eotMessageByte = EndOfTransmissionMessage.MARKER_BYTE + } +} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt new file mode 100644 index 00000000..54ead6f7 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -0,0 +1,95 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import arrow.core.Either +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.utils.http.Content +import org.onap.dcae.collectors.veshv.utils.http.ContentType +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.HttpStatus +import org.onap.dcae.collectors.veshv.utils.http.Response +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import ratpack.handling.Chain +import ratpack.handling.Context +import ratpack.http.TypedData +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig +import java.util.* +import javax.json.Json + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +internal class XnfApiServer( + private val xnfSimulator: XnfSimulator, + private val ongoingSimulations: OngoingSimulations) { + + fun start(port: Int): IO = IO { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::configureHandlers) + } + } + + private fun configureHandlers(chain: Chain) { + chain + .post("simulator", ::startSimulationHandler) + .post("simulator/async", ::startSimulationHandler) + .get("simulator/:id", ::simulatorStatusHandler) + .get("healthcheck") { ctx -> + logger.info("Checking health") + ctx.response.status(HttpConstants.STATUS_OK).send() + } + } + + private fun startSimulationHandler(ctx: Context) { + logger.info("Starting asynchronous scenario") + ctx.request.body.then { body -> + val id = startSimulation(body) + ctx.response.sendEitherErrorOrResponse(id) + } + } + + private fun startSimulation(body: TypedData): Either { + return xnfSimulator.startSimulation(body.inputStream) + .map(ongoingSimulations::startAsynchronousSimulation) + .map(Responses::acceptedResponse) + } + + private fun simulatorStatusHandler(ctx: Context) { + val id = UUID.fromString(ctx.pathTokens["id"]) + val status = ongoingSimulations.status(id) + val response = Responses.statusResponse(status.toString(), status.message) + ctx.response.sendAndHandleErrors(IO.just(response)) + } + + companion object { + private val logger = Logger(XnfApiServer::class) + } +} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt new file mode 100644 index 00000000..56d6212a --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -0,0 +1,82 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + +import arrow.core.ForOption +import arrow.core.Option +import arrow.core.fix +import arrow.instances.extensions +import arrow.typeclasses.binding +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.* + + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration(DefaultParser()) { + override val cmdLineOptionsList = listOf( + VES_HV_PORT, + VES_HV_HOST, + LISTEN_PORT, + SSL_DISABLE, + PRIVATE_KEY_FILE, + CERT_FILE, + TRUST_CERT_FILE + ) + + override fun getConfiguration(cmdLine: CommandLine): Option = + ForOption extensions { + binding { + val listenPort = cmdLine.intValue(LISTEN_PORT).bind() + val vesHost = cmdLine.stringValue(VES_HV_HOST).bind() + val vesPort = cmdLine.intValue(VES_HV_PORT).bind() + + SimulatorConfiguration( + listenPort, + vesHost, + vesPort, + parseSecurityConfig(cmdLine)) + }.fix() + } + + private fun parseSecurityConfig(cmdLine: CommandLine): SecurityConfiguration { + val sslDisable = cmdLine.hasOption(SSL_DISABLE) + val pkFile = cmdLine.stringValue(PRIVATE_KEY_FILE, DefaultValues.PRIVATE_KEY_FILE) + val certFile = cmdLine.stringValue(CERT_FILE, DefaultValues.CERT_FILE) + val trustCertFile = cmdLine.stringValue(TRUST_CERT_FILE, DefaultValues.TRUST_CERT_FILE) + + return SecurityConfiguration( + sslDisable = sslDisable, + privateKey = stringPathToPath(pkFile), + cert = stringPathToPath(certFile), + trustedCert = stringPathToPath(trustCertFile)) + } + + internal object DefaultValues { + const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" + const val CERT_FILE = "/etc/ves-hv/client.crt" + const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" + } +} diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt new file mode 100644 index 00000000..9b6ef209 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -0,0 +1,32 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration + +/** + * @author Jakub Dudycz + * @since June 2018 + */ +data class SimulatorConfiguration( + val listenPort: Int, + val vesHost: String, + val vesPort: Int, + val security: SecurityConfiguration) diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt new file mode 100644 index 00000000..95bb4897 --- /dev/null +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl + +import arrow.effects.IO +import kotlinx.coroutines.experimental.asCoroutineDispatcher +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executor +import java.util.concurrent.Executors + +/** + * @author Piotr Jaszczyk + * @since August 2018 + */ +class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { + private val asyncSimulationContext = executor.asCoroutineDispatcher() + private val simulations = ConcurrentHashMap() + + fun startAsynchronousSimulation(simulationIo: IO): UUID { + val id = UUID.randomUUID() + simulations[id] = StatusOngoing + + simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> + result.fold( + { err -> + logger.warn("Error", err) + simulations[id] = StatusFailure(err) + }, + { + logger.info("Finished sending messages") + simulations[id] = StatusSuccess + } + ) + } + return id + } + + fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + + internal fun clear() { + simulations.clear() + } + + companion object { + private val logger = Logger(XnfApiServer::class) + } +} + +sealed class Status(val message: String) { + override fun toString() = this::class.simpleName ?: "null" +} + +object StatusNotFound : Status("not found") +object StatusOngoing : Status("ongoing") +object StatusSuccess : Status("success") +data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}") diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index fa6d626b..c9e900ac 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,12 +19,14 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync -import org.onap.dcae.collectors.veshv.utils.arrow.void +import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -38,11 +40,10 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" */ fun main(args: Array) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) - .map {config -> - XnfSimulator(config) - .let { HttpServer(it) } + .map { config -> + XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations()) .start(config.listenPort) - .void() + .unit() } .unsafeRunEitherSync( { ex -> diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt new file mode 100644 index 00000000..70d8ba83 --- /dev/null +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt @@ -0,0 +1,107 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.effects.IO +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess +import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds +import java.time.Duration +import java.util.* +import java.util.concurrent.Executors + +/** + * @author Piotr Jaszczyk + * @since September 2018 + */ +internal class OngoingSimulationsTest : Spek({ + val executor = Executors.newSingleThreadExecutor() + val cut = OngoingSimulations(executor) + + describe("simulations repository") { + given("not existing task task id") { + val id = UUID.randomUUID() + + on("status") { + val result = cut.status(id) + + it("should have 'not found' status") { + assertThat(result).isEqualTo(StatusNotFound) + } + } + } + + given("never ending task") { + val task = IO.async { } + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have ongoing status") { + assertThat(cut.status(result)).isEqualTo(StatusOngoing) + } + } + } + + given("failing task") { + val cause = RuntimeException("facepalm") + val task = IO.raiseError(cause) + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have failing status") { + waitUntilSucceeds { + assertThat(cut.status(result)).isEqualTo(StatusFailure(cause)) + } + } + } + } + + given("successful task") { + val task = IO { println("great success!") } + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have successful status") { + waitUntilSucceeds { + assertThat(cut.status(result)).isEqualTo(StatusSuccess) + } + } + } + } + + afterGroup { + executor.shutdown() + } + } + + afterEachTest { cut.clear() } +}) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt new file mode 100644 index 00000000..80f39579 --- /dev/null +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.core.Left +import arrow.core.None +import arrow.core.Right +import arrow.effects.IO +import com.nhaarman.mockito_kotlin.any +import com.nhaarman.mockito_kotlin.mock +import com.nhaarman.mockito_kotlin.whenever +import com.sun.xml.internal.messaging.saaj.util.ByteInputStream +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.PayloadWireFrameMessage +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import reactor.core.publisher.Flux + +/** + * @author Piotr Jaszczyk + * @since September 2018 + */ +internal class XnfSimulatorTest : Spek({ + lateinit var cut: XnfSimulator + lateinit var vesClient: VesHvClient + lateinit var messageParametersParser: MessageParametersParser + lateinit var messageGenerator: MessageGenerator + + beforeEachTest { + vesClient = mock() + messageParametersParser = mock() + messageGenerator = mock() + cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator) + } + + describe("startSimulation") { + it("should fail when empty input stream") { + // given + val emptyInputStream = ByteInputStream() + + // when + val result = cut.startSimulation(emptyInputStream) + + // then + assertThat(result).isLeft() + } + + it("should fail when invalid JSON") { + // given + val invalidJson = "invalid json".byteInputStream() + + // when + val result = cut.startSimulation(invalidJson) + + // then + assertThat(result).isLeft() + } + + it("should fail when JSON syntax is valid but content is invalid") { + // given + val json = "[1,2,3]".byteInputStream() + val cause = ParsingError("epic fail", None) + whenever(messageParametersParser.parse(any())).thenReturn( + Left(cause)) + + // when + val result = cut.startSimulation(json) + + // then + assertThat(result).left().isEqualTo(cause) + } + + it("should return generated messages") { + // given + val json = "[true]".byteInputStream() + val messageParams = listOf() + val generatedMessages = Flux.empty() + val sendingIo = IO {} + whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) + whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) + whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) + + // when + val result = cut.startSimulation(json) + + // then + assertThat(result).right().isSameAs(sendingIo) + } + } +}) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt index 8749dc5b..69caf727 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgXnfSimulatorConfiurationTest.kt @@ -26,9 +26,9 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration -import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgXnfSimulatorConfiguration.DefaultValues -import org.onap.dcae.collectors.veshv.simulators.xnf.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration.DefaultValues +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingFailure import org.onap.dcae.collectors.veshv.tests.utils.parseExpectingSuccess import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentError -- cgit 1.2.3-korg