diff options
Diffstat (limited to 'sources/hv-collector-xnf-simulator')
12 files changed, 930 insertions, 0 deletions
diff --git a/sources/hv-collector-xnf-simulator/Dockerfile b/sources/hv-collector-xnf-simulator/Dockerfile new file mode 100644 index 00000000..53e126b2 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/Dockerfile @@ -0,0 +1,18 @@ +FROM docker.io/openjdk:11-jre-slim + +LABEL copyright="Copyright (C) 2018 NOKIA" +LABEL license.name="The Apache Software License, Version 2.0" +LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" +LABEL maintainer="Nokia Wroclaw ONAP Team" + +RUN apt-get update \ + && apt-get install -y --no-install-recommends curl \ + && apt-get clean + +WORKDIR /opt/ves-hv-client-simulator + +ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"] + +COPY target/libs/external/* ./ +COPY target/libs/internal/* ./ +COPY target/hv-collector-xnf-simulator-*.jar ./ diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml new file mode 100644 index 00000000..de262567 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/pom.xml @@ -0,0 +1,157 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2018 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-sources</artifactId> + <version>1.1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hv-collector-xnf-simulator</artifactId> + <description>VES HighVolume Collector :: XNF simulator</description> + + <properties> + <skipAnalysis>false</skipAnalysis> + </properties> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + </plugins> + </build> + <profiles> + <profile> + <id>docker</id> + <activation> + <property> + <name>!skipDocker</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-dependency-plugin</artifactId> + </plugin> + <plugin> + <groupId>io.fabric8</groupId> + <artifactId>docker-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-domain</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-ssl</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-ves-message-generator</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlinx</groupId> + <artifactId>kotlinx-coroutines-core</artifactId> + </dependency> + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk8</artifactId> + </dependency> + <!-- See comment in main pom + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-transport-native-epoll</artifactId> + <classifier>${os.detected.classifier}</classifier> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + <classifier>${os.detected.classifier}</classifier> + </dependency> + --> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>runtime</scope> + </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + </dependency> + <dependency> + <groupId>io.ratpack</groupId> + <artifactId>ratpack-core</artifactId> + </dependency> + </dependencies> + + +</project>
\ No newline at end of file diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt new file mode 100644 index 00000000..ee4734ae --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -0,0 +1,57 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl + +import arrow.core.Either +import arrow.core.Some +import arrow.core.Try +import arrow.core.fix +import arrow.effects.IO +import arrow.instances.either.monad.monad +import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import java.io.InputStream +import javax.json.Json + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +class XnfSimulator( + private val vesClient: VesHvClient, + private val messageGenerator: MessageGenerator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { + + fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = + Either.monad<ParsingError>().binding { + val json = parseJsonArray(messageParameters).bind() + val parsed = messageParametersParser.parse(json).bind() + val generatedMessages = messageGenerator.createMessageFlux(parsed) + vesClient.sendIo(generatedMessages) + }.fix() + + private fun parseJsonArray(jsonStream: InputStream) = + Try { + Json.createReader(jsonStream).readArray() + }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt new file mode 100644 index 00000000..57aaf3db --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -0,0 +1,109 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import arrow.core.Option +import arrow.core.getOrElse +import io.netty.handler.ssl.SslContext +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.ClientSslContextFactory +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.reactivestreams.Publisher +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import reactor.core.publisher.ReplayProcessor +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpClient + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +class VesHvClient(private val configuration: SimulatorConfiguration) { + + private val client: TcpClient = TcpClient.create() + .host(configuration.vesHost) + .port(configuration.vesPort) + .configureSsl() + + private fun TcpClient.configureSsl() = + createSslContext(configuration.security) + .map { sslContext -> this.secure(sslContext) } + .getOrElse { this } + + fun sendIo(messages: Flux<WireFrameMessage>) = + sendRx(messages).then(Mono.just(Unit)).asIo() + + private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { + val complete = ReplayProcessor.create<Void>(1) + client + .handle { _, output -> handler(complete, messages, output) } + .connect() + .doOnError { + logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + .subscribe { + logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + return complete.then() + } + + private fun handler(complete: ReplayProcessor<Void>, + messages: Flux<WireFrameMessage>, + nettyOutbound: NettyOutbound): Publisher<Void> { + + val allocator = nettyOutbound.alloc() + val encoder = WireFrameEncoder(allocator) + val frames = messages + .map(encoder::encode) + .window(MAX_BATCH_SIZE) + + return nettyOutbound + .logConnectionClosed() + .options { it.flushOnBoundary() } + .sendGroups(frames) + .then { + logger.info("Messages have been sent") + complete.onComplete() + } + .then() + } + + private fun createSslContext(config: SecurityConfiguration): Option<SslContext> = + ClientSslContextFactory().createSslContext(config) + + private fun NettyOutbound.logConnectionClosed() = + withConnection { conn -> + conn.onTerminate().subscribe { + logger.info { "Connection to ${conn.address()} has been closed" } + } + } + + companion object { + private val logger = Logger(VesHvClient::class) + private const val MAX_BATCH_SIZE = 128 + } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt new file mode 100644 index 00000000..06f1cffe --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -0,0 +1,90 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import arrow.core.Either +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.utils.http.HttpConstants +import org.onap.dcae.collectors.veshv.utils.http.Response +import org.onap.dcae.collectors.veshv.utils.http.Responses +import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors +import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import ratpack.handling.Chain +import ratpack.handling.Context +import ratpack.http.TypedData +import ratpack.server.RatpackServer +import ratpack.server.ServerConfig +import java.util.* + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +internal class XnfApiServer( + private val xnfSimulator: XnfSimulator, + private val ongoingSimulations: OngoingSimulations) { + + fun start(port: Int): IO<RatpackServer> = IO { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::configureHandlers) + } + } + + private fun configureHandlers(chain: Chain) { + chain + .post("simulator", ::startSimulationHandler) + .post("simulator/async", ::startSimulationHandler) + .get("simulator/:id", ::simulatorStatusHandler) + .get("healthcheck") { ctx -> + logger.info("Checking health") + ctx.response.status(HttpConstants.STATUS_OK).send() + } + } + + private fun startSimulationHandler(ctx: Context) { + logger.info("Starting asynchronous scenario") + ctx.request.body.then { body -> + val id = startSimulation(body) + ctx.response.sendEitherErrorOrResponse(id) + } + } + + private fun startSimulation(body: TypedData): Either<ParsingError, Response> { + return xnfSimulator.startSimulation(body.inputStream) + .map(ongoingSimulations::startAsynchronousSimulation) + .map(Responses::acceptedResponse) + } + + private fun simulatorStatusHandler(ctx: Context) { + val id = UUID.fromString(ctx.pathTokens["id"]) + val status = ongoingSimulations.status(id) + val response = Responses.statusResponse(status.toString(), status.message) + ctx.response.sendAndHandleErrors(IO.just(response)) + } + + companion object { + private val logger = Logger(XnfApiServer::class) + } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt new file mode 100644 index 00000000..0b321362 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + +import arrow.core.Option +import arrow.core.fix +import arrow.instances.option.monad.monad +import arrow.typeclasses.binding +import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.intValue +import org.onap.dcae.collectors.veshv.utils.commandline.stringValue + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorConfiguration>(DefaultParser()) { + override val cmdLineOptionsList = listOf( + VES_HV_PORT, + VES_HV_HOST, + LISTEN_PORT, + MAXIMUM_PAYLOAD_SIZE_BYTES, + SSL_DISABLE, + KEY_STORE_FILE, + KEY_STORE_PASSWORD, + TRUST_STORE_FILE, + TRUST_STORE_PASSWORD) + + override fun getConfiguration(cmdLine: CommandLine): Option<SimulatorConfiguration> = + Option.monad().binding { + val listenPort = cmdLine.intValue(LISTEN_PORT).bind() + val vesHost = cmdLine.stringValue(VES_HV_HOST).bind() + val vesPort = cmdLine.intValue(VES_HV_PORT).bind() + val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, + WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) + + SimulatorConfiguration( + listenPort, + vesHost, + vesPort, + maxPayloadSizeBytes, + createSecurityConfiguration(cmdLine).bind()) + }.fix() +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt new file mode 100644 index 00000000..3395d282 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +data class SimulatorConfiguration( + val listenPort: Int, + val vesHost: String, + val vesPort: Int, + val maxPayloadSizeBytes: Int, + val security: SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt new file mode 100644 index 00000000..21748ae8 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt @@ -0,0 +1,76 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl + +import arrow.effects.IO +import kotlinx.coroutines.asCoroutineDispatcher +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import java.util.* +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executor +import java.util.concurrent.Executors + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since August 2018 + */ +class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) { + private val asyncSimulationContext = executor.asCoroutineDispatcher() + private val simulations = ConcurrentHashMap<UUID, Status>() + + fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID { + val id = UUID.randomUUID() + simulations[id] = StatusOngoing + + simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result -> + result.fold( + { err -> + logger.warn("Error", err) + simulations[id] = StatusFailure(err) + }, + { + logger.info("Finished sending messages") + simulations[id] = StatusSuccess + } + ) + } + return id + } + + fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound) + + internal fun clear() { + simulations.clear() + } + + companion object { + private val logger = Logger(XnfApiServer::class) + } +} + +sealed class Status(val message: String) { + override fun toString() = this::class.simpleName ?: "null" +} + +object StatusNotFound : Status("not found") +object StatusOngoing : Status("ongoing") +object StatusSuccess : Status("success") +data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}") diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt new file mode 100644 index 00000000..4512dfbf --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -0,0 +1,61 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf + +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure +import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync +import org.onap.dcae.collectors.veshv.utils.arrow.unit +import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory + +private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf" +private val logger = Logger(PACKAGE_NAME) +const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) + .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) + .map { config -> + logger.info("Using configuration: $config") + val xnfSimulator = XnfSimulator( + VesHvClient(config), + MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + XnfApiServer(xnfSimulator, OngoingSimulations()) + .start(config.listenPort) + .unit() + } + .unsafeRunEitherSync( + { ex -> + logger.error("Failed to start a server", ex) + ExitFailure(1) + }, + { + logger.info("Started xNF Simulator API server") + } + ) diff --git a/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml b/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml new file mode 100644 index 00000000..809f62d4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration>
\ No newline at end of file diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt new file mode 100644 index 00000000..a04da7bf --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt @@ -0,0 +1,106 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.effects.IO +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess +import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds +import java.util.* +import java.util.concurrent.Executors + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +internal class OngoingSimulationsTest : Spek({ + val executor = Executors.newSingleThreadExecutor() + val cut = OngoingSimulations(executor) + + describe("simulations repository") { + given("not existing task task id") { + val id = UUID.randomUUID() + + on("status") { + val result = cut.status(id) + + it("should have 'not found' status") { + assertThat(result).isEqualTo(StatusNotFound) + } + } + } + + given("never ending task") { + val task = IO.async<Unit> { } + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have ongoing status") { + assertThat(cut.status(result)).isEqualTo(StatusOngoing) + } + } + } + + given("failing task") { + val cause = RuntimeException("facepalm") + val task = IO.raiseError<Unit>(cause) + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have failing status") { + waitUntilSucceeds { + assertThat(cut.status(result)).isEqualTo(StatusFailure(cause)) + } + } + } + } + + given("successful task") { + val task = IO { println("great success!") } + + on("startAsynchronousSimulation") { + val result = cut.startAsynchronousSimulation(task) + + it("should have successful status") { + waitUntilSucceeds { + assertThat(cut.status(result)).isEqualTo(StatusSuccess) + } + } + } + } + + afterGroup { + executor.shutdown() + } + } + + afterEachTest { cut.clear() } +}) diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt new file mode 100644 index 00000000..95510e77 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -0,0 +1,114 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.main + +import arrow.core.Left +import arrow.core.None +import arrow.core.Right +import arrow.effects.IO +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient +import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import reactor.core.publisher.Flux +import java.io.ByteArrayInputStream + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since September 2018 + */ +internal class XnfSimulatorTest : Spek({ + lateinit var cut: XnfSimulator + lateinit var vesClient: VesHvClient + lateinit var messageParametersParser: MessageParametersParser + lateinit var messageGenerator: MessageGenerator + + beforeEachTest { + vesClient = mock() + messageParametersParser = mock() + messageGenerator = mock() + cut = XnfSimulator(vesClient, messageGenerator, messageParametersParser) + } + + describe("startSimulation") { + it("should fail when empty input stream") { + // given + val emptyInputStream = ByteArrayInputStream(byteArrayOf()) + + // when + val result = cut.startSimulation(emptyInputStream) + + // then + assertThat(result).isLeft() + } + + it("should fail when invalid JSON") { + // given + val invalidJson = "invalid json".byteInputStream() + + // when + val result = cut.startSimulation(invalidJson) + + // then + assertThat(result).isLeft() + } + + it("should fail when JSON syntax is valid but content is invalid") { + // given + val json = "[1,2,3]".byteInputStream() + val cause = ParsingError("epic fail", None) + whenever(messageParametersParser.parse(any())).thenReturn( + Left(cause)) + + // when + val result = cut.startSimulation(json) + + // then + assertThat(result).left().isEqualTo(cause) + } + + it("should return generated messages") { + // given + val json = "[true]".byteInputStream() + val messageParams = listOf<MessageParameters>() + val generatedMessages = Flux.empty<WireFrameMessage>() + val sendingIo = IO {} + whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams)) + whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages) + whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo) + + // when + val result = cut.startSimulation(json) + + // then + assertThat(result).right().isSameAs(sendingIo) + } + } +}) |