diff options
Diffstat (limited to 'hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt')
-rw-r--r-- | hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt | 24 |
1 files changed, 11 insertions, 13 deletions
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index 13256c52..78a72d93 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -32,17 +32,15 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.tcp.TcpClient -import java.util.function.BiFunction /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -class VesHvClient(configuration: ClientConfiguration) { +class VesHvClient(private val configuration: ClientConfiguration) { private val client: TcpClient = TcpClient.builder() .options { opts -> @@ -52,18 +50,17 @@ class VesHvClient(configuration: ClientConfiguration) { } .build() - fun send(messages: Flux<WireFrame>) { - client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) }) - } + fun send(messages: Flux<WireFrame>) = + client + .newHandler { _, out -> handler(out, messages) } + .doOnError{logger.info("Failed to connect to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}")} + .subscribe { logger.info("Connected to VesHvCollector on " + + "${configuration.vesHost}:${configuration.vesPort}") } + - private fun handler(nettyInbound: NettyInbound, - nettyOutbound: NettyOutbound, - messages: Flux<WireFrame>): Publisher<Void> { + private fun handler(nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> { - nettyInbound - .receive() - .asString(Charsets.UTF_8) - .subscribe { str -> logger.info("Server response: $str") } val encoder = WireFrameEncoder(nettyOutbound.alloc()) @@ -74,6 +71,7 @@ class VesHvClient(configuration: ClientConfiguration) { return nettyOutbound .options { it.flushOnEach() } .send(frames) + .then { logger.info("Messages have been sent") } } private fun createSslContext(config: SecurityConfiguration): SslContext = |