summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-xnf-simulator')
-rw-r--r--sources/hv-collector-xnf-simulator/Dockerfile18
-rw-r--r--sources/hv-collector-xnf-simulator/pom.xml157
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt57
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt109
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt90
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt74
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt33
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt76
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt61
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/resources/logback.xml35
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt106
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt114
12 files changed, 930 insertions, 0 deletions
diff --git a/sources/hv-collector-xnf-simulator/Dockerfile b/sources/hv-collector-xnf-simulator/Dockerfile
new file mode 100644
index 00000000..53e126b2
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/Dockerfile
@@ -0,0 +1,18 @@
+FROM docker.io/openjdk:11-jre-slim
+
+LABEL copyright="Copyright (C) 2018 NOKIA"
+LABEL license.name="The Apache Software License, Version 2.0"
+LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
+LABEL maintainer="Nokia Wroclaw ONAP Team"
+
+RUN apt-get update \
+ && apt-get install -y --no-install-recommends curl \
+ && apt-get clean
+
+WORKDIR /opt/ves-hv-client-simulator
+
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"]
+
+COPY target/libs/external/* ./
+COPY target/libs/internal/* ./
+COPY target/hv-collector-xnf-simulator-*.jar ./
diff --git a/sources/hv-collector-xnf-simulator/pom.xml b/sources/hv-collector-xnf-simulator/pom.xml
new file mode 100644
index 00000000..de262567
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/pom.xml
@@ -0,0 +1,157 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-sources</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-xnf-simulator</artifactId>
+ <description>VES HighVolume Collector :: XNF simulator</description>
+
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ </plugins>
+ </build>
+ <profiles>
+ <profile>
+ <id>docker</id>
+ <activation>
+ <property>
+ <name>!skipDocker</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-domain</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-ssl</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-ves-message-generator</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlinx</groupId>
+ <artifactId>kotlinx-coroutines-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-cli</groupId>
+ <artifactId>commons-cli</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-jdk8</artifactId>
+ </dependency>
+ <!-- See comment in main pom
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-transport-native-epoll</artifactId>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+ -->
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.ratpack</groupId>
+ <artifactId>ratpack-core</artifactId>
+ </dependency>
+ </dependencies>
+
+
+</project> \ No newline at end of file
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
new file mode 100644
index 00000000..ee4734ae
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -0,0 +1,57 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+
+import arrow.core.Either
+import arrow.core.Some
+import arrow.core.Try
+import arrow.core.fix
+import arrow.effects.IO
+import arrow.instances.either.monad.monad
+import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import java.io.InputStream
+import javax.json.Json
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class XnfSimulator(
+ private val vesClient: VesHvClient,
+ private val messageGenerator: MessageGenerator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
+
+ fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
+ Either.monad<ParsingError>().binding {
+ val json = parseJsonArray(messageParameters).bind()
+ val parsed = messageParametersParser.parse(json).bind()
+ val generatedMessages = messageGenerator.createMessageFlux(parsed)
+ vesClient.sendIo(generatedMessages)
+ }.fix()
+
+ private fun parseJsonArray(jsonStream: InputStream) =
+ Try {
+ Json.createReader(jsonStream).readArray()
+ }.toEither().mapLeft { ParsingError("failed to parse JSON", Some(it)) }
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
new file mode 100644
index 00000000..57aaf3db
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -0,0 +1,109 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import arrow.core.Option
+import arrow.core.getOrElse
+import io.netty.handler.ssl.SslContext
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.ClientSslContextFactory
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.core.publisher.ReplayProcessor
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpClient
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class VesHvClient(private val configuration: SimulatorConfiguration) {
+
+ private val client: TcpClient = TcpClient.create()
+ .host(configuration.vesHost)
+ .port(configuration.vesPort)
+ .configureSsl()
+
+ private fun TcpClient.configureSsl() =
+ createSslContext(configuration.security)
+ .map { sslContext -> this.secure(sslContext) }
+ .getOrElse { this }
+
+ fun sendIo(messages: Flux<WireFrameMessage>) =
+ sendRx(messages).then(Mono.just(Unit)).asIo()
+
+ private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
+ val complete = ReplayProcessor.create<Void>(1)
+ client
+ .handle { _, output -> handler(complete, messages, output) }
+ .connect()
+ .doOnError {
+ logger.info("Failed to connect to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ .subscribe {
+ logger.info("Connected to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ return complete.then()
+ }
+
+ private fun handler(complete: ReplayProcessor<Void>,
+ messages: Flux<WireFrameMessage>,
+ nettyOutbound: NettyOutbound): Publisher<Void> {
+
+ val allocator = nettyOutbound.alloc()
+ val encoder = WireFrameEncoder(allocator)
+ val frames = messages
+ .map(encoder::encode)
+ .window(MAX_BATCH_SIZE)
+
+ return nettyOutbound
+ .logConnectionClosed()
+ .options { it.flushOnBoundary() }
+ .sendGroups(frames)
+ .then {
+ logger.info("Messages have been sent")
+ complete.onComplete()
+ }
+ .then()
+ }
+
+ private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
+ ClientSslContextFactory().createSslContext(config)
+
+ private fun NettyOutbound.logConnectionClosed() =
+ withConnection { conn ->
+ conn.onTerminate().subscribe {
+ logger.info { "Connection to ${conn.address()} has been closed" }
+ }
+ }
+
+ companion object {
+ private val logger = Logger(VesHvClient::class)
+ private const val MAX_BATCH_SIZE = 128
+ }
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
new file mode 100644
index 00000000..06f1cffe
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -0,0 +1,90 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import arrow.core.Either
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
+import org.onap.dcae.collectors.veshv.utils.http.Response
+import org.onap.dcae.collectors.veshv.utils.http.Responses
+import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
+import org.onap.dcae.collectors.veshv.utils.http.sendEitherErrorOrResponse
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import ratpack.handling.Chain
+import ratpack.handling.Context
+import ratpack.http.TypedData
+import ratpack.server.RatpackServer
+import ratpack.server.ServerConfig
+import java.util.*
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+internal class XnfApiServer(
+ private val xnfSimulator: XnfSimulator,
+ private val ongoingSimulations: OngoingSimulations) {
+
+ fun start(port: Int): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(this::configureHandlers)
+ }
+ }
+
+ private fun configureHandlers(chain: Chain) {
+ chain
+ .post("simulator", ::startSimulationHandler)
+ .post("simulator/async", ::startSimulationHandler)
+ .get("simulator/:id", ::simulatorStatusHandler)
+ .get("healthcheck") { ctx ->
+ logger.info("Checking health")
+ ctx.response.status(HttpConstants.STATUS_OK).send()
+ }
+ }
+
+ private fun startSimulationHandler(ctx: Context) {
+ logger.info("Starting asynchronous scenario")
+ ctx.request.body.then { body ->
+ val id = startSimulation(body)
+ ctx.response.sendEitherErrorOrResponse(id)
+ }
+ }
+
+ private fun startSimulation(body: TypedData): Either<ParsingError, Response> {
+ return xnfSimulator.startSimulation(body.inputStream)
+ .map(ongoingSimulations::startAsynchronousSimulation)
+ .map(Responses::acceptedResponse)
+ }
+
+ private fun simulatorStatusHandler(ctx: Context) {
+ val id = UUID.fromString(ctx.pathTokens["id"])
+ val status = ongoingSimulations.status(id)
+ val response = Responses.statusResponse(status.toString(), status.message)
+ ctx.response.sendAndHandleErrors(IO.just(response))
+ }
+
+ companion object {
+ private val logger = Logger(XnfApiServer::class)
+ }
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
new file mode 100644
index 00000000..0b321362
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
+
+import arrow.core.Option
+import arrow.core.fix
+import arrow.instances.option.monad.monad
+import arrow.typeclasses.binding
+import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.intValue
+import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorConfiguration>(DefaultParser()) {
+ override val cmdLineOptionsList = listOf(
+ VES_HV_PORT,
+ VES_HV_HOST,
+ LISTEN_PORT,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
+ SSL_DISABLE,
+ KEY_STORE_FILE,
+ KEY_STORE_PASSWORD,
+ TRUST_STORE_FILE,
+ TRUST_STORE_PASSWORD)
+
+ override fun getConfiguration(cmdLine: CommandLine): Option<SimulatorConfiguration> =
+ Option.monad().binding {
+ val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
+ val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
+ val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
+ val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
+ WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+
+ SimulatorConfiguration(
+ listenPort,
+ vesHost,
+ vesPort,
+ maxPayloadSizeBytes,
+ createSecurityConfiguration(cmdLine).bind())
+ }.fix()
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
new file mode 100644
index 00000000..3395d282
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
+
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+data class SimulatorConfiguration(
+ val listenPort: Int,
+ val vesHost: String,
+ val vesPort: Int,
+ val maxPayloadSizeBytes: Int,
+ val security: SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
new file mode 100644
index 00000000..21748ae8
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/simulations.kt
@@ -0,0 +1,76 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+
+import arrow.effects.IO
+import kotlinx.coroutines.asCoroutineDispatcher
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.util.*
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.Executor
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since August 2018
+ */
+class OngoingSimulations(executor: Executor = Executors.newCachedThreadPool()) {
+ private val asyncSimulationContext = executor.asCoroutineDispatcher()
+ private val simulations = ConcurrentHashMap<UUID, Status>()
+
+ fun startAsynchronousSimulation(simulationIo: IO<Unit>): UUID {
+ val id = UUID.randomUUID()
+ simulations[id] = StatusOngoing
+
+ simulationIo.continueOn(asyncSimulationContext).unsafeRunAsync { result ->
+ result.fold(
+ { err ->
+ logger.warn("Error", err)
+ simulations[id] = StatusFailure(err)
+ },
+ {
+ logger.info("Finished sending messages")
+ simulations[id] = StatusSuccess
+ }
+ )
+ }
+ return id
+ }
+
+ fun status(id: UUID) = simulations.getOrDefault(id, StatusNotFound)
+
+ internal fun clear() {
+ simulations.clear()
+ }
+
+ companion object {
+ private val logger = Logger(XnfApiServer::class)
+ }
+}
+
+sealed class Status(val message: String) {
+ override fun toString() = this::class.simpleName ?: "null"
+}
+
+object StatusNotFound : Status("not found")
+object StatusOngoing : Status("ongoing")
+object StatusSuccess : Status("success")
+data class StatusFailure(val cause: Throwable) : Status("Error ${cause.message}")
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
new file mode 100644
index 00000000..4512dfbf
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -0,0 +1,61 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
+import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
+import org.onap.dcae.collectors.veshv.utils.arrow.unit
+import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+
+private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf"
+private val logger = Logger(PACKAGE_NAME)
+const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
+ .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
+ .map { config ->
+ logger.info("Using configuration: $config")
+ val xnfSimulator = XnfSimulator(
+ VesHvClient(config),
+ MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ XnfApiServer(xnfSimulator, OngoingSimulations())
+ .start(config.listenPort)
+ .unit()
+ }
+ .unsafeRunEitherSync(
+ { ex ->
+ logger.error("Failed to start a server", ex)
+ ExitFailure(1)
+ },
+ {
+ logger.info("Started xNF Simulator API server")
+ }
+ )
diff --git a/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml b/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml
new file mode 100644
index 00000000..809f62d4
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/resources/logback.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
new file mode 100644
index 00000000..a04da7bf
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt
@@ -0,0 +1,106 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.effects.IO
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess
+import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds
+import java.util.*
+import java.util.concurrent.Executors
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class OngoingSimulationsTest : Spek({
+ val executor = Executors.newSingleThreadExecutor()
+ val cut = OngoingSimulations(executor)
+
+ describe("simulations repository") {
+ given("not existing task task id") {
+ val id = UUID.randomUUID()
+
+ on("status") {
+ val result = cut.status(id)
+
+ it("should have 'not found' status") {
+ assertThat(result).isEqualTo(StatusNotFound)
+ }
+ }
+ }
+
+ given("never ending task") {
+ val task = IO.async<Unit> { }
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have ongoing status") {
+ assertThat(cut.status(result)).isEqualTo(StatusOngoing)
+ }
+ }
+ }
+
+ given("failing task") {
+ val cause = RuntimeException("facepalm")
+ val task = IO.raiseError<Unit>(cause)
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have failing status") {
+ waitUntilSucceeds {
+ assertThat(cut.status(result)).isEqualTo(StatusFailure(cause))
+ }
+ }
+ }
+ }
+
+ given("successful task") {
+ val task = IO { println("great success!") }
+
+ on("startAsynchronousSimulation") {
+ val result = cut.startAsynchronousSimulation(task)
+
+ it("should have successful status") {
+ waitUntilSucceeds {
+ assertThat(cut.status(result)).isEqualTo(StatusSuccess)
+ }
+ }
+ }
+ }
+
+ afterGroup {
+ executor.shutdown()
+ }
+ }
+
+ afterEachTest { cut.clear() }
+})
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
new file mode 100644
index 00000000..95510e77
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.main
+
+import arrow.core.Left
+import arrow.core.None
+import arrow.core.Right
+import arrow.effects.IO
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
+import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import reactor.core.publisher.Flux
+import java.io.ByteArrayInputStream
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since September 2018
+ */
+internal class XnfSimulatorTest : Spek({
+ lateinit var cut: XnfSimulator
+ lateinit var vesClient: VesHvClient
+ lateinit var messageParametersParser: MessageParametersParser
+ lateinit var messageGenerator: MessageGenerator
+
+ beforeEachTest {
+ vesClient = mock()
+ messageParametersParser = mock()
+ messageGenerator = mock()
+ cut = XnfSimulator(vesClient, messageGenerator, messageParametersParser)
+ }
+
+ describe("startSimulation") {
+ it("should fail when empty input stream") {
+ // given
+ val emptyInputStream = ByteArrayInputStream(byteArrayOf())
+
+ // when
+ val result = cut.startSimulation(emptyInputStream)
+
+ // then
+ assertThat(result).isLeft()
+ }
+
+ it("should fail when invalid JSON") {
+ // given
+ val invalidJson = "invalid json".byteInputStream()
+
+ // when
+ val result = cut.startSimulation(invalidJson)
+
+ // then
+ assertThat(result).isLeft()
+ }
+
+ it("should fail when JSON syntax is valid but content is invalid") {
+ // given
+ val json = "[1,2,3]".byteInputStream()
+ val cause = ParsingError("epic fail", None)
+ whenever(messageParametersParser.parse(any())).thenReturn(
+ Left(cause))
+
+ // when
+ val result = cut.startSimulation(json)
+
+ // then
+ assertThat(result).left().isEqualTo(cause)
+ }
+
+ it("should return generated messages") {
+ // given
+ val json = "[true]".byteInputStream()
+ val messageParams = listOf<MessageParameters>()
+ val generatedMessages = Flux.empty<WireFrameMessage>()
+ val sendingIo = IO {}
+ whenever(messageParametersParser.parse(any())).thenReturn(Right(messageParams))
+ whenever(messageGenerator.createMessageFlux(messageParams)).thenReturn(generatedMessages)
+ whenever(vesClient.sendIo(generatedMessages)).thenReturn(sendingIo)
+
+ // when
+ val result = cut.startSimulation(json)
+
+ // then
+ assertThat(result).right().isSameAs(sendingIo)
+ }
+ }
+})