aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt')
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt17
1 files changed, 6 insertions, 11 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index ca6d169a..8de7da32 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -35,6 +35,7 @@ import reactor.core.publisher.Mono
import reactor.core.publisher.ReplayProcessor
import reactor.netty.NettyOutbound
import reactor.netty.tcp.TcpClient
+import reactor.util.concurrent.Queues.XS_BUFFER_SIZE
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -43,8 +44,7 @@ import reactor.netty.tcp.TcpClient
class VesHvClient(private val configuration: SimulatorConfiguration) {
private val client: TcpClient = TcpClient.create()
- .host(configuration.vesHost)
- .port(configuration.vesPort)
+ .addressSupplier { configuration.hvVesAddress }
.configureSsl()
private fun TcpClient.configureSsl() =
@@ -61,14 +61,10 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
.handle { _, output -> handler(complete, messages, output) }
.connect()
.doOnError {
- logger.info {
- "Failed to connect to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
- }
+ logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" }
}
.subscribe {
- logger.info {
- "Connected to VesHvCollector on ${configuration.vesHost}:${configuration.vesPort}"
- }
+ logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" }
}
return complete.then()
}
@@ -81,7 +77,7 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
val encoder = WireFrameEncoder(allocator)
val frames = messages
.map(encoder::encode)
- .window(MAX_BATCH_SIZE)
+ .window(XS_BUFFER_SIZE)
return nettyOutbound
.logConnectionClosed()
@@ -99,13 +95,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
private fun NettyOutbound.logConnectionClosed() =
withConnection { conn ->
- conn.onTerminate().subscribe {
+ conn.onDispose {
logger.info { "Connection to ${conn.address()} has been closed" }
}
}
companion object {
private val logger = Logger(VesHvClient::class)
- private const val MAX_BATCH_SIZE = 128
}
}