aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt27
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt3
-rw-r--r--pom.xml17
5 files changed, 30 insertions, 23 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 6c256b72..3c85a9b1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.boundary
+import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
@@ -30,7 +31,7 @@ interface Collector {
fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
}
-typealias CollectorProvider = () -> Collector
+typealias CollectorProvider = () -> Option<Collector>
interface Server {
fun start(): IO<ServerHandle>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index a400ff32..d807a9e7 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -32,6 +32,7 @@ import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.util.concurrent.atomic.AtomicReference
@@ -57,7 +58,7 @@ class CollectorFactory(val configuration: ConfigurationProvider,
healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
}
.subscribe(collector::set)
- return collector::get
+ return collector::getOption
}
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index f858d959..a34be7cd 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -70,23 +70,34 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
.receive()
.retain()
- return collectorProvider()
- .handleConnection(nettyInbound.context().channel().alloc(), dataStream)
+ return collectorProvider().fold(
+ {
+ logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ Mono.empty()
+ },
+ { it.handleConnection(nettyInbound.context().channel().alloc(), dataStream) })
+
}
private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
onReadIdle(timeout.toMillis()) {
- logger.info { "Idle timeout of ${timeout.seconds} s reached. Disconnecting..." }
- context().channel().close().addListener {
- if (it.isSuccess)
- logger.debug { "Client disconnected because of idle timeout" }
- else
- logger.warn("Channel close failed", it.cause())
+ logger.info {
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
}
+ disconnectClient()
}
return this
}
+ private fun NettyInbound.disconnectClient() {
+ context().channel().close().addListener {
+ if (it.isSuccess)
+ logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+ else
+ logger.warn("Channel close failed", it.cause())
+ }
+ }
+
private fun NettyInbound.logConnectionClosed(): NettyInbound {
context().onClose {
logger.info("Connection from ${remoteAddress()} has been closed")
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index e9b70578..942e6edf 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
+import arrow.core.getOrElse
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.UnpooledByteBufAllocator
@@ -48,7 +49,7 @@ class Sut(sink: Sink = StoringSink()) {
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
- get() = collectorProvider()
+ get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
diff --git a/pom.xml b/pom.xml
index ebb3afc4..23fb7a27 100644
--- a/pom.xml
+++ b/pom.xml
@@ -18,8 +18,7 @@
~ limitations under the License.
~ ============LICENSE_END=========================================================
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
@@ -33,7 +32,7 @@
<parent>
<groupId>org.onap.oparent</groupId>
<artifactId>oparent</artifactId>
- <version>0.1.1</version>
+ <version>1.2.0</version>
<relativePath/>
</parent>
@@ -89,7 +88,7 @@
<docker-image.registry>${onap.nexus.dockerregistry.daily}</docker-image.registry>
<docker-image.namespace>onap</docker-image.namespace>
<docker-image.name>${project.groupId}.${project.artifactId}</docker-image.name>
- <docker.http_proxy></docker.http_proxy>
+ <docker.http_proxy/>
</properties>
@@ -316,11 +315,7 @@
</goals>
<configuration>
<target name="detekt" unless="${skipAnalysis}">
- <java taskname="detekt" dir="${basedir}"
- fork="true"
- failonerror="true"
- classname="io.gitlab.arturbosch.detekt.cli.Main"
- classpathref="maven.plugin.classpath">
+ <java taskname="detekt" dir="${basedir}" fork="true" failonerror="true" classname="io.gitlab.arturbosch.detekt.cli.Main" classpathref="maven.plugin.classpath">
<arg value="--input"/>
<arg value="${basedir}/src/main/kotlin"/>
<arg value="--config-resource"/>
@@ -514,7 +509,7 @@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>25.0-jre</version>
+
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
@@ -690,5 +685,3 @@
</dependencies>
</dependencyManagement>
</project>
-
-