aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-xnf-simulator/src/main
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-07 15:08:43 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-15 12:51:43 +0100
commit4d15e5a578dc2c94af2b7f1c7ad02fb44d384501 (patch)
treebaad5e6314ef6d2a0f1409b0a23e0001e814f0a8 /hv-collector-xnf-simulator/src/main
parent3fdd2fe2b4f35e18998d050c632fc6de24a7e3b1 (diff)
Update project and dependencies
* Changed version from 4.0.0-SNAPSHOT to 1.1.0-SNAPSHOT as per Vijay suggestion * Updated Reactor to BOM Californium-SR2 * Updated mockito-kotlin to 2.0.0 * Introduced some fixes to support OpenJDK 11 compilation Change-Id: Ib25979ef50c7241a019bf98efd9759e0b8792d58 Issue-ID: DCAEGEN2-961 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-xnf-simulator/src/main')
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt30
1 files changed, 15 insertions, 15 deletions
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index 7a280c10..8df416c9 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -35,8 +35,8 @@ import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpClient
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -44,13 +44,12 @@ import reactor.ipc.netty.tcp.TcpClient
*/
class VesHvClient(private val configuration: SimulatorConfiguration) {
- private val client: TcpClient = TcpClient.builder()
- .options { opts ->
- opts.host(configuration.vesHost)
- .port(configuration.vesPort)
- .sslContext(createSslContext(configuration.security).orNull())
+ private val client: TcpClient = TcpClient.create()
+ .host(configuration.vesHost)
+ .port(configuration.vesPort)
+ .secure { sslSpec ->
+ createSslContext(configuration.security).fold({}, sslSpec::sslContext)
}
- .build()
fun sendIo(messages: Flux<WireFrameMessage>) =
sendRx(messages).then(Mono.just(Unit)).asIo()
@@ -58,7 +57,8 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
- .newHandler { _, output -> handler(complete, messages, output) }
+ .handle { _, output -> handler(complete, messages, output) }
+ .connect()
.doOnError {
logger.info("Failed to connect to VesHvCollector on " +
"${configuration.vesHost}:${configuration.vesPort}")
@@ -94,12 +94,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
ClientSslContextFactory().createSslContext(config)
- private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
- context().onClose {
- logger.info { "Connection to ${context().address()} has been closed" }
- }
- return this
- }
+ private fun NettyOutbound.logConnectionClosed() =
+ withConnection { conn ->
+ conn.onTerminate().subscribe {
+ logger.info { "Connection to ${conn.address()} has been closed" }
+ }
+ }
companion object {
private val logger = Logger(VesHvClient::class)