diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-11-07 15:08:43 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-11-15 12:51:43 +0100 |
commit | 4d15e5a578dc2c94af2b7f1c7ad02fb44d384501 (patch) | |
tree | baad5e6314ef6d2a0f1409b0a23e0001e814f0a8 | |
parent | 3fdd2fe2b4f35e18998d050c632fc6de24a7e3b1 (diff) |
Update project and dependencies
* Changed version from 4.0.0-SNAPSHOT to 1.1.0-SNAPSHOT as per Vijay
suggestion
* Updated Reactor to BOM Californium-SR2
* Updated mockito-kotlin to 2.0.0
* Introduced some fixes to support OpenJDK 11 compilation
Change-Id: Ib25979ef50c7241a019bf98efd9759e0b8792d58
Issue-ID: DCAEGEN2-961
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
35 files changed, 280 insertions, 359 deletions
diff --git a/hv-collector-analysis/pom.xml b/hv-collector-analysis/pom.xml index 560cc58c..9ace2758 100644 --- a/hv-collector-analysis/pom.xml +++ b/hv-collector-analysis/pom.xml @@ -32,7 +32,7 @@ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-analysis</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <description>VES HighVolume Collector :: Code analysis configuration</description> <build> diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml index ddfa3ed3..a8135292 100644 --- a/hv-collector-core/pom.xml +++ b/hv-collector-core/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -111,7 +111,7 @@ <artifactId>reactor-extra</artifactId> </dependency> <dependency> - <groupId>io.projectreactor.ipc</groupId> + <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> <dependency> @@ -127,36 +127,6 @@ <artifactId>javax.json</artifactId> <scope>runtime</scope> </dependency> - - - <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-test</artifactId> - </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 07b5c82e..78afe9fd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import reactor.ipc.netty.http.client.HttpClient +import reactor.netty.http.client.HttpClient /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index d08ad9e9..af4bbaa1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -125,7 +125,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, } companion object { - private const val MAX_RETRIES = 5 + private const val MAX_RETRIES = 5L private const val BACKOFF_INTERVAL_FACTOR = 30L private val logger = Logger(ConsulConfigurationProvider::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 4503955f..1672158e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -19,10 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters +import io.netty.handler.codec.http.HttpStatusClass import org.slf4j.LoggerFactory import reactor.core.publisher.Mono -import reactor.core.publisher.toMono -import reactor.ipc.netty.http.client.HttpClient +import reactor.netty.http.client.HttpClient +import java.lang.IllegalStateException import java.nio.charset.Charset /** @@ -34,14 +35,18 @@ open class HttpAdapter(private val httpClient: HttpClient) { private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient - .get(url + createQueryString(queryParams)) + .get() + .uri(url + createQueryString(queryParams)) + .responseSingle { response, content -> + if (response.status().codeClass() == HttpStatusClass.SUCCESS) + content.asString() + else + Mono.error(IllegalStateException("$url ${response.status().code()} ${response.status().reasonPhrase()}")) + } .doOnError { logger.error("Failed to get resource on path: $url (${it.localizedMessage})") logger.debug("Nested exception:", it) } - .flatMap { it.receiveContent().toMono() } - .map { it.content().toString(Charset.defaultCharset()) } - private fun createQueryString(params: Map<String, Any>): String { if (params.isEmpty()) @@ -57,7 +62,7 @@ open class HttpAdapter(private val httpClient: HttpClient) { } - return builder.removeSuffix("&").toString() + return builder.removeSuffix("&").toString() } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 7a47cfc3..e535300a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,9 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl.socket -import arrow.core.Option +import arrow.core.getOrElse import arrow.effects.IO -import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -29,15 +28,13 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher import reactor.core.publisher.Mono -import reactor.ipc.netty.ByteBufFlux -import reactor.ipc.netty.NettyInbound -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.options.ServerOptions -import reactor.ipc.netty.tcp.TcpServer +import reactor.netty.ByteBufFlux +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpServer import java.time.Duration -import java.util.function.BiFunction /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -48,63 +45,65 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val collectorProvider: CollectorProvider) : Server { override fun start(): IO<ServerHandle> = IO { - val ctx = TcpServer.builder() - .options(this::configureServer) - .build() - .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ -> - handleConnection(input) - }) - NettyServerHandle(ctx) - } + val tcpServer = TcpServer.create() + .addressSupplier { serverConfig.serverListenAddress } + .configureSsl() + .handle(this::handleConnection) - private fun configureServer(opts: ServerOptions.Builder<*>) { - val sslContext: Option<SslContext> = sslContextFactory.createSslContext(serverConfig.securityConfiguration) - if (sslContext.isDefined()) opts.sslContext(sslContext.orNull()) - opts.port(serverConfig.listenPort) + NettyServerHandle(tcpServer.bindNow()) } - private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> = + private fun TcpServer.configureSsl() = + sslContextFactory + .createSslContext(serverConfig.securityConfiguration) + .map { sslContext -> + this.secure { b -> b.sslContext(sslContext) } + }.getOrElse { this } + + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = collectorProvider().fold( { - logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + nettyInbound.withConnection { conn -> + logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } + } Mono.empty() }, { - logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } - val allocator = nettyInbound.context().channel().alloc() - it.handleConnection(allocator, createDataStream(nettyInbound)) + nettyInbound.withConnection { conn -> + logger.info { "Handling connection from ${conn.address()}" } + conn.configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + } + it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) } ) - - fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() + private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { + private fun Connection.configureIdleTimeout(timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { logger.info { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..." + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } disconnectClient() } return this } - private fun NettyInbound.disconnectClient() { - context().channel().close().addListener { + private fun Connection.disconnectClient() { + channel().close().addListener { if (it.isSuccess) - logger.debug { "Channel (${remoteAddress()}) closed successfully." } + logger.debug { "Channel (${address()}) closed successfully." } else logger.warn("Channel close failed", it.cause()) } } - private fun NettyInbound.logConnectionClosed(): NettyInbound { - context().onClose { - logger.info("Connection from ${remoteAddress()} has been closed") + private fun Connection.logConnectionClosed(): Connection { + onTerminate().subscribe { + logger.info("Connection from ${address()} has been closed") } return this } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 7a7d9342..85117684 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress import java.time.Duration /** @@ -27,10 +28,10 @@ import java.time.Duration * @since May 2018 */ data class ServerConfiguration( - val listenPort: Int, + val serverListenAddress: InetSocketAddress, val configurationProviderParams: ConfigurationProviderParams, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, - val healthCheckApiPort: Int, + val healthCheckApiListenAddress: InetSocketAddress, val maximumPayloadSizeBytes: Int, val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 9a6889c8..7a1a4cdc 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -19,9 +19,9 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters -import com.nhaarman.mockito_kotlin.eq -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.whenever +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt index 123d8f72..91457faf 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt @@ -19,21 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.verify -import com.nhaarman.mockito_kotlin.whenever -import io.netty.buffer.Unpooled -import io.netty.handler.codec.http.HttpContent import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.ipc.netty.http.client.HttpClient -import reactor.ipc.netty.http.client.HttpClientResponse +import reactor.netty.http.client.HttpClient +import reactor.netty.http.server.HttpServer import reactor.test.StepVerifier -import java.nio.charset.Charset +import reactor.test.test /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -42,66 +36,51 @@ import java.nio.charset.Charset internal object HttpAdapterTest : Spek({ describe("HttpAdapter") { - val httpClientMock: HttpClient = mock() - val httpAdapter = HttpAdapter(httpClientMock) + val httpServer = HttpServer.create() + .host("127.0.0.1") + .route { routes -> + routes.get("/url") { req, resp -> + resp.sendString(Mono.just(req.uri())) + } + } + .bindNow() + val baseUrl = "http://${httpServer.host()}:${httpServer.port()}" + val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl)) + + afterGroup { + httpServer.disposeNow() + } given("url without query params") { - val initialUrl = "http://test-url" - whenever(httpClientMock.get(initialUrl)).thenReturn(Mono.empty()) + val url = "/url" it("should not append query string") { - httpAdapter.get(initialUrl) - verify(httpClientMock).get(initialUrl) + httpAdapter.get(url).test() + .expectNext(url) + .verifyComplete() } } given("url with query params") { - val queryParams = mapOf(Pair("key", "value")) - val initialUrl = "http://test-url" - val expectedUrl = "http://test-url?key=value" - whenever(httpClientMock.get(expectedUrl)).thenReturn(Mono.empty()) - - it("should parse them to query string and append to url") { - httpAdapter.get(initialUrl, queryParams) - verify(httpClientMock).get(expectedUrl) - } - } + val queryParams = mapOf(Pair("p", "the-value")) + val url = "/url" - given("valid resource url") { - val validUrl = "http://valid-url/" - val responseContent = """{"key1": "value1", "key2": "value2"}""" - val httpResponse = createHttpResponseMock(responseContent) - whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse)) - - it("should return response string") { - StepVerifier - .create(httpAdapter.get(validUrl)) - .expectNext(responseContent) + it("should add them as query string to the url") { + httpAdapter.get(url, queryParams).test() + .expectNext("/url?p=the-value") + .verifyComplete() } } - given("invalid resource url") { - val invalidUrl = "http://invalid-url/" - val exceptionMessage = "Test exception" - whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage))) + given("invalid url") { + val invalidUrl = "/wtf" it("should interrupt the flux") { StepVerifier .create(httpAdapter.get(invalidUrl)) - .verifyErrorMessage(exceptionMessage) + .verifyError() } } } -}) - -fun createHttpResponseMock(content: String): HttpClientResponse { - val responseMock: HttpClientResponse = mock() - val contentMock: HttpContent = mock() - val contentByteBuff = Unpooled.copiedBuffer(content, Charset.defaultCharset()) - - whenever(responseMock.receiveContent()).thenReturn(Flux.just(contentMock)) - whenever(contentMock.content()).thenReturn(contentByteBuff) - - return responseMock -} +})
\ No newline at end of file diff --git a/hv-collector-coverage/pom.xml b/hv-collector-coverage/pom.xml index a6f3c8dd..f1e5c742 100644 --- a/hv-collector-coverage/pom.xml +++ b/hv-collector-coverage/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/hv-collector-ct/pom.xml b/hv-collector-ct/pom.xml index 2482d732..07da24b1 100644 --- a/hv-collector-ct/pom.xml +++ b/hv-collector-ct/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/hv-collector-dcae-app-simulator/pom.xml b/hv-collector-dcae-app-simulator/pom.xml index f0a87713..82e99f95 100644 --- a/hv-collector-dcae-app-simulator/pom.xml +++ b/hv-collector-dcae-app-simulator/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -141,34 +141,10 @@ <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> </dependency> <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-test</artifactId> - </dependency> - <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <scope>runtime</scope> diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index 83dceb6a..17eeb5b1 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -19,10 +19,9 @@ */ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config -import arrow.core.ForOption import arrow.core.Option import arrow.core.fix -import arrow.instances.extensions +import arrow.instances.option.monad.monad import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser @@ -45,26 +44,24 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration ) override fun getConfiguration(cmdLine: CommandLine): Option<DcaeAppSimConfiguration> = - ForOption extensions { - binding { - val listenPort = cmdLine - .intValue(LISTEN_PORT) - .bind() - val maxPayloadSizeBytes = cmdLine - .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) - val kafkaBootstrapServers = cmdLine - .stringValue(KAFKA_SERVERS) - .bind() - val kafkaTopics = cmdLine - .stringValue(KAFKA_TOPICS) - .map { it.split(",").toSet() } - .bind() + Option.monad().binding { + val listenPort = cmdLine + .intValue(LISTEN_PORT) + .bind() + val maxPayloadSizeBytes = cmdLine + .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) + val kafkaBootstrapServers = cmdLine + .stringValue(KAFKA_SERVERS) + .bind() + val kafkaTopics = cmdLine + .stringValue(KAFKA_TOPICS) + .map { it.split(",").toSet() } + .bind() - DcaeAppSimConfiguration( - listenPort, - maxPayloadSizeBytes, - kafkaBootstrapServers, - kafkaTopics) - }.fix() - } + DcaeAppSimConfiguration( + listenPort, + maxPayloadSizeBytes, + kafkaBootstrapServers, + kafkaTopics) + }.fix() } diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt index aceb746a..e1641cbb 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt @@ -24,20 +24,19 @@ import arrow.core.None import arrow.core.Some import arrow.effects.IO import com.google.protobuf.ByteString -import com.nhaarman.mockito_kotlin.any -import com.nhaarman.mockito_kotlin.eq -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.never -import com.nhaarman.mockito_kotlin.verify -import com.nhaarman.mockito_kotlin.whenever +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.never +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.mockito.ArgumentMatchers.anySet -import org.mockito.Mockito -import org.onap.ves.VesEventOuterClass.VesEvent import org.onap.ves.VesEventOuterClass.CommonEventHeader +import org.onap.ves.VesEventOuterClass.VesEvent import java.util.concurrent.ConcurrentLinkedQueue /** diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index 5e3090af..a631be76 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -22,9 +22,9 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl import arrow.core.Either import arrow.core.Right import com.google.protobuf.ByteString -import com.nhaarman.mockito_kotlin.any -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.whenever +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.fail import org.jetbrains.spek.api.Spek diff --git a/hv-collector-domain/pom.xml b/hv-collector-domain/pom.xml index 95b8d845..e03de3ca 100644 --- a/hv-collector-domain/pom.xml +++ b/hv-collector-domain/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -96,7 +96,7 @@ <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> - <groupId>io.projectreactor.ipc</groupId> + <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> <dependency> diff --git a/hv-collector-health-check/pom.xml b/hv-collector-health-check/pom.xml index 45fa2e04..6ecdc95b 100644 --- a/hv-collector-health-check/pom.xml +++ b/hv-collector-health-check/pom.xml @@ -19,7 +19,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -50,7 +50,7 @@ <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> - <groupId>io.projectreactor.ipc</groupId> + <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> <dependency> diff --git a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt index 753f73ef..b4d7c142 100644 --- a/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt +++ b/hv-collector-health-check/src/main/kotlin/org/onap/dcae/collectors/veshv/healthcheck/factory/HealthCheckApiServer.kt @@ -27,26 +27,30 @@ import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.ipc.netty.http.server.HttpServer -import reactor.ipc.netty.http.server.HttpServerRequest -import reactor.ipc.netty.http.server.HttpServerResponse +import reactor.netty.http.server.HttpServer +import reactor.netty.http.server.HttpServerRequest +import reactor.netty.http.server.HttpServerResponse +import java.net.SocketAddress import java.util.concurrent.atomic.AtomicReference /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since August 2018 */ -class HealthCheckApiServer(private val healthState: HealthState, private val port: Int) { +class HealthCheckApiServer(private val healthState: HealthState, + private val listenAddress: SocketAddress) { private val healthDescription: AtomicReference<HealthDescription> = AtomicReference(HealthDescription.STARTING) fun start(): IO<ServerHandle> = IO { healthState().subscribe(healthDescription::set) - val ctx = HttpServer.create(port).startRouter { routes -> - routes.get("/health/ready", ::readinessHandler) - routes.get("/health/alive", ::livenessHandler) - } - NettyServerHandle(ctx) + val ctx = HttpServer.create() + .tcpConfiguration { it.addressSupplier { listenAddress } } + .route { routes -> + routes.get("/health/ready", ::readinessHandler) + routes.get("/health/alive", ::livenessHandler) + } + NettyServerHandle(ctx.bindNow()) } private fun readinessHandler(req: HttpServerRequest, resp: HttpServerResponse) = @@ -55,6 +59,6 @@ class HealthCheckApiServer(private val healthState: HealthState, private val por } private fun livenessHandler(req: HttpServerRequest, resp: HttpServerResponse) = - resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet")) + resp.status(HttpResponseStatus.NOT_IMPLEMENTED).sendString(Mono.just("Not implemented yet")) } diff --git a/hv-collector-main/pom.xml b/hv-collector-main/pom.xml index fda519ca..9e7101b0 100644 --- a/hv-collector-main/pom.xml +++ b/hv-collector-main/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -137,30 +137,6 @@ <groupId>io.micrometer</groupId> <artifactId>micrometer-registry-jmx</artifactId> </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - </dependency> - <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> </dependencies> diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index 0f382196..81d916dd 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -19,10 +19,8 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.core.ForOption import arrow.core.Option import arrow.core.fix -import arrow.instances.extensions import arrow.instances.option.monad.monad import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine @@ -49,6 +47,7 @@ import org.onap.dcae.collectors.veshv.utils.commandline.hasOption import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.longValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue +import java.net.InetSocketAddress import java.time.Duration internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration>(DefaultParser()) { @@ -81,8 +80,8 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration val security = createSecurityConfiguration(cmdLine).bind() val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() ServerConfiguration( - healthCheckApiPort = healthCheckApiPort, - listenPort = listenPort, + serverListenAddress = InetSocketAddress(listenPort), + healthCheckApiListenAddress = InetSocketAddress(healthCheckApiPort), configurationProviderParams = configurationProviderParams, securityConfiguration = security, idleTimeout = Duration.ofSeconds(idleTimeoutSec), @@ -91,24 +90,22 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration }.fix() private fun createConfigurationProviderParams(cmdLine: CommandLine): Option<ConfigurationProviderParams> = - ForOption extensions { - binding { - val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind() - val firstRequestDelay = cmdLine.longValue( - CONSUL_FIRST_REQUEST_DELAY, - DefaultValues.CONSUL_FIRST_REQUEST_DELAY - ) - val requestInterval = cmdLine.longValue( - CONSUL_REQUEST_INTERVAL, - DefaultValues.CONSUL_REQUEST_INTERVAL - ) - ConfigurationProviderParams( - configUrl, - Duration.ofSeconds(firstRequestDelay), - Duration.ofSeconds(requestInterval) - ) - }.fix() - } + Option.monad().binding { + val configUrl = cmdLine.stringValue(CONSUL_CONFIG_URL).bind() + val firstRequestDelay = cmdLine.longValue( + CONSUL_FIRST_REQUEST_DELAY, + DefaultValues.CONSUL_FIRST_REQUEST_DELAY + ) + val requestInterval = cmdLine.longValue( + CONSUL_REQUEST_INTERVAL, + DefaultValues.CONSUL_REQUEST_INTERVAL + ) + ConfigurationProviderParams( + configUrl, + Duration.ofSeconds(firstRequestDelay), + Duration.ofSeconds(requestInterval) + ) + }.fix() internal object DefaultValues { const val HEALTH_CHECK_API_PORT = 6060 diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt index 04fc021d..ae59da69 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -32,7 +32,9 @@ object HealthCheckServer : ServerStarter() { override fun startServer(config: ServerConfiguration) = createHealthCheckServer(config).start() private fun createHealthCheckServer(config: ServerConfiguration) = - HealthCheckApiServer(HealthState.INSTANCE, config.healthCheckApiPort) + HealthCheckApiServer( + HealthState.INSTANCE, + config.healthCheckApiListenAddress) override fun serverStartedMessage(handle: ServerHandle) = "Health check server is up and listening on ${handle.host}:${handle.port}" diff --git a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt index 0cf0bb2c..1aac6a09 100644 --- a/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt +++ b/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfigurationTest.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.main -import arrow.core.identity import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe @@ -70,12 +69,21 @@ object ArgVesHvConfigurationTest : Spek({ ) } + it("should set proper listen port") { + assertThat(result.serverListenAddress.port).isEqualTo(listenPort.toInt()) + } + + + it("should set default listen address") { + assertThat(result.serverListenAddress.address.hostAddress).isEqualTo("0.0.0.0") + } + it("should set proper health check api port") { - assertThat(result.healthCheckApiPort).isEqualTo(healthCheckApiPort.toInt()) + assertThat(result.healthCheckApiListenAddress.port).isEqualTo(healthCheckApiPort.toInt()) } - it("should set proper listen port") { - assertThat(result.listenPort).isEqualTo(listenPort.toInt()) + it("should set default health check api address") { + assertThat(result.healthCheckApiListenAddress.address.hostAddress).isEqualTo("0.0.0.0") } it("should set proper first consul request delay") { diff --git a/hv-collector-ssl/pom.xml b/hv-collector-ssl/pom.xml index 1afa318c..e9cbdc22 100644 --- a/hv-collector-ssl/pom.xml +++ b/hv-collector-ssl/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/hv-collector-test-utils/pom.xml b/hv-collector-test-utils/pom.xml index 8e666a98..9dad2ace 100644 --- a/hv-collector-test-utils/pom.xml +++ b/hv-collector-test-utils/pom.xml @@ -14,7 +14,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -56,5 +56,30 @@ <artifactId>assertj-core</artifactId> <scope>compile</scope> </dependency> + <dependency> + <groupId>org.jetbrains.kotlin</groupId> + <artifactId>kotlin-test</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-api</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.jetbrains.spek</groupId> + <artifactId>spek-junit-platform-engine</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-test</artifactId> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>com.nhaarman.mockitokotlin2</groupId> + <artifactId>mockito-kotlin</artifactId> + <scope>compile</scope> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/hv-collector-utils/pom.xml b/hv-collector-utils/pom.xml index 868dd95d..d38ccb9b 100644 --- a/hv-collector-utils/pom.xml +++ b/hv-collector-utils/pom.xml @@ -37,7 +37,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -103,14 +103,6 @@ <artifactId>slf4j-api</artifactId> </dependency> <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> </dependency> @@ -131,6 +123,10 @@ <artifactId>reactor-test</artifactId> </dependency> <dependency> + <groupId>com.nhaarman.mockitokotlin2</groupId> + <artifactId>mockito-kotlin</artifactId> + </dependency> + <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <scope>test</scope> diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt index bb924f27..bdb63b68 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/server_handle.kt @@ -20,7 +20,7 @@ package org.onap.dcae.collectors.veshv.utils import arrow.effects.IO -import reactor.ipc.netty.tcp.BlockingNettyContext +import reactor.netty.DisposableServer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -35,12 +35,12 @@ abstract class ServerHandle(val host: String, val port: Int) { * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -class NettyServerHandle(private val ctx: BlockingNettyContext) : ServerHandle(ctx.host, ctx.port) { +class NettyServerHandle(private val ctx: DisposableServer) : ServerHandle(ctx.host(), ctx.port()) { override fun shutdown() = IO { - ctx.shutdown() + ctx.disposeNow() } override fun await() = IO<Unit> { - ctx.context.channel().closeFuture().sync() + ctx.channel().closeFuture().sync() } } diff --git a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt index b98131cc..462aabe5 100644 --- a/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt +++ b/hv-collector-utils/src/test/kotlin/org/onap/dcae/collectors/veshv/utils/logging/LoggerTest.kt @@ -19,10 +19,11 @@ */ package org.onap.dcae.collectors.veshv.utils.logging -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.verify -import com.nhaarman.mockito_kotlin.verifyNoMoreInteractions -import com.nhaarman.mockito_kotlin.whenever +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.verify +import com.nhaarman.mockitokotlin2.verifyNoMoreInteractions +import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it diff --git a/hv-collector-ves-message-generator/pom.xml b/hv-collector-ves-message-generator/pom.xml index 87728f67..dae42e2d 100644 --- a/hv-collector-ves-message-generator/pom.xml +++ b/hv-collector-ves-message-generator/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index cc1d16fe..fa39ed16 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -31,9 +31,8 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVA import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.INVALID_WIRE_FRAME import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.TOO_BIG_PAYLOAD import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID -import org.onap.ves.VesEventOuterClass.VesEvent import org.onap.ves.VesEventOuterClass.CommonEventHeader - +import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.charset.Charset @@ -54,10 +53,17 @@ class MessageGeneratorImpl internal constructor( private fun createMessageFlux(parameters: MessageParameters): Flux<WireFrameMessage> = Mono.fromCallable { createMessage(parameters.commonEventHeader, parameters.messageType) } .let { - if (parameters.amount < 0) - it.repeat() - else - it.repeat(parameters.amount) + when { + parameters.amount < 0 -> + // repeat forever + it.repeat() + parameters.amount == 0L -> + // do not generate any message + Flux.empty() + else -> + // send original message and additional amount-1 messages + it.repeat(parameters.amount - 1) + } } private fun createMessage(commonEventHeader: CommonEventHeader, messageType: MessageType): WireFrameMessage = diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt index ee76b789..e2aec7df 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt @@ -50,6 +50,7 @@ object MessageGeneratorImplTest : Spek({ val maxPayloadSizeBytes = 1024 val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes) given("single message parameters") { + on("messages amount not specified in parameters") { it("should create infinite flux") { val limit = 1000L @@ -64,6 +65,20 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } + + on("messages amount = 0 specified in parameters") { + it("should create empty message flux") { + generator + .createMessageFlux(listOf(MessageParameters( + commonHeader(PERF3GPP), + MessageType.VALID, + 0 + ))) + .test() + .verifyComplete() + } + } + on("messages amount specified in parameters") { it("should create message flux of specified size") { generator @@ -77,6 +92,7 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } + on("message type requesting valid message") { it("should create flux of valid messages with given domain") { generator @@ -94,6 +110,7 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } + on("message type requesting too big payload") { it("should create flux of messages with given domain and payload exceeding threshold") { @@ -112,6 +129,7 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } + on("message type requesting invalid GPB data ") { it("should create flux of messages with invalid payload") { generator @@ -130,6 +148,7 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } + on("message type requesting invalid wire frame ") { it("should create flux of messages with invalid version") { generator @@ -148,6 +167,7 @@ object MessageGeneratorImplTest : Spek({ .verifyComplete() } } + on("message type requesting fixed payload") { it("should create flux of valid messages with fixed payload") { generator diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml index 85ef0907..6526915a 100644 --- a/hv-collector-xnf-simulator/pom.xml +++ b/hv-collector-xnf-simulator/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -139,34 +139,6 @@ </dependency> --> <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-test</artifactId> - </dependency> - <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <scope>runtime</scope> diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index 7a280c10..8df416c9 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -35,8 +35,8 @@ import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.ReplayProcessor -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.tcp.TcpClient +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpClient /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -44,13 +44,12 @@ import reactor.ipc.netty.tcp.TcpClient */ class VesHvClient(private val configuration: SimulatorConfiguration) { - private val client: TcpClient = TcpClient.builder() - .options { opts -> - opts.host(configuration.vesHost) - .port(configuration.vesPort) - .sslContext(createSslContext(configuration.security).orNull()) + private val client: TcpClient = TcpClient.create() + .host(configuration.vesHost) + .port(configuration.vesPort) + .secure { sslSpec -> + createSslContext(configuration.security).fold({}, sslSpec::sslContext) } - .build() fun sendIo(messages: Flux<WireFrameMessage>) = sendRx(messages).then(Mono.just(Unit)).asIo() @@ -58,7 +57,8 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client - .newHandler { _, output -> handler(complete, messages, output) } + .handle { _, output -> handler(complete, messages, output) } + .connect() .doOnError { logger.info("Failed to connect to VesHvCollector on " + "${configuration.vesHost}:${configuration.vesPort}") @@ -94,12 +94,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun createSslContext(config: SecurityConfiguration): Option<SslContext> = ClientSslContextFactory().createSslContext(config) - private fun NettyOutbound.logConnectionClosed(): NettyOutbound { - context().onClose { - logger.info { "Connection to ${context().address()} has been closed" } - } - return this - } + private fun NettyOutbound.logConnectionClosed() = + withConnection { conn -> + conn.onTerminate().subscribe { + logger.info { "Connection to ${conn.address()} has been closed" } + } + } companion object { private val logger = Logger(VesHvClient::class) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 2a78ed5e..95510e77 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -23,10 +23,9 @@ import arrow.core.Left import arrow.core.None import arrow.core.Right import arrow.effects.IO -import com.nhaarman.mockito_kotlin.any -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.whenever -import com.sun.xml.internal.messaging.saaj.util.ByteInputStream +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it @@ -39,6 +38,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import reactor.core.publisher.Flux +import java.io.ByteArrayInputStream /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -60,7 +60,7 @@ internal class XnfSimulatorTest : Spek({ describe("startSimulation") { it("should fail when empty input stream") { // given - val emptyInputStream = ByteInputStream() + val emptyInputStream = ByteArrayInputStream(byteArrayOf()) // when val result = cut.startSimulation(emptyInputStream) @@ -32,13 +32,13 @@ <parent> <groupId>org.onap.oparent</groupId> <artifactId>oparent</artifactId> - <version>1.2.0</version> + <version>1.2.1</version> <relativePath/> </parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <name>dcaegen2-collectors-veshv</name> <description>VES HighVolume Collector</description> <packaging>pom</packaging> @@ -343,7 +343,7 @@ <dependency> <groupId>${project.groupId}</groupId> <artifactId>hv-collector-analysis</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> </dependency> </dependencies> </plugin> @@ -586,7 +586,7 @@ <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <!-- remember to update netty native bindings versions --> - <version>Bismuth-SR11</version> + <version>Californium-SR2</version> <type>pom</type> <scope>import</scope> </dependency> @@ -674,21 +674,9 @@ <scope>test</scope> </dependency> <dependency> - <groupId>com.nhaarman</groupId> + <groupId>com.nhaarman.mockitokotlin2</groupId> <artifactId>mockito-kotlin</artifactId> - <version>1.5.0</version> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - <version>2.18.3</version> + <version>2.0.0</version> <scope>test</scope> </dependency> <dependency> diff --git a/version.properties b/version.properties index 967829d8..7b8b963a 100644 --- a/version.properties +++ b/version.properties @@ -1,5 +1,5 @@ -major=4 -minor=0 +major=1 +minor=1 patch=0 base_version=${major}.${minor}.${patch} release_version=${base_version} |