summaryrefslogtreecommitdiffstats
path: root/hv-collector-xnf-simulator/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-07 15:08:43 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-15 12:51:43 +0100
commit4d15e5a578dc2c94af2b7f1c7ad02fb44d384501 (patch)
treebaad5e6314ef6d2a0f1409b0a23e0001e814f0a8 /hv-collector-xnf-simulator/src
parent3fdd2fe2b4f35e18998d050c632fc6de24a7e3b1 (diff)
Update project and dependencies
* Changed version from 4.0.0-SNAPSHOT to 1.1.0-SNAPSHOT as per Vijay suggestion * Updated Reactor to BOM Californium-SR2 * Updated mockito-kotlin to 2.0.0 * Introduced some fixes to support OpenJDK 11 compilation Change-Id: Ib25979ef50c7241a019bf98efd9759e0b8792d58 Issue-ID: DCAEGEN2-961 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-xnf-simulator/src')
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt30
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt10
2 files changed, 20 insertions, 20 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index 7a280c10..8df416c9 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -35,8 +35,8 @@ import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpClient
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -44,13 +44,12 @@ import reactor.ipc.netty.tcp.TcpClient
*/
class VesHvClient(private val configuration: SimulatorConfiguration) {
- private val client: TcpClient = TcpClient.builder()
- .options { opts ->
- opts.host(configuration.vesHost)
- .port(configuration.vesPort)
- .sslContext(createSslContext(configuration.security).orNull())
+ private val client: TcpClient = TcpClient.create()
+ .host(configuration.vesHost)
+ .port(configuration.vesPort)
+ .secure { sslSpec ->
+ createSslContext(configuration.security).fold({}, sslSpec::sslContext)
}
- .build()
fun sendIo(messages: Flux<WireFrameMessage>) =
sendRx(messages).then(Mono.just(Unit)).asIo()
@@ -58,7 +57,8 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
- .newHandler { _, output -> handler(complete, messages, output) }
+ .handle { _, output -> handler(complete, messages, output) }
+ .connect()
.doOnError {
logger.info("Failed to connect to VesHvCollector on " +
"${configuration.vesHost}:${configuration.vesPort}")
@@ -94,12 +94,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
ClientSslContextFactory().createSslContext(config)
- private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
- context().onClose {
- logger.info { "Connection to ${context().address()} has been closed" }
- }
- return this
- }
+ private fun NettyOutbound.logConnectionClosed() =
+ withConnection { conn ->
+ conn.onTerminate().subscribe {
+ logger.info { "Connection to ${conn.address()} has been closed" }
+ }
+ }
companion object {
private val logger = Logger(VesHvClient::class)
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 2a78ed5e..95510e77 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -23,10 +23,9 @@ import arrow.core.Left
import arrow.core.None
import arrow.core.Right
import arrow.effects.IO
-import com.nhaarman.mockito_kotlin.any
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
-import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
@@ -39,6 +38,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
import reactor.core.publisher.Flux
+import java.io.ByteArrayInputStream
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -60,7 +60,7 @@ internal class XnfSimulatorTest : Spek({
describe("startSimulation") {
it("should fail when empty input stream") {
// given
- val emptyInputStream = ByteInputStream()
+ val emptyInputStream = ByteArrayInputStream(byteArrayOf())
// when
val result = cut.startSimulation(emptyInputStream)