aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-14 09:48:46 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 09:49:02 +0200
commit67689405071acdad2b26d5112b3662605e474ce9 (patch)
tree3e945129934d5721922fdabf229b0d61b772dfdb
parente7987b7a660060746d5f49e1ec90b1ff90fcf55a (diff)
Various improvements
* Kotlin upgrade * Monad usage on APIs * Idle timeout * Simulator enhancements Closes ONAP-390 Change-Id: I3c00fcfe38c722caf661ddaad428cf089eeefcaa Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
-rwxr-xr-xdevelopment.sh66
-rw-r--r--docker-compose.yml36
-rw-r--r--hv-collector-client-simulator/Dockerfile4
-rw-r--r--hv-collector-client-simulator/pom.xml8
-rw-r--r--hv-collector-client-simulator/sample-request.json20
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt16
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt62
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt62
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt42
-rw-r--r--hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt10
-rw-r--r--hv-collector-core/pom.xml239
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt22
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt73
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt4
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt17
-rw-r--r--hv-collector-dcae-app-simulator/Dockerfile4
-rw-r--r--hv-collector-dcae-app-simulator/pom.xml4
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt8
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt6
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt27
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt30
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt68
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt36
-rw-r--r--hv-collector-domain/pom.xml1
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt27
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt54
-rw-r--r--hv-collector-main/src/main/resources/logback.xml5
-rw-r--r--hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt10
-rw-r--r--hv-collector-utils/pom.xml4
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt48
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt12
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt9
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt50
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt4
-rw-r--r--pom.xml32
43 files changed, 804 insertions, 348 deletions
diff --git a/development.sh b/development.sh
new file mode 100755
index 00000000..94500079
--- /dev/null
+++ b/development.sh
@@ -0,0 +1,66 @@
+#!/bin/bash
+
+# Usage: source ./development.sh and use functions defined here
+# https://httpie.org/ is required for API calls
+
+export MAVEN_OPTS="-T1C"
+
+function veshv_full_rebuild() {
+ mvn clean install -Panalysis ${MAVEN_OPTS}
+}
+
+function veshv_rebuild() {
+ mvn clean install ${MAVEN_OPTS}
+}
+
+function veshv_build() {
+ mvn install ${MAVEN_OPTS}
+}
+
+function veshv_fast_build() {
+ mvn install -DskipTests ${MAVEN_OPTS}
+}
+
+function veshv_docker_start() {
+ docker-compose down
+ docker-compose rm -f
+ docker-compose up
+}
+
+function veshv_docker_clean() {
+ docker volume prune
+}
+
+function veshv_build_and_start() {
+ veshv_fast_build && veshv_docker_start
+}
+
+function veshv_fresh_restart() {
+ docker-compose down
+ docker-compose rm -f
+ veshv_docker_clean
+ veshv_fast_build && docker-compose up
+}
+
+function veshv_simul_dcaeapp_count() {
+ http --json GET http://localhost:8100/messages/count
+}
+
+function veshv_simul_dcaeapp_last_key() {
+ http --json GET http://localhost:8100/messages/last/key
+}
+
+function veshv_simul_dcaeapp_last_value() {
+ http --json GET http://localhost:8100/messages/last/value
+}
+
+function veshv_simul_client() {
+ # feed me with json file using "<"
+ http --json POST http://localhost:8000/simulator/sync
+}
+
+function veshv_simul_client_async() {
+ # feed me with json file using "<"
+ http --json POST http://localhost:8000/simulator/async
+}
+
diff --git a/docker-compose.yml b/docker-compose.yml
index 8db767c8..65951edc 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,9 +1,11 @@
version: "2"
services:
+
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
+
kafka:
image: wurstmeister/kafka
ports:
@@ -17,29 +19,37 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
- veshvcollector:
- build:
- context: hv-collector-main
- dockerfile: Dockerfile
+
+ ves-hv-collector:
+ image: onap/ves-hv-collector
+# build:
+# context: hv-collector-main
+# dockerfile: Dockerfile
ports:
- "6061:6061/tcp"
depends_on:
- kafka
volumes:
- ./ssl/:/etc/ves-hv/
- xnfsimulator:
- build:
- context: hv-collector-client-simulator
- dockerfile: Dockerfile
+
+ xnf-simulator:
+ image: onap/ves-hv-collector-client-simulator
+# build:
+# context: hv-collector-client-simulator
+# dockerfile: Dockerfile
+ ports:
+ - "8000:5000/tcp"
depends_on:
- - veshvcollector
+ - ves-hv-collector
volumes:
- ./ssl/:/etc/ves-hv/
+
dcae-app-simulator:
- build:
- context: hv-collector-dcae-app-simulator
- dockerfile: Dockerfile
+ image: onap/ves-hv-collector-dcae-simulator
+# build:
+# context: hv-collector-dcae-app-simulator
+# dockerfile: Dockerfile
ports:
- - "8080:8080/tcp"
+ - "8100:5000/tcp"
depends_on:
- kafka
diff --git a/hv-collector-client-simulator/Dockerfile b/hv-collector-client-simulator/Dockerfile
index 58cfa448..7d12c494 100644
--- a/hv-collector-client-simulator/Dockerfile
+++ b/hv-collector-client-simulator/Dockerfile
@@ -5,9 +5,11 @@ LABEL license.name="The Apache Software License, Version 2.0"
LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
LABEL maintainer="Nokia Wroclaw ONAP Team"
+EXPOSE 5000
+
WORKDIR /opt/ves-hv-client-simulator
ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"]
-CMD ["--ves-host", "veshvcollector", "--ves-port", "6061"]
+CMD ["--ves-host", "ves-hv-collector", "--ves-port", "6061"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
COPY target/hv-collector-client-simulator-*.jar ./
diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml
index c1c1f2e8..8cfe0a4f 100644
--- a/hv-collector-client-simulator/pom.xml
+++ b/hv-collector-client-simulator/pom.xml
@@ -19,8 +19,8 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<licenses>
@@ -94,6 +94,10 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects</artifactId>
+ </dependency>
+ <dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
</dependency>
diff --git a/hv-collector-client-simulator/sample-request.json b/hv-collector-client-simulator/sample-request.json
new file mode 100644
index 00000000..ca8bd885
--- /dev/null
+++ b/hv-collector-client-simulator/sample-request.json
@@ -0,0 +1,20 @@
+{
+ "commonEventHeader": {
+ "version": "sample-version",
+ "domain": 10,
+ "sequence": 1,
+ "priority": 1,
+ "eventId": "sample-event-id",
+ "eventName": "sample-event-name",
+ "eventType": "sample-event-type",
+ "startEpochMicrosec": 120034455,
+ "lastEpochMicrosec": 120034455,
+ "nfNamingCode": "sample-nf-naming-code",
+ "nfcNamingCode": "sample-nfc-naming-code",
+ "reportingEntityId": "sample-reporting-entity-id",
+ "reportingEntityName": "sample-reporting-entity-name",
+ "sourceId": "sample-source-id",
+ "sourceName": "sample-source-name"
+ },
+ "messagesAmount": 25000
+}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
index b8a4b888..f29b693c 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
@@ -19,16 +19,16 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.config
-import org.apache.commons.cli.DefaultParser
import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MESSAGES_TO_SEND_AMOUNT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
/**
@@ -40,6 +40,8 @@ internal object DefaultValues {
const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
const val CERT_FILE = "/etc/ves-hv/client.crt"
const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
+ const val VES_HV_PORT = 6061
+ const val VES_HV_HOST = "veshvcollector"
}
internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfiguration>(DefaultParser()) {
@@ -53,8 +55,8 @@ internal class ArgBasedClientConfiguration : ArgBasedConfiguration<ClientConfigu
)
override fun getConfiguration(cmdLine: CommandLine): ClientConfiguration {
- val host = cmdLine.stringValue(VES_HV_HOST)
- val port = cmdLine.intValue(VES_HV_PORT)
+ val host = cmdLine.stringValue(VES_HV_HOST, DefaultValues.VES_HV_HOST)
+ val port = cmdLine.intValue(VES_HV_PORT, DefaultValues.VES_HV_PORT)
val messagesAmount = cmdLine.longValue(MESSAGES_TO_SEND_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
return ClientConfiguration(
host,
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
index bc7db869..3f872b51 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/HttpServer.kt
@@ -19,13 +19,17 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.simulators.xnf.config.MessageParameters
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import ratpack.exec.Promise
import ratpack.handling.Chain
import ratpack.handling.Context
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
+import reactor.core.publisher.Flux
+import reactor.core.scheduler.Schedulers
import javax.json.Json
import javax.json.JsonObject
@@ -35,30 +39,46 @@ import javax.json.JsonObject
*/
class HttpServer(private val vesClient: VesHvClient) {
- fun start(port: Int = DEFAULT_PORT): Mono<RatpackServer> = Mono.fromCallable {
- RatpackServer.of {
- it.serverConfig(ServerConfig.embedded().port(port)).handlers(this::configureHandlers)
+ fun start(port: Int = DEFAULT_PORT): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
+ server.serverConfig(ServerConfig.embedded().port(port))
+ .handlers(this::configureHandlers)
}
- }.doOnNext { it.start() }
+ }
private fun configureHandlers(chain: Chain) {
- chain.post("simulator") { ctx ->
- ctx.request.body
- .map { Json.createReader(it.inputStream).readObject() }
- .map { extractMessageParameters(it) }
- .map { MessageFactory.INSTANCE.createMessageFlux(it) }
- .onError { handleException(it, ctx) }
- .then {
- vesClient.send(it)
- ctx.response
- .status(STATUS_OK)
- .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
- .add("response", "Request accepted")
- .build()
- .toString())
- }
- }
+ chain
+ .post("simulator/sync") { ctx ->
+ createMessageFlux(ctx)
+ .map { vesClient.sendIo(it) }
+ .map { it.unsafeRunSync() }
+ .onError { handleException(it, ctx) }
+ .then { sendAcceptedResponse(ctx) }
+ }
+ .post("simulator/async") { ctx ->
+ createMessageFlux(ctx)
+ .map { vesClient.sendRx(it) }
+ .map { it.subscribeOn(Schedulers.elastic()).subscribe() }
+ .onError { handleException(it, ctx) }
+ .then { sendAcceptedResponse(ctx) }
+ }
+ }
+
+ private fun createMessageFlux(ctx: Context): Promise<Flux<WireFrame>> {
+ return ctx.request.body
+ .map { Json.createReader(it.inputStream).readObject() }
+ .map { extractMessageParameters(it) }
+ .map { MessageFactory.INSTANCE.createMessageFlux(it) }
+ }
+
+ private fun sendAcceptedResponse(ctx: Context) {
+ ctx.response
+ .status(STATUS_OK)
+ .send(CONTENT_TYPE_APPLICATION_JSON, Json.createObjectBuilder()
+ .add("response", "Request accepted")
+ .build()
+ .toString())
}
private fun handleException(t: Throwable, ctx: Context) {
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index 78a72d93..be351b50 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -19,19 +19,22 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import arrow.effects.IO
import io.netty.buffer.Unpooled
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
-import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
+import reactor.core.publisher.EmitterProcessor
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import reactor.core.publisher.ReplayProcessor
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.tcp.TcpClient
@@ -50,28 +53,58 @@ class VesHvClient(private val configuration: ClientConfiguration) {
}
.build()
- fun send(messages: Flux<WireFrame>) =
- client
- .newHandler { _, out -> handler(out, messages) }
- .doOnError{logger.info("Failed to connect to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}")}
- .subscribe { logger.info("Connected to VesHvCollector on " +
- "${configuration.vesHost}:${configuration.vesPort}") }
+ fun sendIo(messages: Flux<WireFrame>) = IO<Unit> {
+ sendRx(messages).block()
+ }
+ fun sendRx(messages: Flux<WireFrame>): Mono<Void> {
+ val complete = ReplayProcessor.create<Void>(1)
+ client
+ .newHandler { _, output -> handler(complete, messages, output) }
+ .doOnError {
+ logger.info("Failed to connect to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ .subscribe {
+ logger.info("Connected to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")
+ }
+ return complete.then()
+ }
- private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> {
+ private fun handler(complete: ReplayProcessor<Void>, messages: Flux<WireFrame>, nettyOutbound: NettyOutbound):
+ Publisher<Void> {
+ val encoder = WireFrameEncoder(nettyOutbound.alloc())
+ val context = nettyOutbound.context()
+ context.onClose {
+ logger.info { "Connection to ${context.address()} has been closed" }
+ }
- val encoder = WireFrameEncoder(nettyOutbound.alloc())
+ // TODO: Close channel after all messages have been sent
+ // The code bellow doesn't work because it closes the channel earlier and not all are consumed...
+// complete.subscribe {
+// context.channel().disconnect().addListener {
+// if (it.isSuccess)
+// logger.info { "Connection closed" }
+// else
+// logger.warn("Failed to close the connection", it.cause())
+// }
+// }
val frames = messages
.map(encoder::encode)
- .concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
+ .window(MAX_BATCH_SIZE)
return nettyOutbound
- .options { it.flushOnEach() }
- .send(frames)
- .then { logger.info("Messages have been sent") }
+ .options { it.flushOnBoundary() }
+ .sendGroups(frames)
+ .send(Mono.just(Unpooled.EMPTY_BUFFER))
+ .then {
+ logger.info("Messages have been sent")
+ complete.onComplete()
+ }
+ .then()
}
private fun createSslContext(config: SecurityConfiguration): SslContext =
@@ -83,6 +116,7 @@ class VesHvClient(private val configuration: ClientConfiguration) {
.build()
companion object {
+ private const val MAX_BATCH_SIZE = 128
private val logger = Logger(VesHvClient::class)
}
}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 08bc6a71..f2229507 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -19,35 +19,41 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf
+import arrow.core.Failure
+import arrow.core.Success
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ArgBasedClientConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.HttpServer
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.VesHvClient
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
-import org.slf4j.LoggerFactory.getLogger
+import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
-private val logger = getLogger("Simulator :: main")
+private val logger = Logger("Simulator :: main")
+private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
fun main(args: Array<String>) {
- try {
- val clientConfig = ArgBasedClientConfiguration().parse(args)
- val vesClient = VesHvClient(clientConfig)
+ val httpServer = ArgBasedClientConfiguration().parse(args)
+ .map(::VesHvClient)
+ .map(::HttpServer)
- HttpServer(vesClient)
- .start()
- .block()
-
- } catch (e: WrongArgumentException) {
- e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
- System.exit(1)
- } catch (e: Exception) {
- logger.error(e.localizedMessage)
- logger.debug("An error occurred when starting ves client", e)
- System.exit(2)
+ when (httpServer) {
+ is Success -> httpServer.value.start().unsafeRunAsync {
+ it.fold(
+ { ex ->
+ logger.error("Failed to start a server", ex)
+ },
+ { srv ->
+ logger.info("Started Simulator API server (listening on ${srv.bindHost}:${srv.bindPort})")
+ }
+ )
+ }
+ is Failure -> httpServer.handleErrorsInMain(PROGRAM_NAME, logger)
}
}
-
diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt
index 6420d84d..2746c0a6 100644
--- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt
+++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/config/ArgBasedClientConfigurationTest.kt
@@ -19,6 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.main.config
+import arrow.core.Failure
+import arrow.core.Success
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -44,7 +46,13 @@ object ArgBasedClientConfigurationTest : Spek({
cut = ArgBasedClientConfiguration()
}
- fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+ fun parse(vararg cmdLine: String): ClientConfiguration {
+ val result = cut.parse(cmdLine)
+ return when (result) {
+ is Success -> result.value
+ is Failure -> throw AssertionError("Parsing result should be present")
+ }
+ }
describe("parsing arguments") {
lateinit var result: ClientConfiguration
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index a372fb22..18657316 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -19,130 +19,135 @@
~ ============LICENSE_END=========================================================
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
- <licenses>
- <license>
- <name>The Apache Software License, Version 2.0</name>
- <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
- </license>
- </licenses>
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
- <parent>
- <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
- <artifactId>ves-hv-collector</artifactId>
- <version>1.0.0-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
+ <artifactId>ves-hv-collector</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
- <artifactId>hv-collector-core</artifactId>
- <description>VES HighVolume Collector :: Core</description>
+ <artifactId>hv-collector-core</artifactId>
+ <description>VES HighVolume Collector :: Core</description>
- <properties>
- <skipAnalysis>false</skipAnalysis>
- </properties>
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
- <build>
- <plugins>
- <plugin>
- <artifactId>kotlin-maven-plugin</artifactId>
- <groupId>org.jetbrains.kotlin</groupId>
- </plugin>
- <plugin>
- <artifactId>maven-surefire-plugin</artifactId>
- <groupId>org.apache.maven.plugins</groupId>
- </plugin>
- <plugin>
- <groupId>org.jacoco</groupId>
- <artifactId>jacoco-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
- <dependencies>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-utils</artifactId>
- <version>${project.parent.version}</version>
- </dependency>
- <dependency>
- <groupId>${project.parent.groupId}</groupId>
- <artifactId>hv-collector-domain</artifactId>
- <version>${project.parent.version}</version>
- <scope>compile</scope>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-reflect</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-core</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.addons</groupId>
- <artifactId>reactor-extra</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.ipc</groupId>
- <artifactId>reactor-netty</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor.kafka</groupId>
- <artifactId>reactor-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-tcnative-boringssl-static</artifactId>
- <scope>runtime</scope>
- <classifier>${os.detected.classifier}</classifier>
- </dependency>
- <dependency>
- <groupId>javax.json</groupId>
- <artifactId>javax.json-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.glassfish</groupId>
- <artifactId>javax.json</artifactId>
- <scope>runtime</scope>
- </dependency>
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-domain</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-reflect</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.ipc</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <scope>runtime</scope>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+ <dependency>
+ <groupId>javax.json</groupId>
+ <artifactId>javax.json-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ <scope>runtime</scope>
+ </dependency>
- <dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- </dependency>
- <dependency>
- <groupId>ch.qos.logback</groupId>
- <artifactId>logback-classic</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
+
+ <dependency>
+ <groupId>com.nhaarman</groupId>
+ <artifactId>mockito-kotlin</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
</project>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index ed686fe8..d6158481 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -32,9 +33,10 @@ interface Collector {
typealias CollectorProvider = () -> Collector
interface Server {
- fun start(): Mono<Void>
+ fun start(): IO<ServerHandle>
}
-interface ServerFactory {
- fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server
+abstract class ServerHandle(val host: String, val port: Int) {
+ abstract fun shutdown(): IO<Unit>
+ abstract fun await(): IO<Unit>
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index f3f0a891..cee658b6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -19,10 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Option
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.Routing
import org.onap.dcae.collectors.veshv.model.VesMessage
class Router(private val routing: Routing) {
- fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message)
+ fun findDestination(message: VesMessage): Option<RoutedMessage> =
+ routing.routeFor(message.header).map { it(message) }
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 222eaefa..033095ad 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.Option
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
@@ -67,7 +68,10 @@ internal class VesHvCollector(
wireChunkDecoder.release()
}
- private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
+ private fun <T, V> omitWhenNull(input: T, mapper: (T) -> Option<V>): Mono<V> =
+ mapper(input).fold(
+ { Mono.empty() },
+ { Mono.just(it) })
companion object {
val logger = Logger(VesHvCollector::class)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index a5c41046..5f4bf354 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -23,7 +23,6 @@ import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicLong
@@ -36,7 +35,6 @@ internal class LoggingSinkProvider : SinkProvider {
override fun invoke(config: CollectorConfiguration): Sink {
return object : Sink {
- private val logger = Logger(LoggingSinkProvider::class)
private val totalMessages = AtomicLong()
private val totalBytes = AtomicLong()
@@ -59,5 +57,6 @@ internal class LoggingSinkProvider : SinkProvider {
companion object {
const val INFO_LOGGING_FREQ = 100_000
+ private val logger = Logger(LoggingSinkProvider::class)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index 0a548a52..f8fa72a6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -20,27 +20,38 @@
package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult
+import java.util.concurrent.atomic.AtomicLong
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+ private val sentMessages = AtomicLong(0)
override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
val records = messages.map(this::vesToKafkaRecord)
- return sender.send(records)
+ val result = sender.send(records)
.doOnNext(::logException)
.filter(::isSuccessful)
.map { it.correlationMetadata() }
+
+ return if (logger.traceEnabled) {
+ result.doOnNext(::logSentMessage)
+ } else {
+ result
+ }
}
private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
@@ -59,7 +70,14 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesM
}
}
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+ private fun logSentMessage(sentMsg: RoutedMessage) {
+ logger.trace {
+ val msgNum = sentMessages.incrementAndGet()
+ "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
+ }
+ }
+
+ private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
companion object {
val logger = Logger(KafkaSink::class)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
index 9753d9e5..4e9932cc 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
@@ -26,7 +26,7 @@ import org.apache.kafka.common.serialization.Serializer
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class ProtobufSerializer :Serializer<MessageLite> {
+class ProtobufSerializer : Serializer<MessageLite> {
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
// no configuration
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 65b3b29e..0426ceb1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,8 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.boundary.ServerHandle
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
@@ -28,7 +30,9 @@ import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
+import reactor.ipc.netty.tcp.BlockingNettyContext
import reactor.ipc.netty.tcp.TcpServer
+import java.time.Duration
import java.util.function.BiFunction
/**
@@ -39,17 +43,14 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private val sslContextFactory: SslContextFactory,
private val collectorProvider: CollectorProvider) : Server {
- override fun start(): Mono<Void> {
- logger.info { "Listening on port ${serverConfig.port}" }
- return Mono.defer {
- val nettyContext = TcpServer.builder()
- .options(this::configureServer)
- .build()
- .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u ->
- handleConnection(t, u)
- })
- Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() }
- }
+ override fun start(): IO<ServerHandle> = IO {
+ val ctx = TcpServer.builder()
+ .options(this::configureServer)
+ .build()
+ .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
+ handleConnection(input)
+ })
+ NettyServerHandle(ctx)
}
private fun configureServer(opts: ServerOptions.Builder<*>) {
@@ -57,20 +58,50 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
opts.sslContext(sslContextFactory.createSslContext(serverConfig.securityConfiguration))
}
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
- logger.debug("Got connection")
- nettyOutbound.alloc()
+ private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
+ logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+
+ val dataStream = nettyInbound
+ .configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ .receive()
+ .retain()
- val sendHello = nettyOutbound
- .options { it.flushOnEach() }
- .sendString(Mono.just("ONAP_VES_HV/0.1\n"))
- .then()
+ return collectorProvider()
+ .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+ }
- val handleIncomingMessages = collectorProvider()
- .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain())
+ private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+ onReadIdle(timeout.toMillis()) {
+ logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
+ context().channel().close().addListener {
- return sendHello.then(handleIncomingMessages)
+ if (it.isSuccess)
+ logger.debug { "Client disconnected because of idle timeout" }
+ else
+ logger.warn("Channel close failed", it.cause())
+ }
+ }
+ return this
+ }
+
+ private fun NettyInbound.logConnectionClosed(): NettyInbound {
+ context().onClose {
+ logger.info("Connection from ${remoteAddress()} has been closed")
+ }
+ return this
}
+
+ private class NettyServerHandle(val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) {
+ override fun shutdown() = IO {
+ ctx.shutdown()
+ }
+
+ override fun await() = IO<Unit> {
+ ctx.context.channel().closeFuture().sync()
+ }
+ }
+
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
index 34a8b928..b788f511 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
@@ -56,7 +56,7 @@ internal class StreamBufferEmitter(
else -> {
streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
sink.onDispose {
- logger.debug("Disposing read components")
+ logger.trace { "Disposing read components" }
streamBuffer.discardReadComponents()
}
sink.onRequest { requestedFrameCount ->
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
index a576dc65..abebff3d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -84,7 +84,7 @@ internal class WireFrameSink(
try {
decoder.decodeFirst(streamBuffer)
} catch (ex: MissingWireFrameBytesException) {
- logger.debug { "${ex.message} - waiting for more data" }
+ logger.trace { "${ex.message} - waiting for more data" }
null
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 8d01c075..67a7d6f2 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -28,4 +29,6 @@ import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
data class ServerConfiguration(
val port: Int,
val configurationUrl: String,
- val securityConfiguration: SecurityConfiguration)
+ val securityConfiguration: SecurityConfiguration,
+ val idleTimeout: Duration,
+ val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index bc030587..e9cd5f3f 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -19,12 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.model
+import arrow.core.Option
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
data class Routing(val routes: List<Route>) {
- fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) }
+ fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
+ Option.fromNullable(routes.find { it.applies(commonHeader) })
}
data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index c852f5f4..599a9d40 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -19,12 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl
+import arrow.core.None
+import arrow.core.Some
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -61,15 +64,15 @@ object RouterTest : Spek({
}
it("should be routed to proper partition") {
- assertThat(result?.partition).isEqualTo(2)
+ assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
}
it("should be routed to proper topic") {
- assertThat(result?.topic).isEqualTo("ves_rtpm")
+ assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
}
it("should be routed with a given message") {
- assertThat(result?.message).isSameAs(message)
+ assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
}
}
@@ -82,15 +85,15 @@ object RouterTest : Spek({
}
it("should be routed to proper partition") {
- assertThat(result?.partition).isEqualTo(0)
+ assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
}
it("should be routed to proper topic") {
- assertThat(result?.topic).isEqualTo("ves_trace")
+ assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
}
it("should be routed with a given message") {
- assertThat(result?.message).isSameAs(message)
+ assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
}
}
@@ -99,7 +102,7 @@ object RouterTest : Spek({
val result = cut.findDestination(message)
it("should not have route available") {
- assertThat(result).isNull()
+ assertThat(result).isEqualTo(None)
}
}
}
diff --git a/hv-collector-dcae-app-simulator/Dockerfile b/hv-collector-dcae-app-simulator/Dockerfile
index a449078e..68b562d6 100644
--- a/hv-collector-dcae-app-simulator/Dockerfile
+++ b/hv-collector-dcae-app-simulator/Dockerfile
@@ -5,11 +5,11 @@ LABEL license.name="The Apache Software License, Version 2.0"
LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
LABEL maintainer="Nokia Wroclaw ONAP Team"
-EXPOSE 8080
+EXPOSE 5000
WORKDIR /opt/ves-hv-dcae-app-simulator
ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt"]
-CMD ["--kafka-bootstrap-servers", "kafka:9092", "--kafka-topics", "ves_hvRanMeas", "--listen-port", "8080"]
+CMD ["--kafka-bootstrap-servers", "kafka:9092", "--kafka-topics", "ves_hvRanMeas", "--listen-port", "5000"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
COPY target/hv-collector-dcae-app-simulator-*.jar ./
diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml
index 046f5ed0..a2f92e81 100644
--- a/hv-collector-dcae-app-simulator/pom.xml
+++ b/hv-collector-dcae-app-simulator/pom.xml
@@ -93,6 +93,10 @@
<version>${project.parent.version}</version>
</dependency>
<dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects</artifactId>
+ </dependency>
+ <dependency>
<groupId>io.ratpack</groupId>
<artifactId>ratpack-core</artifactId>
</dependency>
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt
index 3f539302..27edde9c 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfiguration.kt
@@ -24,12 +24,14 @@ import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DefaultValues.API_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
internal object DefaultValues {
const val API_PORT = 8080
+ const val KAFKA_SERVERS = "kafka:9092"
+ const val KAFKA_TOPICS = "ves_hvRanMeas"
}
class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
@@ -41,8 +43,8 @@ class ArgBasedDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfigur
override fun getConfiguration(cmdLine: CommandLine): DcaeAppSimConfiguration {
val port = cmdLine.intValue(LISTEN_PORT, API_PORT)
- val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS)
- val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS).split(",").toSet()
+ val kafkaBootstrapServers = cmdLine.stringValue(KAFKA_SERVERS, DefaultValues.KAFKA_SERVERS)
+ val kafkaTopics = cmdLine.stringValue(KAFKA_TOPICS, DefaultValues.KAFKA_TOPICS).split(",").toSet()
return DcaeAppSimConfiguration(
port,
kafkaBootstrapServers,
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
index f7703b86..d53609ca 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/KafkaSource.kt
@@ -19,10 +19,10 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
+import arrow.effects.IO
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Mono
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions
import java.util.*
@@ -33,10 +33,10 @@ import java.util.*
*/
class KafkaSource(private val receiver: KafkaReceiver<ByteArray, ByteArray>) {
- fun start(): Mono<Consumer> = Mono.create { sink ->
+ fun start(): IO<Consumer> = IO {
val consumer = Consumer()
receiver.receive().subscribe(consumer::update)
- sink.success(consumer)
+ consumer
}
companion object {
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
index 5f0fe7f6..66169534 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/kafka/consumer.kt
@@ -19,7 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka
-import reactor.core.publisher.Mono
+import arrow.core.Option
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.kafka.receiver.ReceiverRecord
/**
@@ -27,10 +29,11 @@ import reactor.kafka.receiver.ReceiverRecord
* @since June 2018
*/
-class ConsumerState(val msgCount: Long, val lastKey: ByteArray?, val lastValue: ByteArray?)
+class ConsumerState(val msgCount: Long, val lastKey: Option<ByteArray>, val lastValue: Option<ByteArray>)
interface ConsumerStateProvider {
- fun currentState(): Mono<ConsumerState>
+ fun currentState(): ConsumerState
+ fun reset(): IO<Unit>
}
class Consumer : ConsumerStateProvider {
@@ -38,18 +41,28 @@ class Consumer : ConsumerStateProvider {
private var lastKey: ByteArray? = null
private var lastValue: ByteArray? = null
- override fun currentState(): Mono<ConsumerState> = Mono.create { sink ->
- val state = synchronized(this) {
- ConsumerState(msgCount, lastKey, lastValue)
+ override fun currentState() =
+ ConsumerState(msgCount, Option.fromNullable(lastKey), Option.fromNullable(lastValue))
+
+ override fun reset() = IO {
+ synchronized(this) {
+ msgCount = 0
+ lastKey = null
+ lastValue = null
}
- sink.success(state)
}
fun update(record: ReceiverRecord<ByteArray, ByteArray>) {
+ logger.trace { "Updating stats for message from ${record.topic()}:${record.partition()}" }
+
synchronized(this) {
msgCount++
lastKey = record.key()
lastValue = record.value()
}
}
+
+ companion object {
+ private val logger = Logger(Consumer::class)
+ }
}
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index c037af35..f7d44ede 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -19,32 +19,30 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp
+import arrow.core.Failure
+import arrow.core.Success
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.ArgBasedDcaeAppSimConfiguration
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.KafkaSource
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote.ApiServer
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
+import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.slf4j.LoggerFactory
private val logger = Logger(LoggerFactory.getLogger("DCAE simulator :: main"))
fun main(args: Array<String>) {
+ logger.info("Starting DCAE APP simulator")
- try {
- logger.info("Starting DCAE APP simulator")
- val simulatorConfig = ArgBasedDcaeAppSimConfiguration().parse(args)
+ val config = ArgBasedDcaeAppSimConfiguration().parse(args)
+ when (config) {
+ is Success -> startApp(config.value).unsafeRunSync()
+ is Failure -> config.handleErrorsInMain("", logger)
+ }
+}
- KafkaSource.create(simulatorConfig.kafkaBootstrapServers, simulatorConfig.kafkaTopics)
+private fun startApp(config: DcaeAppSimConfiguration) =
+ KafkaSource.create(config.kafkaBootstrapServers, config.kafkaTopics)
.start()
.map(::ApiServer)
- .flatMap { it.start(simulatorConfig.apiPort) }
- .block()
- } catch (e: WrongArgumentException) {
- e.printHelp("java org.onap.dcae.collectors.veshv.simulators.dcaeapp.MainKt")
- System.exit(1)
- } catch (e: Exception) {
- logger.error(e.localizedMessage)
- logger.debug("An error occurred when starting ves dcea app simulator", e)
- System.exit(2)
- }
-}
+ .flatMap { it.start(config.apiPort) }
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
index fcb8e131..2fa8abec 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/remote/ApiServer.kt
@@ -19,13 +19,17 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.remote
+import arrow.core.Try
+import arrow.core.getOrElse
+import arrow.effects.IO
+import com.google.protobuf.MessageOrBuilder
import com.google.protobuf.util.JsonFormat
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.kafka.ConsumerStateProvider
+import org.onap.ves.HVRanMeasFieldsV5.HVRanMeasFields
import org.onap.ves.VesEventV5.VesEvent
import ratpack.handling.Chain
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
-import reactor.core.publisher.Mono
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -34,41 +38,71 @@ import reactor.core.publisher.Mono
class ApiServer(private val consumerState: ConsumerStateProvider) {
private val jsonPrinter = JsonFormat.printer()
- fun start(port: Int): Mono<RatpackServer> = Mono.fromCallable {
- RatpackServer.of { server ->
+ fun start(port: Int): IO<RatpackServer> = IO {
+ RatpackServer.start { server ->
server.serverConfig(ServerConfig.embedded().port(port))
.handlers(this::setupHandlers)
}
- }.doOnNext(RatpackServer::start)
+ }
private fun setupHandlers(chain: Chain) {
chain
.get("messages/count") { ctx ->
ctx.response.contentType(CONTENT_TEXT)
- consumerState.currentState()
- .map { it.msgCount.toString() }
- .subscribe(ctx.response::send)
+ val state = consumerState.currentState()
+ ctx.response.send(state.msgCount.toString())
}
.get("messages/last/key") { ctx ->
ctx.response.contentType(CONTENT_JSON)
- consumerState.currentState()
- .map { it.lastKey }
- .map { VesEvent.CommonEventHeader.parseFrom(it) }
- .map { jsonPrinter.print(it) }
- .subscribe(ctx.response::send)
+ val state = consumerState.currentState()
+ val resp = state.lastKey
+ .map { Try { VesEvent.CommonEventHeader.parseFrom(it) } }
+ .map(this::protobufToJson)
+ .getOrElse { "null" }
+ ctx.response.send(resp)
}
.get("messages/last/value") { ctx ->
ctx.response.contentType(CONTENT_JSON)
- consumerState.currentState()
- .map { it.lastValue }
- .map { VesEvent.parseFrom(it) }
- .map { jsonPrinter.print(it) }
- .subscribe(ctx.response::send)
+ val state = consumerState.currentState()
+ val resp = state.lastValue
+ .map { Try { VesEvent.parseFrom(it) } }
+ .map(this::protobufToJson)
+ .getOrElse { "null" }
+ ctx.response.send(resp)
+ }
+
+ .get("messages/last/hvRanMeasFields") { ctx ->
+ ctx.response.contentType(CONTENT_JSON)
+ val state = consumerState.currentState()
+ val resp = state.lastValue
+ .flatMap { Try { VesEvent.parseFrom(it) }.toOption() }
+ .filter { it.commonEventHeader.domain == VesEvent.CommonEventHeader.Domain.HVRANMEAS }
+ .map { Try { HVRanMeasFields.parseFrom(it.hvRanMeasFields) } }
+ .map(this::protobufToJson)
+ .getOrElse { "null" }
+ ctx.response.send(resp)
+ }
+
+ .delete("messages") { ctx ->
+ ctx.response.contentType(CONTENT_TEXT)
+ consumerState.reset()
+ .unsafeRunAsync {
+ it.fold(
+ { ctx.response.send("NOK") },
+ { ctx.response.send("OK") }
+ )
+ }
}
}
+ private fun protobufToJson(parseResult: Try<MessageOrBuilder>): String =
+ parseResult.fold(
+ { ex -> "\"Failed to parse protobuf: ${ex.message}\"" },
+ { jsonPrinter.print(it) })
+
+
companion object {
private const val CONTENT_TEXT = "text/plain"
private const val CONTENT_JSON = "application/json"
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt
index 817df8e0..d99de17b 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/config/ArgBasedDcaeAppSimConfigurationTest.kt
@@ -19,13 +19,14 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.config
+import arrow.core.Failure
+import arrow.core.Success
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
-import kotlin.test.assertFailsWith
internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
@@ -38,7 +39,21 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
cut = ArgBasedDcaeAppSimConfiguration()
}
- fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+ fun parseExpectingSuccess(vararg cmdLine: String): DcaeAppSimConfiguration {
+ val result = cut.parse(cmdLine)
+ return when (result) {
+ is Success -> result.value
+ is Failure -> throw AssertionError("Parsing result should be present")
+ }
+ }
+
+ fun parseExpectingFailure(vararg cmdLine: String): Throwable {
+ val result = cut.parse(cmdLine)
+ return when (result) {
+ is Success -> throw AssertionError("parsing should have failed")
+ is Failure -> result.exception
+ }
+ }
describe("parsing arguments") {
lateinit var result: DcaeAppSimConfiguration
@@ -46,7 +61,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
given("all parameters are present in the long form") {
beforeEachTest {
- result = parse("--listen-port", "6969",
+ result = parseExpectingSuccess("--listen-port", "6969",
"--kafka-bootstrap-servers", kafkaBootstrapServers,
"--kafka-topics", kafkaTopics
)
@@ -71,7 +86,9 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
given("some parameters are present in the short form") {
beforeEachTest {
- result = parse("-p", "666", "--kafka-bootstrap-servers", kafkaBootstrapServers, "-f", kafkaTopics)
+ result = parseExpectingSuccess("-p", "666",
+ "--kafka-bootstrap-servers", kafkaBootstrapServers,
+ "-f", kafkaTopics)
}
it("should set proper port") {
@@ -92,7 +109,7 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
given("all optional parameters are absent") {
beforeEachTest {
- result = parse("-s", kafkaBootstrapServers, "-f", kafkaTopics)
+ result = parseExpectingSuccess("-s", kafkaBootstrapServers, "-f", kafkaTopics)
}
it("should set default port") {
@@ -100,21 +117,20 @@ internal class ArgBasedDcaeAppSimConfigurationTest : Spek({
}
}
-
describe("required parameter is absent") {
given("kafka topics are missing") {
it("should throw exception") {
- assertFailsWith<WrongArgumentException> { parse("-s", kafkaBootstrapServers) }
+ assertThat(parseExpectingFailure("-s", kafkaBootstrapServers))
+ .isInstanceOf(WrongArgumentException::class.java)
}
}
given("kafka bootstrap servers are missing") {
it("should throw exception") {
- assertFailsWith<WrongArgumentException> { parse("-f", kafkaTopics) }
+ assertThat(parseExpectingFailure("-f", kafkaTopics))
+ .isInstanceOf(WrongArgumentException::class.java)
}
}
}
}
-
-
}) \ No newline at end of file
diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml
index de9004a8..c11510ac 100644
--- a/hv-collector-domain/pom.xml
+++ b/hv-collector-domain/pom.xml
@@ -111,7 +111,6 @@
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
<artifactId>kotlin-test</artifactId>
- <version>${kotlin.version}</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
index 59b91d7f..f3e97be2 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfiguration.kt
@@ -19,16 +19,19 @@
*/
package org.onap.dcae.collectors.veshv.main
-import org.apache.commons.cli.DefaultParser
import org.apache.commons.cli.CommandLine
+import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CONSUL_CONFIG_URL
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.DUMMY_MODE
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_TIMEOUT_SEC
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.PRIVATE_KEY_FILE
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.CERT_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_CERT_FILE
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.time.Duration
internal object DefaultValues {
const val PORT = 6061
@@ -36,6 +39,7 @@ internal object DefaultValues {
const val PRIVATE_KEY_FILE = "/etc/ves-hv/server.key"
const val CERT_FILE = "/etc/ves-hv/server.crt"
const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
+ const val IDLE_TIMEOUT_SEC = 60L
}
internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) {
@@ -44,14 +48,23 @@ internal class ArgBasedServerConfiguration : ArgBasedConfiguration<ServerConfigu
CONSUL_CONFIG_URL,
PRIVATE_KEY_FILE,
CERT_FILE,
- TRUST_CERT_FILE
+ TRUST_CERT_FILE,
+ IDLE_TIMEOUT_SEC,
+ DUMMY_MODE
)
override fun getConfiguration(cmdLine: CommandLine): ServerConfiguration {
val port = cmdLine.intValue(LISTEN_PORT, DefaultValues.PORT)
val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL, DefaultValues.CONFIG_URL)
+ val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+ val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine)
- return ServerConfiguration(port, configUrl, security)
+ return ServerConfiguration(
+ port = port,
+ configurationUrl = configUrl,
+ securityConfiguration = security,
+ idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+ dummyMode = dummyMode)
}
private fun createSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration {
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 1f2686ba..074a75e4 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -19,37 +19,49 @@
*/
package org.onap.dcae.collectors.veshv.main
+import arrow.core.flatMap
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.utils.commandline.WrongArgumentException
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.utils.commandline.handleErrorsInMain
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
-import org.slf4j.LoggerFactory
-import kotlin.system.exitProcess
-private val logger = LoggerFactory.getLogger("main")
+private val logger = Logger("org.onap.dcae.collectors.veshv.main")
+private const val PROGRAM_NAME = "java org.onap.dcae.collectors.veshv.main.MainKt"
fun main(args: Array<String>) {
- try {
- val serverConfiguration = ArgBasedServerConfiguration().parse(args)
-
- val collectorProvider = CollectorFactory(
- resolveConfigurationProvider(serverConfiguration),
- AdapterFactory.kafkaSink(),
- MicrometerMetrics()
- ).createVesHvCollectorProvider()
- ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
- } catch (ex: WrongArgumentException) {
- ex.printMessage()
- ex.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
- exitProcess(1)
- }
+ ArgBasedServerConfiguration().parse(args)
+ .toEither()
+ .map(::createServer)
+ .map(Server::start)
+ .flatMap { it.attempt().unsafeRunSync() }
+ .fold(
+ { ex ->
+ handleErrorsInMain(ex, PROGRAM_NAME, logger)
+ },
+ { handle ->
+ logger.info("Server started. Listening on ${handle.host}:${handle.port}")
+ handle.await().unsafeRunSync()
+ }
+ )
}
+private fun createServer(config: ServerConfiguration): Server {
+ val sink = if (config.dummyMode) AdapterFactory.loggingSink() else AdapterFactory.kafkaSink()
+ val collectorProvider = CollectorFactory(
+ resolveConfigurationProvider(config),
+ sink,
+ MicrometerMetrics()
+ ).createVesHvCollectorProvider()
+
+ return ServerFactory.createNettyTcpServer(config, collectorProvider)
+}
private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguration): ConfigurationProvider {
diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml
index 48da3b18..5127e7ef 100644
--- a/hv-collector-main/src/main/resources/logback.xml
+++ b/hv-collector-main/src/main/resources/logback.xml
@@ -26,7 +26,10 @@
</rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="TRACE"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="TRACE"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="TRACE"/>
<!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
<root level="INFO">
diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
index 923f9d58..4c2425bc 100644
--- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
+++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgBasedServerConfigurationTest.kt
@@ -19,6 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.main
+import arrow.core.Failure
+import arrow.core.Success
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
@@ -44,7 +46,13 @@ object ArgBasedServerConfigurationTest : Spek({
cut = ArgBasedServerConfiguration()
}
- fun parse(vararg cmdLine: String) = cut.parse(cmdLine)
+ fun parse(vararg cmdLine: String): ServerConfiguration {
+ val result = cut.parse(cmdLine)
+ return when (result) {
+ is Success -> result.value
+ is Failure -> throw AssertionError("Parsing result should be present")
+ }
+ }
describe("parsing arguments") {
given("all parameters are present in the long form") {
diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml
index 8a8a1d8c..3c48280c 100644
--- a/hv-collector-utils/pom.xml
+++ b/hv-collector-utils/pom.xml
@@ -68,6 +68,10 @@
<artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-instances-data</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
index 968c340f..34c0e651 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/ArgBasedConfiguration.kt
@@ -19,44 +19,54 @@
*/
package org.onap.dcae.collectors.veshv.utils.commandline
+import arrow.core.Option
+import arrow.core.Try
+import arrow.core.getOrElse
+import arrow.core.recoverWith
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.CommandLineParser
import org.apache.commons.cli.Options
import java.io.File
+import java.nio.file.Path
import java.nio.file.Paths
abstract class ArgBasedConfiguration<T>(val parser: CommandLineParser) {
abstract val cmdLineOptionsList: List<CommandLineOption>
- fun parse(args: Array<out String>): T {
+ fun parse(args: Array<out String>): Try<T> {
val commandLineOptions = cmdLineOptionsList.map { it.option }.fold(Options(), Options::addOption)
- try {
- val cmdLine = parser.parse(commandLineOptions, args)
- return getConfiguration(cmdLine)
- } catch (ex: Exception) {
- throw WrongArgumentException(ex, commandLineOptions)
- }
+ return Try {
+ parser.parse(commandLineOptions, args)
+ }.recoverWith { ex ->
+ Try.raise<CommandLine>(WrongArgumentException(ex, commandLineOptions))
+ }.map (this::getConfiguration)
}
protected abstract fun getConfiguration(cmdLine: CommandLine): T
- protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Int =
- getOptionValue(cmdLineOpt.option.opt).toInt()
-
protected fun CommandLine.intValue(cmdLineOpt: CommandLineOption, default: Int): Int =
- getOptionValue(cmdLineOpt.option.opt)?.toInt() ?: default
-
- protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Long =
- getOptionValue(cmdLineOpt.option.opt).toLong()
+ intValue(cmdLineOpt).getOrElse { default }
protected fun CommandLine.longValue(cmdLineOpt: CommandLineOption, default: Long): Long =
- getOptionValue(cmdLineOpt.option.opt)?.toLong() ?: default
+ longValue(cmdLineOpt).getOrElse { default }
- protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): String =
- getOptionValue(cmdLineOpt.option.opt)
+ protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption): Option<String> =
+ optionValue(cmdLineOpt)
protected fun CommandLine.stringValue(cmdLineOpt: CommandLineOption, default: String): String =
- getOptionValue(cmdLineOpt.option.opt) ?: default
+ optionValue(cmdLineOpt).getOrElse { default }
+
+ protected fun CommandLine.hasOption(cmdLineOpt: CommandLineOption): Boolean =
+ this.hasOption(cmdLineOpt.option.opt)
+
+ protected fun stringPathToPath(path: String): Path = Paths.get(File(path).toURI())
+
+ private fun CommandLine.optionValue(cmdLineOpt: CommandLineOption): Option<String> =
+ Option.fromNullable(getOptionValue(cmdLineOpt.option.opt))
+
+ private fun CommandLine.intValue(cmdLineOpt: CommandLineOption): Option<Int> =
+ optionValue(cmdLineOpt).map(String::toInt)
- protected fun stringPathToPath(path: String) = Paths.get(File(path).toURI())
+ private fun CommandLine.longValue(cmdLineOpt: CommandLineOption): Option<Long> =
+ optionValue(cmdLineOpt).map(String::toLong)
}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
index 9d1f7aa8..942ca31f 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
@@ -87,4 +87,16 @@ enum class CommandLineOption(val option: Option) {
.desc("File with trusted certificate bundle for trusting connections")
.build()
),
+ IDLE_TIMEOUT_SEC(Option.builder("i")
+ .longOpt("idle-timeout-sec")
+ .hasArg()
+ .desc("""Idle timeout for remote hosts. After given time without any data exchange the
+ |connection might be closed.""".trimMargin())
+ .build()
+ ),
+ DUMMY_MODE(Option.builder("d")
+ .longOpt("dummy")
+ .desc("If present will start in dummy mode (dummy external services)")
+ .build()
+ ),
}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt
index 5f6a86ad..083d5798 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/WrongArgumentException.kt
@@ -23,7 +23,14 @@ import org.apache.commons.cli.HelpFormatter
import org.apache.commons.cli.Options
-class WrongArgumentException(parent: Exception, private val options: Options) : Exception(parent.message, parent) {
+class WrongArgumentException(
+ message: String,
+ private val options: Options,
+ parent: Throwable? = null
+) : Exception(message, parent) {
+
+ constructor(par: Throwable, options: Options) : this(par.message ?: "", options, par)
+
fun printMessage() {
println(message)
}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
new file mode 100644
index 00000000..23bf1658
--- /dev/null
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/extensions.kt
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.utils.commandline
+
+import arrow.core.Failure
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import kotlin.system.exitProcess
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+
+fun handleErrorsInMain(ex: Throwable, programName: String, logger: Logger) {
+ when (ex) {
+ is WrongArgumentException -> {
+ ex.printMessage()
+ ex.printHelp(programName)
+ exitProcess(1)
+ }
+
+ else -> {
+ logger.error(ex.localizedMessage)
+ logger.debug("An error occurred when starting VES HV Collector", ex)
+ System.exit(2)
+ }
+ }
+}
+
+
+fun <A> Failure<A>.handleErrorsInMain(programName: String, logger: Logger) {
+ handleErrorsInMain(this.exception, programName, logger)
+}
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
index f614d426..536fe93c 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
@@ -24,11 +24,15 @@ import kotlin.reflect.KClass
class Logger(val logger: org.slf4j.Logger) {
constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java))
+ constructor(name: String) : this(LoggerFactory.getLogger(name))
//
// TRACE
//
+ val traceEnabled: Boolean
+ get() = logger.isTraceEnabled
+
fun trace(messageProvider: () -> String) {
if (logger.isTraceEnabled) {
logger.trace(messageProvider())
diff --git a/pom.xml b/pom.xml
index f478df3c..adc53a70 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,7 +50,8 @@
</modules>
<properties>
- <kotlin.version>1.2.41</kotlin.version>
+ <kotlin.version>1.2.50</kotlin.version>
+ <arrow.version>0.7.2</arrow.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>
@@ -531,12 +532,37 @@
<dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-core</artifactId>
- <version>0.7.2</version>
+ <version>${arrow.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-stdlib-jdk7</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>io.arrow-kt</groupId>
<artifactId>arrow-syntax</artifactId>
- <version>0.7.2</version>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-instances-core</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-instances-data</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects</artifactId>
+ <version>${arrow.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>