From 4d15e5a578dc2c94af2b7f1c7ad02fb44d384501 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Wed, 7 Nov 2018 15:08:43 +0100 Subject: Update project and dependencies * Changed version from 4.0.0-SNAPSHOT to 1.1.0-SNAPSHOT as per Vijay suggestion * Updated Reactor to BOM Californium-SR2 * Updated mockito-kotlin to 2.0.0 * Introduced some fixes to support OpenJDK 11 compilation Change-Id: Ib25979ef50c7241a019bf98efd9759e0b8792d58 Issue-ID: DCAEGEN2-961 Signed-off-by: Piotr Jaszczyk --- hv-collector-xnf-simulator/pom.xml | 30 +--------------------- .../simulators/xnf/impl/adapters/VesHvClient.kt | 30 +++++++++++----------- .../dcae/collectors/veshv/main/XnfSimulatorTest.kt | 10 ++++---- 3 files changed, 21 insertions(+), 49 deletions(-) (limited to 'hv-collector-xnf-simulator') diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml index 85ef0907..6526915a 100644 --- a/hv-collector-xnf-simulator/pom.xml +++ b/hv-collector-xnf-simulator/pom.xml @@ -33,7 +33,7 @@ org.onap.dcaegen2.collectors.hv-ves ves-hv-collector - 4.0.0-SNAPSHOT + 1.1.0-SNAPSHOT .. @@ -138,34 +138,6 @@ ${os.detected.classifier} --> - - com.nhaarman - mockito-kotlin - - - org.mockito - mockito-core - - - org.assertj - assertj-core - - - org.jetbrains.kotlin - kotlin-test - - - org.jetbrains.spek - spek-api - - - org.jetbrains.spek - spek-junit-platform-engine - - - io.projectreactor - reactor-test - ch.qos.logback logback-classic diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index 7a280c10..8df416c9 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -35,8 +35,8 @@ import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.ReplayProcessor -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.tcp.TcpClient +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpClient /** * @author Jakub Dudycz @@ -44,13 +44,12 @@ import reactor.ipc.netty.tcp.TcpClient */ class VesHvClient(private val configuration: SimulatorConfiguration) { - private val client: TcpClient = TcpClient.builder() - .options { opts -> - opts.host(configuration.vesHost) - .port(configuration.vesPort) - .sslContext(createSslContext(configuration.security).orNull()) + private val client: TcpClient = TcpClient.create() + .host(configuration.vesHost) + .port(configuration.vesPort) + .secure { sslSpec -> + createSslContext(configuration.security).fold({}, sslSpec::sslContext) } - .build() fun sendIo(messages: Flux) = sendRx(messages).then(Mono.just(Unit)).asIo() @@ -58,7 +57,8 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun sendRx(messages: Flux): Mono { val complete = ReplayProcessor.create(1) client - .newHandler { _, output -> handler(complete, messages, output) } + .handle { _, output -> handler(complete, messages, output) } + .connect() .doOnError { logger.info("Failed to connect to VesHvCollector on " + "${configuration.vesHost}:${configuration.vesPort}") @@ -94,12 +94,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun createSslContext(config: SecurityConfiguration): Option = ClientSslContextFactory().createSslContext(config) - private fun NettyOutbound.logConnectionClosed(): NettyOutbound { - context().onClose { - logger.info { "Connection to ${context().address()} has been closed" } - } - return this - } + private fun NettyOutbound.logConnectionClosed() = + withConnection { conn -> + conn.onTerminate().subscribe { + logger.info { "Connection to ${conn.address()} has been closed" } + } + } companion object { private val logger = Logger(VesHvClient::class) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 2a78ed5e..95510e77 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -23,10 +23,9 @@ import arrow.core.Left import arrow.core.None import arrow.core.Right import arrow.effects.IO -import com.nhaarman.mockito_kotlin.any -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.whenever -import com.sun.xml.internal.messaging.saaj.util.ByteInputStream +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it @@ -39,6 +38,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import reactor.core.publisher.Flux +import java.io.ByteArrayInputStream /** * @author Piotr Jaszczyk @@ -60,7 +60,7 @@ internal class XnfSimulatorTest : Spek({ describe("startSimulation") { it("should fail when empty input stream") { // given - val emptyInputStream = ByteInputStream() + val emptyInputStream = ByteArrayInputStream(byteArrayOf()) // when val result = cut.startSimulation(emptyInputStream) -- cgit 1.2.3-korg