diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-11-07 15:08:43 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-11-15 12:51:43 +0100 |
commit | 4d15e5a578dc2c94af2b7f1c7ad02fb44d384501 (patch) | |
tree | baad5e6314ef6d2a0f1409b0a23e0001e814f0a8 /hv-collector-xnf-simulator/src | |
parent | 3fdd2fe2b4f35e18998d050c632fc6de24a7e3b1 (diff) |
Update project and dependencies
* Changed version from 4.0.0-SNAPSHOT to 1.1.0-SNAPSHOT as per Vijay
suggestion
* Updated Reactor to BOM Californium-SR2
* Updated mockito-kotlin to 2.0.0
* Introduced some fixes to support OpenJDK 11 compilation
Change-Id: Ib25979ef50c7241a019bf98efd9759e0b8792d58
Issue-ID: DCAEGEN2-961
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-xnf-simulator/src')
2 files changed, 20 insertions, 20 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index 7a280c10..8df416c9 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -35,8 +35,8 @@ import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.ReplayProcessor -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.tcp.TcpClient +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpClient /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -44,13 +44,12 @@ import reactor.ipc.netty.tcp.TcpClient */ class VesHvClient(private val configuration: SimulatorConfiguration) { - private val client: TcpClient = TcpClient.builder() - .options { opts -> - opts.host(configuration.vesHost) - .port(configuration.vesPort) - .sslContext(createSslContext(configuration.security).orNull()) + private val client: TcpClient = TcpClient.create() + .host(configuration.vesHost) + .port(configuration.vesPort) + .secure { sslSpec -> + createSslContext(configuration.security).fold({}, sslSpec::sslContext) } - .build() fun sendIo(messages: Flux<WireFrameMessage>) = sendRx(messages).then(Mono.just(Unit)).asIo() @@ -58,7 +57,8 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client - .newHandler { _, output -> handler(complete, messages, output) } + .handle { _, output -> handler(complete, messages, output) } + .connect() .doOnError { logger.info("Failed to connect to VesHvCollector on " + "${configuration.vesHost}:${configuration.vesPort}") @@ -94,12 +94,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun createSslContext(config: SecurityConfiguration): Option<SslContext> = ClientSslContextFactory().createSslContext(config) - private fun NettyOutbound.logConnectionClosed(): NettyOutbound { - context().onClose { - logger.info { "Connection to ${context().address()} has been closed" } - } - return this - } + private fun NettyOutbound.logConnectionClosed() = + withConnection { conn -> + conn.onTerminate().subscribe { + logger.info { "Connection to ${conn.address()} has been closed" } + } + } companion object { private val logger = Logger(VesHvClient::class) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 2a78ed5e..95510e77 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -23,10 +23,9 @@ import arrow.core.Left import arrow.core.None import arrow.core.Right import arrow.effects.IO -import com.nhaarman.mockito_kotlin.any -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.whenever -import com.sun.xml.internal.messaging.saaj.util.ByteInputStream +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it @@ -39,6 +38,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import reactor.core.publisher.Flux +import java.io.ByteArrayInputStream /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -60,7 +60,7 @@ internal class XnfSimulatorTest : Spek({ describe("startSimulation") { it("should fail when empty input stream") { // given - val emptyInputStream = ByteInputStream() + val emptyInputStream = ByteArrayInputStream(byteArrayOf()) // when val result = cut.startSimulation(emptyInputStream) |