aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docker-compose.yml4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt32
-rw-r--r--hv-collector-main/src/main/resources/logback.xml68
-rw-r--r--pom.xml2
5 files changed, 61 insertions, 48 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index f9f52b4e..f37c823a 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,4 +1,4 @@
-version: "3.4"
+version: "3.5"
services:
zookeeper:
image: wurstmeister/zookeeper
@@ -35,6 +35,8 @@ services:
ports:
- "6060:6060"
- "6061:6061/tcp"
+ entrypoint: ["java", "-Dio.netty.leakDetection.level=paranoid",
+ "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
command: ["--listen-port", "6061",
"--health-check-api-port", "6060",
"--config-url", "http://consul:8500/v1/kv/veshv-config"]
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 52689162..f608a2b9 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -33,7 +33,6 @@ import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.SynchronousSink
@@ -58,7 +57,7 @@ internal class VesHvCollector(
.transform(::decodePayload)
.filter(VesMessage::isValid)
.transform(::routeMessage)
- .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+ .doFinally { releaseBuffersMemory(wireDecoder) }
.onErrorResume(::handleErrors)
.then()
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index a34be7cd..b4ad4b7d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -30,6 +30,7 @@ import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
+import reactor.ipc.netty.ByteBufFlux
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.options.ServerOptions
@@ -61,23 +62,24 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
opts.port(serverConfig.listenPort)
}
- private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> {
- logger.info("Handling connection from ${nettyInbound.remoteAddress()}")
+ private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+ collectorProvider().fold(
+ {
+ logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ Mono.empty()
+ },
+ {
+ logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
+ it.handleConnection(nettyInbound.context().channel().alloc(), createDataStream(nettyInbound))
+ }
+ )
- val dataStream = nettyInbound
- .configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- .receive()
- .retain()
- return collectorProvider().fold(
- {
- logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
- Mono.empty()
- },
- { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
-
- }
+ fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
+ .configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ .receive()
+ .retain()
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {
diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml
index a0235e17..c76ff21a 100644
--- a/hv-collector-main/src/main/resources/logback.xml
+++ b/hv-collector-main/src/main/resources/logback.xml
@@ -1,39 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
- <property name="LOG_FILE"
- value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
- <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+ <property name="COMPONENT_NAME"
+ value="dcae-hv-ves-collector"/>
+ <property name="COMPONENT_SHORT_NAME"
+ value="hv-ves"/>
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>
- %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
- </pattern>
- </encoder>
- </appender>
+ <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
+ <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
+ <property name="ARCHIVE" value="${LOG_PATH}/archive"/>
+ <property name="FILE_LOG_PATTERN" value="
+%nopexception%50.50logger
+| %date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
+| %highlight(%-5level)
+| %msg
+| %rootException
+| %thread%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ </appender>
<appender name="ROLLING-FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
- </encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
- <maxFileSize>50MB</maxFileSize>
- <maxHistory>30</maxHistory>
- <totalSizeCap>10GB</totalSizeCap>
- </rollingPolicy>
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_PATH}/${LOG_FILENAME}.log</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedRollingPolicy">
+ <FileNamePattern>${ARCHIVE}/${LOG_FILENAME}.%d{yyyy-MM-dd}.%i.log.gz</FileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/>
- <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/>
- <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.wire" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSink" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv.impl.adapters.LoggingSinkProvider" level="DEBUG"/>
+ <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- <appender-ref ref="ROLLING-FILE"/>
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
</root>
</configuration> \ No newline at end of file
diff --git a/pom.xml b/pom.xml
index ee0558a2..a71f942e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,7 +59,7 @@
</modules>
<properties>
- <kotlin.version>1.2.61</kotlin.version>
+ <kotlin.version>1.2.70</kotlin.version>
<arrow.version>0.7.3</arrow.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<build-helper-maven-plugin.version>1.7</build-helper-maven-plugin.version>