diff options
5 files changed, 61 insertions, 48 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index f9f52b4e..f37c823a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,4 @@ -version: "3.4" +version: "3.5" services: zookeeper: image: wurstmeister/zookeeper @@ -35,6 +35,8 @@ services: ports: - "6060:6060" - "6061:6061/tcp" + entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid", + "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] command: ["--listen-port", "6061", "--health-check-api-port", "6060", "--config-url", "http://consul:8500/v1/kv/veshv-config"] diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 52689162..f608a2b9 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.SynchronousSink @@ -58,7 +57,7 @@ internal class VesHvCollector( .transform(::decodePayload) .filter(VesMessage::isValid) .transform(::routeMessage) - .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .doFinally { releaseBuffersMemory(wireDecoder) } .onErrorResume(::handleErrors) .then() } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index a34be7cd..b4ad4b7d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Mono +import reactor.ipc.netty.ByteBufFlux import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.options.ServerOptions @@ -61,23 +62,24 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, opts.port(serverConfig.listenPort) } - private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> { - logger.info("Handling connection from ${nettyInbound.remoteAddress()}") + private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> = + collectorProvider().fold( + { + logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + Mono.empty() + }, + { + logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } + it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound)) + } + ) - val dataStream = nettyInbound - .configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - .receive() - .retain() - return collectorProvider().fold( - { - logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } - Mono.empty() - }, - { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) }) - - } + fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound + .configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + .receive() + .retain() private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { onReadIdle(timeout.toMillis()) { diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml index a0235e17..c76ff21a 100644 --- a/hv-collector-main/src/main/resources/logback.xml +++ b/hv-collector-main/src/main/resources/logback.xml @@ -1,39 +1,49 @@ <?xml version="1.0" encoding="UTF-8"?> <configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + <property name="COMPONENT_NAME" + value="dcae-hv-ves-collector"/> + <property name="COMPONENT_SHORT_NAME" + value="hv-ves"/> - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> + <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> + <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> + <property name="ARCHIVE" value="${LOG_PATH}/archive"/> + <property name="FILE_LOG_PATTERN" value=" +%nopexception%50.50logger +| %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} +| %highlight(%-5level) +| %msg +| %rootException +| %thread%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + </appender> <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_PATH}/${LOG_FILENAME}.log</file> + <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy"> + <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> - <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/> - <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/> - <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> + <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/> + <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> </root> </configuration>
\ No newline at end of file @@ -59,7 +59,7 @@ </modules> <properties> - <kotlin.version>1.2.61</kotlin.version> + <kotlin.version>1.2.70</kotlin.version> <arrow.version>0.7.3</arrow.version> <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version> <build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version> |