diff options
43 files changed, 804 insertions, 348 deletions
diff --git a/development.sh b/development.sh new file mode 100755 index 00000000..94500079 --- /dev/null +++ b/development.sh @@ -0,0 +1,66 @@ +#!/bin/bash + +# Usage: source ./development.sh and use functions defined here +# https://httpie.org/ is required for API calls + +export MAVEN_OPTS="-T1C" + +function veshv_full_rebuild() { + mvn clean install -Panalysis ${MAVEN_OPTS} +} + +function veshv_rebuild() { + mvn clean install ${MAVEN_OPTS} +} + +function veshv_build() { + mvn install ${MAVEN_OPTS} +} + +function veshv_fast_build() { + mvn install -DskipTests ${MAVEN_OPTS} +} + +function veshv_docker_start() { + docker-compose down + docker-compose rm -f + docker-compose up +} + +function veshv_docker_clean() { + docker volume prune +} + +function veshv_build_and_start() { + veshv_fast_build && veshv_docker_start +} + +function veshv_fresh_restart() { + docker-compose down + docker-compose rm -f + veshv_docker_clean + veshv_fast_build && docker-compose up +} + +function veshv_simul_dcaeapp_count() { + http --json GET http://localhost:8100/messages/count +} + +function veshv_simul_dcaeapp_last_key() { + http --json GET http://localhost:8100/messages/last/key +} + +function veshv_simul_dcaeapp_last_value() { + http --json GET http://localhost:8100/messages/last/value +} + +function veshv_simul_client() { + # feed me with json file using "<" + http --json POST http://localhost:8000/simulator/sync +} + +function veshv_simul_client_async() { + # feed me with json file using "<" + http --json POST http://localhost:8000/simulator/async +} + diff --git a/docker-compose.yml b/docker-compose.yml index 8db767c8..65951edc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,9 +1,11 @@ version: "2" services: + zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" + kafka: image: wurstmeister/kafka ports: @@ -17,29 +19,37 @@ services: - /var/run/docker.sock:/var/run/docker.sock depends_on: - zookeeper - veshvcollector: - build: - context: hv-collector-main - dockerfile: Dockerfile + + ves-hv-collector: + image: onap/ves-hv-collector +# build: +# context: hv-collector-main +# dockerfile: Dockerfile ports: - "6061:6061/tcp" depends_on: - kafka volumes: - ./ssl/:/etc/ves-hv/ - xnfsimulator: - build: - context: hv-collector-client-simulator - dockerfile: Dockerfile + + xnf-simulator: + image: onap/ves-hv-collector-client-simulator +# build: +# context: hv-collector-client-simulator +# dockerfile: Dockerfile + ports: + - "8000:5000/tcp" depends_on: - - veshvcollector + - ves-hv-collector volumes: - ./ssl/:/etc/ves-hv/ + dcae-app-simulator: - build: - context: hv-collector-dcae-app-simulator - dockerfile: Dockerfile + image: onap/ves-hv-collector-dcae-simulator +# build: +# context: hv-collector-dcae-app-simulator +# dockerfile: Dockerfile ports: - - "8080:8080/tcp" + - "8100:5000/tcp" depends_on: - kafka diff --git a/hv-collector-client-simulator/Dockerfile b/hv-collector-client-simulator/Dockerfile index 58cfa448..7d12c494 100644 --- a/hv-collector-client-simulator/Dockerfile +++ b/hv-collector-client-simulator/Dockerfile @@ -5,9 +5,11 @@ LABEL license.name="The Apache Software License, Version 2.0" LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" LABEL maintainer="Nokia Wroclaw ONAP Team" +EXPOSE 5000 + WORKDIR /opt/ves-hv-client-simulator ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"] -CMD ["--ves-host", "veshvcollector", "--ves-port", "6061"] +CMD ["--ves-host", "ves-hv-collector", "--ves-port", "6061"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ COPY target/hv-collector-client-simulator-*.jar ./ diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml index c1c1f2e8..8cfe0a4f 100644 --- a/hv-collector-client-simulator/pom.xml +++ b/hv-collector-client-simulator/pom.xml @@ -19,8 +19,8 @@ ~ ============LICENSE_END========================================================= --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <licenses> @@ -94,6 +94,10 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects</artifactId> + </dependency> + <dependency> <groupId>commons-cli</groupId> <artifactId>commons-cli</artifactId> </dependency> diff --git a/hv-collector-client-simulator/sample-request.json b/hv-collector-client-simulator/sample-request.json new file mode 100644 index 00000000..ca8bd885 --- /dev/null +++ b/hv-collector-client-simulator/sample-request.json @@ -0,0 +1,20 @@ +{ + "commonEventHeader": { + "version": "sample-version", + "domain": 10, + "sequence": 1, + "priority": 1, + "eventId": "sample-event-id", + "eventName": "sample-event-name", + "eventType": "sample-event-type", + "startEpochMicrosec": 120034455, + "lastEpochMicrosec": 120034455, + "nfNamingCode": "sample-nf-naming-code", + "nfcNamingCode": "sample-nfc-naming-code", + "reportingEntityId": "sample-reporting-entity-id", + "reportingEntityName": "sample-reporting-entity-name", + "sourceId": "sample-source-id", + "sourceName": "sample-source-name" + }, + "messagesAmount": 25000 +} diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt index b8a4b888..f29b693c 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt @@ -19,16 +19,16 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.config -import org.apache.commons.cli.DefaultParser import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MESSAGES_TO_SEND_AMOUNT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT /** @@ -40,6 +40,8 @@ internal object DefaultValues { const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" const val CERT_FILE = "/etc/ves-hv/client.crt" const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" + const val VES_HV_PORT = 6061 + const val VES_HV_HOST = "veshvcollector" } internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfiguration>(DefaultParser()) { @@ -53,8 +55,8 @@ internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfigu ) override fun getConfiguration(cmdLine: CommandLine): ClientConfiguration { - val host = cmdLine.stringValue(VES_HV_HOST) - val port = cmdLine.intValue(VES_HV_PORT) + val host = cmdLine.stringValue(VES_HV_HOST, DefaultValues.VES_HV_HOST) + val port = cmdLine.intValue(VES_HV_PORT, DefaultValues.VES_HV_PORT) val messagesAmount = cmdLine.longValue(MESSAGES_TO_SEND_AMOUNT, DefaultValues.MESSAGES_AMOUNT) return ClientConfiguration( host, diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt index bc7db869..3f872b51 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt @@ -19,13 +19,17 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters import org.onap.dcae.collectors.veshv.utils.logging.Logger +import ratpack.exec.Promise import ratpack.handling.Chain import ratpack.handling.Context import ratpack.server.RatpackServer import ratpack.server.ServerConfig -import reactor.core.publisher.Mono +import reactor.core.publisher.Flux +import reactor.core.scheduler.Schedulers import javax.json.Json import javax.json.JsonObject @@ -35,30 +39,46 @@ import javax.json.JsonObject */ class HttpServer(private val vesClient: VesHvClient) { - fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable { - RatpackServer.of { - it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers) + fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO { + RatpackServer.start { server -> + server.serverConfig(ServerConfig.embedded().port(port)) + .handlers(this::configureHandlers) } - }.doOnNext { it.start() } + } private fun configureHandlers(chain: Chain) { - chain.post("simulator") { ctx -> - ctx.request.body - .map { Json.createReader(it.inputStream).readObject() } - .map { extractMessageParameters(it) } - .map { MessageFactory.INSTANCE.createMessageFlux(it) } - .onError { handleException(it, ctx) } - .then { - vesClient.send(it) - ctx.response - .status(STATUS_OK) - .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() - .add("response", "Request accepted") - .build() - .toString()) - } - } + chain + .post("simulator/sync") { ctx -> + createMessageFlux(ctx) + .map { vesClient.sendIo(it) } + .map { it.unsafeRunSync() } + .onError { handleException(it, ctx) } + .then { sendAcceptedResponse(ctx) } + } + .post("simulator/async") { ctx -> + createMessageFlux(ctx) + .map { vesClient.sendRx(it) } + .map { it.subscribeOn(Schedulers.elastic()).subscribe() } + .onError { handleException(it, ctx) } + .then { sendAcceptedResponse(ctx) } + } + } + + private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> { + return ctx.request.body + .map { Json.createReader(it.inputStream).readObject() } + .map { extractMessageParameters(it) } + .map { MessageFactory.INSTANCE.createMessageFlux(it) } + } + + private fun sendAcceptedResponse(ctx: Context) { + ctx.response + .status(STATUS_OK) + .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder() + .add("response", "Request accepted") + .build() + .toString()) } private fun handleException(t: Throwable, ctx: Context) { diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index 78a72d93..be351b50 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -19,19 +19,22 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import arrow.effects.IO import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider -import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher +import reactor.core.publisher.EmitterProcessor import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import reactor.core.publisher.ReplayProcessor import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.tcp.TcpClient @@ -50,28 +53,58 @@ class VesHvClient(private val configuration: ClientConfiguration) { } .build() - fun send(messages: Flux<WireFrame>) = - client - .newHandler { _, out -> handler(out, messages) } - .doOnError{logger.info("Failed to connect to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}")} - .subscribe { logger.info("Connected to VesHvCollector on " + - "${configuration.vesHost}:${configuration.vesPort}") } + fun sendIo(messages: Flux<WireFrame>) = IO<Unit> { + sendRx(messages).block() + } + fun sendRx(messages: Flux<WireFrame>): Mono<Void> { + val complete = ReplayProcessor.create<Void>(1) + client + .newHandler { _, output -> handler(complete, messages, output) } + .doOnError { + logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + .subscribe { + logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") + } + return complete.then() + } - private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> { + private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound): + Publisher<Void> { + val encoder = WireFrameEncoder(nettyOutbound.alloc()) + val context = nettyOutbound.context() + context.onClose { + logger.info { "Connection to ${context.address()} has been closed" } + } - val encoder = WireFrameEncoder(nettyOutbound.alloc()) + // TODO: Close channel after all messages have been sent + // The code bellow doesn't work because it closes the channel earlier and not all are consumed... +// complete.subscribe { +// context.channel().disconnect().addListener { +// if (it.isSuccess) +// logger.info { "Connection closed" } +// else +// logger.warn("Failed to close the connection", it.cause()) +// } +// } val frames = messages .map(encoder::encode) - .concatWith(Mono.just(Unpooled.EMPTY_BUFFER)) + .window(MAX_BATCH_SIZE) return nettyOutbound - .options { it.flushOnEach() } - .send(frames) - .then { logger.info("Messages have been sent") } + .options { it.flushOnBoundary() } + .sendGroups(frames) + .send(Mono.just(Unpooled.EMPTY_BUFFER)) + .then { + logger.info("Messages have been sent") + complete.onComplete() + } + .then() } private fun createSslContext(config: SecurityConfiguration): SslContext = @@ -83,6 +116,7 @@ class VesHvClient(private val configuration: ClientConfiguration) { .build() companion object { + private const val MAX_BATCH_SIZE = 128 private val logger = Logger(VesHvClient::class) } } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 08bc6a71..f2229507 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -19,35 +19,41 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf +import arrow.core.Failure +import arrow.core.Success +import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient -import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException -import org.slf4j.LoggerFactory.getLogger +import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain +import org.onap.dcae.collectors.veshv.utils.logging.Logger -private val logger = getLogger("Simulator :: main") +private val logger = Logger("Simulator :: main") +private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt" /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ fun main(args: Array<String>) { - try { - val clientConfig = ArgBasedClientConfiguration().parse(args) - val vesClient = VesHvClient(clientConfig) + val httpServer = ArgBasedClientConfiguration().parse(args) + .map(::VesHvClient) + .map(::HttpServer) - HttpServer(vesClient) - .start() - .block() - - } catch (e: WrongArgumentException) { - e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") - System.exit(1) - } catch (e: Exception) { - logger.error(e.localizedMessage) - logger.debug("An error occurred when starting ves client", e) - System.exit(2) + when (httpServer) { + is Success -> httpServer.value.start().unsafeRunAsync { + it.fold( + { ex -> + logger.error("Failed to start a server", ex) + }, + { srv -> + logger.info("Started Simulator API server (listening on ${srv.bindHost}:${srv.bindPort})") + } + ) + } + is Failure -> httpServer.handleErrorsInMain(PROGRAM_NAME, logger) } } - diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt index 6420d84d..2746c0a6 100644 --- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt +++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.main.config +import arrow.core.Failure +import arrow.core.Success import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -44,7 +46,13 @@ object ArgBasedClientConfigurationTest : Spek({ cut = ArgBasedClientConfiguration() } - fun parse(vararg cmdLine: String) = cut.parse(cmdLine) + fun parse(vararg cmdLine: String): ClientConfiguration { + val result = cut.parse(cmdLine) + return when (result) { + is Success -> result.value + is Failure -> throw AssertionError("Parsing result should be present") + } + } describe("parsing arguments") { lateinit var result: ClientConfiguration diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml index a372fb22..18657316 100644 --- a/hv-collector-core/pom.xml +++ b/hv-collector-core/pom.xml @@ -19,130 +19,135 @@ ~ ============LICENSE_END========================================================= --> <project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - </license> - </licenses> + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> - <parent> - <groupId>org.onap.dcaegen2.collectors.veshv</groupId> - <artifactId>ves-hv-collector</artifactId> - <version>1.0.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> + <parent> + <groupId>org.onap.dcaegen2.collectors.veshv</groupId> + <artifactId>ves-hv-collector</artifactId> + <version>1.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> - <artifactId>hv-collector-core</artifactId> - <description>VES HighVolume Collector :: Core</description> + <artifactId>hv-collector-core</artifactId> + <description>VES HighVolume Collector :: Core</description> - <properties> - <skipAnalysis>false</skipAnalysis> - </properties> + <properties> + <skipAnalysis>false</skipAnalysis> + </properties> - <build> - <plugins> - <plugin> - <artifactId>kotlin-maven-plugin</artifactId> - <groupId>org.jetbrains.kotlin</groupId> - </plugin> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <groupId>org.apache.maven.plugins</groupId> - </plugin> - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> - <dependencies> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-utils</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-reflect</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-core</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.addons</groupId> - <artifactId>reactor-extra</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.ipc</groupId> - <artifactId>reactor-netty</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.kafka</groupId> - <artifactId>reactor-kafka</artifactId> - </dependency> - <dependency> - <groupId>io.netty</groupId> - <artifactId>netty-tcnative-boringssl-static</artifactId> - <scope>runtime</scope> - <classifier>${os.detected.classifier}</classifier> - </dependency> - <dependency> - <groupId>javax.json</groupId> - <artifactId>javax.json-api</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - <scope>runtime</scope> - </dependency> + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-domain</artifactId> + <version>${project.parent.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-reflect</artifactId> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.addons</groupId> + <artifactId>reactor-extra</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.ipc</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.kafka</groupId> + <artifactId>reactor-kafka</artifactId> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + <scope>runtime</scope> + <classifier>${os.detected.classifier}</classifier> + </dependency> + <dependency> + <groupId>javax.json</groupId> + <artifactId>javax.json-api</artifactId> + </dependency> + <dependency> + <groupId>org.glassfish</groupId> + <artifactId>javax.json</artifactId> + <scope>runtime</scope> + </dependency> - <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-test</artifactId> - </dependency> - <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - <scope>test</scope> - </dependency> - </dependencies> + + <dependency> + <groupId>com.nhaarman</groupId> + <artifactId>mockito-kotlin</artifactId> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + </dependency> + <dependency> + <groupId>org.assertj</groupId> + <artifactId>assertj-core</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <scope>test</scope> + </dependency> + </dependencies> </project> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index ed686fe8..d6158481 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.boundary +import arrow.effects.IO import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -32,9 +33,10 @@ interface Collector { typealias CollectorProvider = () -> Collector interface Server { - fun start(): Mono<Void> + fun start(): IO<ServerHandle> } -interface ServerFactory { - fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server +abstract class ServerHandle(val host: String, val port: Int) { + abstract fun shutdown(): IO<Unit> + abstract fun await(): IO<Unit> } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index f3f0a891..cee658b6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -19,10 +19,12 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Option import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.Routing import org.onap.dcae.collectors.veshv.model.VesMessage class Router(private val routing: Routing) { - fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message) + fun findDestination(message: VesMessage): Option<RoutedMessage> = + routing.routeFor(message.header).map { it(message) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 222eaefa..033095ad 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.Option import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector @@ -67,7 +68,10 @@ internal class VesHvCollector( wireChunkDecoder.release() } - private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) + private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> = + mapper(input).fold( + { Mono.empty() }, + { Mono.just(it) }) companion object { val logger = Logger(VesHvCollector::class) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index a5c41046..5f4bf354 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -23,7 +23,6 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicLong @@ -36,7 +35,6 @@ internal class LoggingSinkProvider : SinkProvider { override fun invoke(config: CollectorConfiguration): Sink { return object : Sink { - private val logger = Logger(LoggingSinkProvider::class) private val totalMessages = AtomicLong() private val totalBytes = AtomicLong() @@ -59,5 +57,6 @@ internal class LoggingSinkProvider : SinkProvider { companion object { const val INFO_LOGGING_FREQ = 100_000 + private val logger = Logger(LoggingSinkProvider::class) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index 0a548a52..f8fa72a6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -20,27 +20,38 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord import reactor.kafka.sender.SenderResult +import java.util.concurrent.atomic.AtomicLong /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { + private val sentMessages = AtomicLong(0) override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { val records = messages.map(this::vesToKafkaRecord) - return sender.send(records) + val result = sender.send(records) .doOnNext(::logException) .filter(::isSuccessful) .map { it.correlationMetadata() } + + return if (logger.traceEnabled) { + result.doOnNext(::logSentMessage) + } else { + result + } } private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { @@ -59,7 +70,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM } } - private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null + private fun logSentMessage(sentMsg: RoutedMessage) { + logger.trace { + val msgNum = sentMessages.incrementAndGet() + "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" + } + } + + private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null companion object { val logger = Logger(KafkaSink::class) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt index 9753d9e5..4e9932cc 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt @@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.Serializer * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -class ProtobufSerializer :Serializer<MessageLite> { +class ProtobufSerializer : Serializer<MessageLite> { override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { // no configuration } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 65b3b29e..0426ceb1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,8 +19,10 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import arrow.effects.IO import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.boundary.ServerHandle import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher @@ -28,7 +30,9 @@ import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions +import reactor.ipc.netty.tcp.BlockingNettyContext import reactor.ipc.netty.tcp.TcpServer +import java.time.Duration import java.util.function.BiFunction /** @@ -39,17 +43,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val sslContextFactory: SslContextFactory, private val collectorProvider: CollectorProvider) : Server { - override fun start(): Mono<Void> { - logger.info { "Listening on port ${serverConfig.port}" } - return Mono.defer { - val nettyContext = TcpServer.builder() - .options(this::configureServer) - .build() - .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u -> - handleConnection(t, u) - }) - Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() } - } + override fun start(): IO<ServerHandle> = IO { + val ctx = TcpServer.builder() + .options(this::configureServer) + .build() + .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ -> + handleConnection(input) + }) + NettyServerHandle(ctx) } private fun configureServer(opts: ServerOptions.Builder<*>) { @@ -57,20 +58,50 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, opts.sslContext(sslContextFactory.createSslContext(serverConfig.securityConfiguration)) } - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { - logger.debug("Got connection") - nettyOutbound.alloc() + private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> { + logger.info("Handling connection from ${nettyInbound.remoteAddress()}") + + val dataStream = nettyInbound + .configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + .receive() + .retain() - val sendHello = nettyOutbound - .options { it.flushOnEach() } - .sendString(Mono.just("ONAP_VES_HV/0.1\n")) - .then() + return collectorProvider() + .handleConnection(nettyInbound.context().channel().alloc(), dataStream) + } - val handleIncomingMessages = collectorProvider() - .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain()) + private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { + onReadIdle(timeout.toMillis()) { + logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." } + context().channel().close().addListener { - return sendHello.then(handleIncomingMessages) + if (it.isSuccess) + logger.debug { "Client disconnected because of idle timeout" } + else + logger.warn("Channel close failed", it.cause()) + } + } + return this + } + + private fun NettyInbound.logConnectionClosed(): NettyInbound { + context().onClose { + logger.info("Connection from ${remoteAddress()} has been closed") + } + return this } + + private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { + override fun shutdown() = IO { + ctx.shutdown() + } + + override fun await() = IO<Unit> { + ctx.context.channel().closeFuture().sync() + } + } + companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt index 34a8b928..b788f511 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt @@ -56,7 +56,7 @@ internal class StreamBufferEmitter( else -> { streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame) sink.onDispose { - logger.debug("Disposing read components") + logger.trace { "Disposing read components" } streamBuffer.discardReadComponents() } sink.onRequest { requestedFrameCount -> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index a576dc65..abebff3d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -84,7 +84,7 @@ internal class WireFrameSink( try { decoder.decodeFirst(streamBuffer) } catch (ex: MissingWireFrameBytesException) { - logger.debug { "${ex.message} - waiting for more data" } + logger.trace { "${ex.message} - waiting for more data" } null } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 8d01c075..67a7d6f2 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -28,4 +29,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration data class ServerConfiguration( val port: Int, val configurationUrl: String, - val securityConfiguration: SecurityConfiguration) + val securityConfiguration: SecurityConfiguration, + val idleTimeout: Duration, + val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index bc030587..e9cd5f3f 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -19,12 +19,14 @@ */ package org.onap.dcae.collectors.veshv.model +import arrow.core.Option import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain data class Routing(val routes: List<Route>) { - fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) } + fun routeFor(commonHeader: CommonEventHeader): Option<Route> = + Option.fromNullable(routes.find { it.applies(commonHeader) }) } data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index c852f5f4..599a9d40 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -19,12 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl +import arrow.core.None +import arrow.core.Some import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader @@ -61,15 +64,15 @@ object RouterTest : Spek({ } it("should be routed to proper partition") { - assertThat(result?.partition).isEqualTo(2) + assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) } it("should be routed to proper topic") { - assertThat(result?.topic).isEqualTo("ves_rtpm") + assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) } it("should be routed with a given message") { - assertThat(result?.message).isSameAs(message) + assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) } } @@ -82,15 +85,15 @@ object RouterTest : Spek({ } it("should be routed to proper partition") { - assertThat(result?.partition).isEqualTo(0) + assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) } it("should be routed to proper topic") { - assertThat(result?.topic).isEqualTo("ves_trace") + assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) } it("should be routed with a given message") { - assertThat(result?.message).isSameAs(message) + assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) } } @@ -99,7 +102,7 @@ object RouterTest : Spek({ val result = cut.findDestination(message) it("should not have route available") { - assertThat(result).isNull() + assertThat(result).isEqualTo(None) } } } diff --git a/hv-collector-dcae-app-simulator/Dockerfile b/hv-collector-dcae-app-simulator/Dockerfile index a449078e..68b562d6 100644 --- a/hv-collector-dcae-app-simulator/Dockerfile +++ b/hv-collector-dcae-app-simulator/Dockerfile @@ -5,11 +5,11 @@ LABEL license.name="The Apache Software License, Version 2.0" LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" LABEL maintainer="Nokia Wroclaw ONAP Team" -EXPOSE 8080 +EXPOSE 5000 WORKDIR /opt/ves-hv-dcae-app-simulator ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"] -CMD ["--kafka-bootstrap-servers", "kafka:9092", "--kafka-topics", "ves_hvRanMeas", "--listen-port", "8080"] +CMD ["--kafka-bootstrap-servers", "kafka:9092", "--kafka-topics", "ves_hvRanMeas", "--listen-port", "5000"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ COPY target/hv-collector-dcae-app-simulator-*.jar ./ diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml index 046f5ed0..a2f92e81 100644 --- a/hv-collector-dcae-app-simulator/pom.xml +++ b/hv-collector-dcae-app-simulator/pom.xml @@ -93,6 +93,10 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects</artifactId> + </dependency> + <dependency> <groupId>io.ratpack</groupId> <artifactId>ratpack-core</artifactId> </dependency> diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt index 3f539302..27edde9c 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt @@ -24,12 +24,14 @@ import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DefaultValues.API_PORT import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT internal object DefaultValues { const val API_PORT = 8080 + const val KAFKA_SERVERS = "kafka:9092" + const val KAFKA_TOPICS = "ves_hvRanMeas" } class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { @@ -41,8 +43,8 @@ class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfigur override fun getConfiguration(cmdLine: CommandLine): DcaeAppSimConfiguration { val port = cmdLine.intValue(LISTEN_PORT, API_PORT) - val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS) - val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS).split(",").toSet() + val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS, DefaultValues.KAFKA_SERVERS) + val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS, DefaultValues.KAFKA_TOPICS).split(",").toSet() return DcaeAppSimConfiguration( port, kafkaBootstrapServers, diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt index f7703b86..d53609ca 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt @@ -19,10 +19,10 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka +import arrow.effects.IO import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.ByteArrayDeserializer import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Mono import reactor.kafka.receiver.KafkaReceiver import reactor.kafka.receiver.ReceiverOptions import java.util.* @@ -33,10 +33,10 @@ import java.util.* */ class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) { - fun start(): Mono<Consumer> = Mono.create { sink -> + fun start(): IO<Consumer> = IO { val consumer = Consumer() receiver.receive().subscribe(consumer::update) - sink.success(consumer) + consumer } companion object { diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt index 5f0fe7f6..66169534 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt @@ -19,7 +19,9 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka -import reactor.core.publisher.Mono +import arrow.core.Option +import arrow.effects.IO +import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.kafka.receiver.ReceiverRecord /** @@ -27,10 +29,11 @@ import reactor.kafka.receiver.ReceiverRecord * @since June 2018 */ -class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?) +class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>) interface ConsumerStateProvider { - fun currentState(): Mono<ConsumerState> + fun currentState(): ConsumerState + fun reset(): IO<Unit> } class Consumer : ConsumerStateProvider { @@ -38,18 +41,28 @@ class Consumer : ConsumerStateProvider { private var lastKey: ByteArray? = null private var lastValue: ByteArray? = null - override fun currentState(): Mono<ConsumerState> = Mono.create { sink -> - val state = synchronized(this) { - ConsumerState(msgCount, lastKey, lastValue) + override fun currentState() = + ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue)) + + override fun reset() = IO { + synchronized(this) { + msgCount = 0 + lastKey = null + lastValue = null } - sink.success(state) } fun update(record: ReceiverRecord<ByteArray, ByteArray>) { + logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" } + synchronized(this) { msgCount++ lastKey = record.key() lastValue = record.value() } } + + companion object { + private val logger = Logger(Consumer::class) + } } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index c037af35..f7d44ede 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -19,32 +19,30 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp +import arrow.core.Failure +import arrow.core.Success import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer -import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException +import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.slf4j.LoggerFactory private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main")) fun main(args: Array<String>) { + logger.info("Starting DCAE APP simulator") - try { - logger.info("Starting DCAE APP simulator") - val simulatorConfig = ArgBasedDcaeAppSimConfiguration().parse(args) + val config = ArgBasedDcaeAppSimConfiguration().parse(args) + when (config) { + is Success -> startApp(config.value).unsafeRunSync() + is Failure -> config.handleErrorsInMain("", logger) + } +} - KafkaSource.create(simulatorConfig.kafkaBootstrapServers, simulatorConfig.kafkaTopics) +private fun startApp(config: DcaeAppSimConfiguration) = + KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics) .start() .map(::ApiServer) - .flatMap { it.start(simulatorConfig.apiPort) } - .block() - } catch (e: WrongArgumentException) { - e.printHelp("java org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt") - System.exit(1) - } catch (e: Exception) { - logger.error(e.localizedMessage) - logger.debug("An error occurred when starting ves dcea app simulator", e) - System.exit(2) - } -} + .flatMap { it.start(config.apiPort) } diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt index fcb8e131..2fa8abec 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt @@ -19,13 +19,17 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote +import arrow.core.Try +import arrow.core.getOrElse +import arrow.effects.IO +import com.google.protobuf.MessageOrBuilder import com.google.protobuf.util.JsonFormat import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider +import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields import org.onap.ves.VesEventV5.VesEvent import ratpack.handling.Chain import ratpack.server.RatpackServer import ratpack.server.ServerConfig -import reactor.core.publisher.Mono /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -34,41 +38,71 @@ import reactor.core.publisher.Mono class ApiServer(private val consumerState: ConsumerStateProvider) { private val jsonPrinter = JsonFormat.printer() - fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable { - RatpackServer.of { server -> + fun start(port: Int): IO<RatpackServer> = IO { + RatpackServer.start { server -> server.serverConfig(ServerConfig.embedded().port(port)) .handlers(this::setupHandlers) } - }.doOnNext(RatpackServer::start) + } private fun setupHandlers(chain: Chain) { chain .get("messages/count") { ctx -> ctx.response.contentType(CONTENT_TEXT) - consumerState.currentState() - .map { it.msgCount.toString() } - .subscribe(ctx.response::send) + val state = consumerState.currentState() + ctx.response.send(state.msgCount.toString()) } .get("messages/last/key") { ctx -> ctx.response.contentType(CONTENT_JSON) - consumerState.currentState() - .map { it.lastKey } - .map { VesEvent.CommonEventHeader.parseFrom(it) } - .map { jsonPrinter.print(it) } - .subscribe(ctx.response::send) + val state = consumerState.currentState() + val resp = state.lastKey + .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } } + .map(this::protobufToJson) + .getOrElse { "null" } + ctx.response.send(resp) } .get("messages/last/value") { ctx -> ctx.response.contentType(CONTENT_JSON) - consumerState.currentState() - .map { it.lastValue } - .map { VesEvent.parseFrom(it) } - .map { jsonPrinter.print(it) } - .subscribe(ctx.response::send) + val state = consumerState.currentState() + val resp = state.lastValue + .map { Try { VesEvent.parseFrom(it) } } + .map(this::protobufToJson) + .getOrElse { "null" } + ctx.response.send(resp) + } + + .get("messages/last/hvRanMeasFields") { ctx -> + ctx.response.contentType(CONTENT_JSON) + val state = consumerState.currentState() + val resp = state.lastValue + .flatMap { Try { VesEvent.parseFrom(it) }.toOption() } + .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS } + .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } } + .map(this::protobufToJson) + .getOrElse { "null" } + ctx.response.send(resp) + } + + .delete("messages") { ctx -> + ctx.response.contentType(CONTENT_TEXT) + consumerState.reset() + .unsafeRunAsync { + it.fold( + { ctx.response.send("NOK") }, + { ctx.response.send("OK") } + ) + } } } + private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String = + parseResult.fold( + { ex -> "\"Failed to parse protobuf: ${ex.message}\"" }, + { jsonPrinter.print(it) }) + + companion object { private const val CONTENT_TEXT = "text/plain" private const val CONTENT_JSON = "application/json" diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt index 817df8e0..d99de17b 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt @@ -19,13 +19,14 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config +import arrow.core.Failure +import arrow.core.Success import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException -import kotlin.test.assertFailsWith internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ @@ -38,7 +39,21 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ cut = ArgBasedDcaeAppSimConfiguration() } - fun parse(vararg cmdLine: String) = cut.parse(cmdLine) + fun parseExpectingSuccess(vararg cmdLine: String): DcaeAppSimConfiguration { + val result = cut.parse(cmdLine) + return when (result) { + is Success -> result.value + is Failure -> throw AssertionError("Parsing result should be present") + } + } + + fun parseExpectingFailure(vararg cmdLine: String): Throwable { + val result = cut.parse(cmdLine) + return when (result) { + is Success -> throw AssertionError("parsing should have failed") + is Failure -> result.exception + } + } describe("parsing arguments") { lateinit var result: DcaeAppSimConfiguration @@ -46,7 +61,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ given("all parameters are present in the long form") { beforeEachTest { - result = parse("--listen-port", "6969", + result = parseExpectingSuccess("--listen-port", "6969", "--kafka-bootstrap-servers", kafkaBootstrapServers, "--kafka-topics", kafkaTopics ) @@ -71,7 +86,9 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ given("some parameters are present in the short form") { beforeEachTest { - result = parse("-p", "666", "--kafka-bootstrap-servers", kafkaBootstrapServers, "-f", kafkaTopics) + result = parseExpectingSuccess("-p", "666", + "--kafka-bootstrap-servers", kafkaBootstrapServers, + "-f", kafkaTopics) } it("should set proper port") { @@ -92,7 +109,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ given("all optional parameters are absent") { beforeEachTest { - result = parse("-s", kafkaBootstrapServers, "-f", kafkaTopics) + result = parseExpectingSuccess("-s", kafkaBootstrapServers, "-f", kafkaTopics) } it("should set default port") { @@ -100,21 +117,20 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({ } } - describe("required parameter is absent") { given("kafka topics are missing") { it("should throw exception") { - assertFailsWith<WrongArgumentException> { parse("-s", kafkaBootstrapServers) } + assertThat(parseExpectingFailure("-s", kafkaBootstrapServers)) + .isInstanceOf(WrongArgumentException::class.java) } } given("kafka bootstrap servers are missing") { it("should throw exception") { - assertFailsWith<WrongArgumentException> { parse("-f", kafkaTopics) } + assertThat(parseExpectingFailure("-f", kafkaTopics)) + .isInstanceOf(WrongArgumentException::class.java) } } } } - - })
\ No newline at end of file diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml index de9004a8..c11510ac 100644 --- a/hv-collector-domain/pom.xml +++ b/hv-collector-domain/pom.xml @@ -111,7 +111,6 @@ <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-test</artifactId> - <version>${kotlin.version}</version> <scope>test</scope> </dependency> <dependency> diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt index 59b91d7f..f3e97be2 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt @@ -19,16 +19,19 @@ */ package org.onap.dcae.collectors.veshv.main -import org.apache.commons.cli.DefaultParser import org.apache.commons.cli.CommandLine +import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE -import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.time.Duration internal object DefaultValues { const val PORT = 6061 @@ -36,6 +39,7 @@ internal object DefaultValues { const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key" const val CERT_FILE = "/etc/ves-hv/server.crt" const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" + const val IDLE_TIMEOUT_SEC = 60L } internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) { @@ -44,14 +48,23 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu CONSUL_CONFIG_URL, PRIVATE_KEY_FILE, CERT_FILE, - TRUST_CERT_FILE + TRUST_CERT_FILE, + IDLE_TIMEOUT_SEC, + DUMMY_MODE ) override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration { val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT) val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL) + val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) + val dummyMode = cmdLine.hasOption(DUMMY_MODE) val security = createSecurityConfiguration(cmdLine) - return ServerConfiguration(port, configUrl, security) + return ServerConfiguration( + port = port, + configurationUrl = configUrl, + securityConfiguration = security, + idleTimeout = Duration.ofSeconds(idleTimeoutSec), + dummyMode = dummyMode) } private fun createSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration { diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 1f2686ba..074a75e4 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,37 +19,49 @@ */ package org.onap.dcae.collectors.veshv.main +import arrow.core.flatMap import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain -import org.slf4j.LoggerFactory -import kotlin.system.exitProcess -private val logger = LoggerFactory.getLogger("main") +private val logger = Logger("org.onap.dcae.collectors.veshv.main") +private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt" fun main(args: Array<String>) { - try { - val serverConfiguration = ArgBasedServerConfiguration().parse(args) - - val collectorProvider = CollectorFactory( - resolveConfigurationProvider(serverConfiguration), - AdapterFactory.kafkaSink(), - MicrometerMetrics() - ).createVesHvCollectorProvider() - ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() - } catch (ex: WrongArgumentException) { - ex.printMessage() - ex.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") - exitProcess(1) - } + ArgBasedServerConfiguration().parse(args) + .toEither() + .map(::createServer) + .map(Server::start) + .flatMap { it.attempt().unsafeRunSync() } + .fold( + { ex -> + handleErrorsInMain(ex, PROGRAM_NAME, logger) + }, + { handle -> + logger.info("Server started. Listening on ${handle.host}:${handle.port}") + handle.await().unsafeRunSync() + } + ) } +private fun createServer(config: ServerConfiguration): Server { + val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink() + val collectorProvider = CollectorFactory( + resolveConfigurationProvider(config), + sink, + MicrometerMetrics() + ).createVesHvCollectorProvider() + + return ServerFactory.createNettyTcpServer(config, collectorProvider) +} private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider { diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml index 48da3b18..5127e7ef 100644 --- a/hv-collector-main/src/main/resources/logback.xml +++ b/hv-collector-main/src/main/resources/logback.xml @@ -26,7 +26,10 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="INFO"/> + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/> <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> <root level="INFO"> diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt index 923f9d58..4c2425bc 100644 --- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.main +import arrow.core.Failure +import arrow.core.Success import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -44,7 +46,13 @@ object ArgBasedServerConfigurationTest : Spek({ cut = ArgBasedServerConfiguration() } - fun parse(vararg cmdLine: String) = cut.parse(cmdLine) + fun parse(vararg cmdLine: String): ServerConfiguration { + val result = cut.parse(cmdLine) + return when (result) { + is Success -> result.value + is Failure -> throw AssertionError("Parsing result should be present") + } + } describe("parsing arguments") { given("all parameters are present in the long form") { diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml index 8a8a1d8c..3c48280c 100644 --- a/hv-collector-utils/pom.xml +++ b/hv-collector-utils/pom.xml @@ -68,6 +68,10 @@ <artifactId>kotlin-reflect</artifactId> </dependency> <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-instances-data</artifactId> + </dependency> + <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt index 968c340f..34c0e651 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt @@ -19,44 +19,54 @@ */ package org.onap.dcae.collectors.veshv.utils.commandline +import arrow.core.Option +import arrow.core.Try +import arrow.core.getOrElse +import arrow.core.recoverWith import org.apache.commons.cli.CommandLine import org.apache.commons.cli.CommandLineParser import org.apache.commons.cli.Options import java.io.File +import java.nio.file.Path import java.nio.file.Paths abstract class ArgBasedConfiguration<T>(val parser: CommandLineParser) { abstract val cmdLineOptionsList: List<CommandLineOption> - fun parse(args: Array<out String>): T { + fun parse(args: Array<out String>): Try<T> { val commandLineOptions = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption) - try { - val cmdLine = parser.parse(commandLineOptions, args) - return getConfiguration(cmdLine) - } catch (ex: Exception) { - throw WrongArgumentException(ex, commandLineOptions) - } + return Try { + parser.parse(commandLineOptions, args) + }.recoverWith { ex -> + Try.raise<CommandLine>(WrongArgumentException(ex, commandLineOptions)) + }.map (this::getConfiguration) } protected abstract fun getConfiguration(cmdLine: CommandLine): T - protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Int = - getOptionValue(cmdLineOpt.option.opt).toInt() - protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int = - getOptionValue(cmdLineOpt.option.opt)?.toInt() ?: default - - protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Long = - getOptionValue(cmdLineOpt.option.opt).toLong() + intValue(cmdLineOpt).getOrElse { default } protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long = - getOptionValue(cmdLineOpt.option.opt)?.toLong() ?: default + longValue(cmdLineOpt).getOrElse { default } - protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): String = - getOptionValue(cmdLineOpt.option.opt) + protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> = + optionValue(cmdLineOpt) protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String = - getOptionValue(cmdLineOpt.option.opt) ?: default + optionValue(cmdLineOpt).getOrElse { default } + + protected fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean = + this.hasOption(cmdLineOpt.option.opt) + + protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI()) + + private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption): Option<String> = + Option.fromNullable(getOptionValue(cmdLineOpt.option.opt)) + + private fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> = + optionValue(cmdLineOpt).map(String::toInt) - protected fun stringPathToPath(path: String) = Paths.get(File(path).toURI()) + private fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> = + optionValue(cmdLineOpt).map(String::toLong) } diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt index 9d1f7aa8..942ca31f 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt @@ -87,4 +87,16 @@ enum class CommandLineOption(val option: Option) { .desc("File with trusted certificate bundle for trusting connections") .build() ), + IDLE_TIMEOUT_SEC(Option.builder("i") + .longOpt("idle-timeout-sec") + .hasArg() + .desc("""Idle timeout for remote hosts. After given time without any data exchange the + |connection might be closed.""".trimMargin()) + .build() + ), + DUMMY_MODE(Option.builder("d") + .longOpt("dummy") + .desc("If present will start in dummy mode (dummy external services)") + .build() + ), } diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt index 5f6a86ad..083d5798 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt @@ -23,7 +23,14 @@ import org.apache.commons.cli.HelpFormatter import org.apache.commons.cli.Options -class WrongArgumentException(parent: Exception, private val options: Options) : Exception(parent.message, parent) { +class WrongArgumentException( + message: String, + private val options: Options, + parent: Throwable? = null +) : Exception(message, parent) { + + constructor(par: Throwable, options: Options) : this(par.message ?: "", options, par) + fun printMessage() { println(message) } diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt new file mode 100644 index 00000000..23bf1658 --- /dev/null +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt @@ -0,0 +1,50 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.utils.commandline + +import arrow.core.Failure +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import kotlin.system.exitProcess + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ + +fun handleErrorsInMain(ex: Throwable, programName: String, logger: Logger) { + when (ex) { + is WrongArgumentException -> { + ex.printMessage() + ex.printHelp(programName) + exitProcess(1) + } + + else -> { + logger.error(ex.localizedMessage) + logger.debug("An error occurred when starting VES HV Collector", ex) + System.exit(2) + } + } +} + + +fun <A> Failure<A>.handleErrorsInMain(programName: String, logger: Logger) { + handleErrorsInMain(this.exception, programName, logger) +} diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index f614d426..536fe93c 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -24,11 +24,15 @@ import kotlin.reflect.KClass class Logger(val logger: org.slf4j.Logger) { constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java)) + constructor(name: String) : this(LoggerFactory.getLogger(name)) // // TRACE // + val traceEnabled: Boolean + get() = logger.isTraceEnabled + fun trace(messageProvider: () -> String) { if (logger.isTraceEnabled) { logger.trace(messageProvider()) @@ -50,7 +50,8 @@ </modules> <properties> - <kotlin.version>1.2.41</kotlin.version> + <kotlin.version>1.2.50</kotlin.version> + <arrow.version>0.7.2</arrow.version> <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version> @@ -531,12 +532,37 @@ <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-core</artifactId> - <version>0.7.2</version> + <version>${arrow.version}</version> + <exclusions> + <exclusion> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib</artifactId> + </exclusion> + <exclusion> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-stdlib-jdk7</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>io.arrow-kt</groupId> <artifactId>arrow-syntax</artifactId> - <version>0.7.2</version> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-instances-core</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-instances-data</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-effects</artifactId> + <version>${arrow.version}</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> |