aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt')
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt24
1 files changed, 11 insertions, 13 deletions
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index 13256c52..78a72d93 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -32,17 +32,15 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.tcp.TcpClient
-import java.util.function.BiFunction
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class VesHvClient(configuration: ClientConfiguration) {
+class VesHvClient(private val configuration: ClientConfiguration) {
private val client: TcpClient = TcpClient.builder()
.options { opts ->
@@ -52,18 +50,17 @@ class VesHvClient(configuration: ClientConfiguration) {
}
.build()
- fun send(messages: Flux<WireFrame>) {
- client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) })
- }
+ fun send(messages: Flux<WireFrame>) =
+ client
+ .newHandler { _, out -> handler(out, messages) }
+ .doOnError{logger.info("Failed to connect to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}")}
+ .subscribe { logger.info("Connected to VesHvCollector on " +
+ "${configuration.vesHost}:${configuration.vesPort}") }
+
- private fun handler(nettyInbound: NettyInbound,
- nettyOutbound: NettyOutbound,
- messages: Flux<WireFrame>): Publisher<Void> {
+ private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> {
- nettyInbound
- .receive()
- .asString(Charsets.UTF_8)
- .subscribe { str -> logger.info("Server response: $str") }
val encoder = WireFrameEncoder(nettyOutbound.alloc())
@@ -74,6 +71,7 @@ class VesHvClient(configuration: ClientConfiguration) {
return nettyOutbound
.options { it.flushOnEach() }
.send(frames)
+ .then { logger.info("Messages have been sent") }
}
private fun createSslContext(config: SecurityConfiguration): SslContext =