summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docker-compose.yml4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt32
3 files changed, 21 insertions, 18 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index f9f52b4e..f37c823a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,4 +1,4 @@
-version: "3.4"
+version: "3.5"
services:
zookeeper:
image: wurstmeister/zookeeper
@@ -35,6 +35,8 @@ services:
ports:
- "6060:6060"
- "6061:6061/tcp"
+ entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
+ "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
command: ["--listen-port", "6061",
"--health-check-api-port", "6060",
"--config-url", "http://consul:8500/v1/kv/veshv-config"]
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 52689162..f608a2b9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SynchronousSink
@@ -58,7 +57,7 @@ internal class VesHvCollector(
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
- .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+ .doFinally { releaseBuffersMemory(wireDecoder) }
.onErrorResume(::handleErrors)
.then()
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index a34be7cd..b4ad4b7d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
+import reactor.ipc.netty.ByteBufFlux
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
@@ -61,23 +62,24 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
opts.port(serverConfig.listenPort)
}
- private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
- logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+ private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+ collectorProvider().fold(
+ {
+ logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ Mono.empty()
+ },
+ {
+ logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
+ it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+ }
+ )
- val dataStream = nettyInbound
- .configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- .receive()
- .retain()
- return collectorProvider().fold(
- {
- logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
- Mono.empty()
- },
- { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
-
- }
+ fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
+ .configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ .receive()
+ .retain()
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {