aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-07 15:08:43 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-15 12:51:43 +0100
commit4d15e5a578dc2c94af2b7f1c7ad02fb44d384501 (patch)
treebaad5e6314ef6d2a0f1409b0a23e0001e814f0a8 /hv-collector-core/src/main/kotlin
parent3fdd2fe2b4f35e18998d050c632fc6de24a7e3b1 (diff)
Update project and dependencies
* Changed version from 4.0.0-SNAPSHOT to 1.1.0-SNAPSHOT as per Vijay suggestion * Updated Reactor to BOM Californium-SR2 * Updated mockito-kotlin to 2.0.0 * Introduced some fixes to support OpenJDK 11 compilation Change-Id: Ib25979ef50c7241a019bf98efd9759e0b8792d58 Issue-ID: DCAEGEN2-961 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt19
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt75
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt5
5 files changed, 54 insertions, 49 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 07b5c82e..78afe9fd 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import reactor.ipc.netty.http.client.HttpClient
+import reactor.netty.http.client.HttpClient
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index d08ad9e9..af4bbaa1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -125,7 +125,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
}
companion object {
- private const val MAX_RETRIES = 5
+ private const val MAX_RETRIES = 5L
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 4503955f..1672158e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -19,10 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
+import io.netty.handler.codec.http.HttpStatusClass
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
-import reactor.core.publisher.toMono
-import reactor.ipc.netty.http.client.HttpClient
+import reactor.netty.http.client.HttpClient
+import java.lang.IllegalStateException
import java.nio.charset.Charset
/**
@@ -34,14 +35,18 @@ open class HttpAdapter(private val httpClient: HttpClient) {
private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get(url + createQueryString(queryParams))
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else
+ Mono.error(IllegalStateException("$url ${response.status().code()} ${response.status().reasonPhrase()}"))
+ }
.doOnError {
logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
logger.debug("Nested exception:", it)
}
- .flatMap { it.receiveContent().toMono() }
- .map { it.content().toString(Charset.defaultCharset()) }
-
private fun createQueryString(params: Map<String, Any>): String {
if (params.isEmpty())
@@ -57,7 +62,7 @@ open class HttpAdapter(private val httpClient: HttpClient) {
}
- return builder.removeSuffix("&").toString()
+ return builder.removeSuffix("&").toString()
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 7a47cfc3..e535300a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,9 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
-import arrow.core.Option
+import arrow.core.getOrElse
import arrow.effects.IO
-import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -29,15 +28,13 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
-import reactor.ipc.netty.ByteBufFlux
-import reactor.ipc.netty.NettyInbound
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.TcpServer
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpServer
import java.time.Duration
-import java.util.function.BiFunction
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -48,63 +45,65 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private val collectorProvider: CollectorProvider) : Server {
override fun start(): IO<ServerHandle> = IO {
- val ctx = TcpServer.builder()
- .options(this::configureServer)
- .build()
- .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
- handleConnection(input)
- })
- NettyServerHandle(ctx)
- }
+ val tcpServer = TcpServer.create()
+ .addressSupplier { serverConfig.serverListenAddress }
+ .configureSsl()
+ .handle(this::handleConnection)
- private fun configureServer(opts: ServerOptions.Builder<*>) {
- val sslContext: Option<SslContext> = sslContextFactory.createSslContext(serverConfig.securityConfiguration)
- if (sslContext.isDefined()) opts.sslContext(sslContext.orNull())
- opts.port(serverConfig.listenPort)
+ NettyServerHandle(tcpServer.bindNow())
}
- private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+ private fun TcpServer.configureSsl() =
+ sslContextFactory
+ .createSslContext(serverConfig.securityConfiguration)
+ .map { sslContext ->
+ this.secure { b -> b.sslContext(sslContext) }
+ }.getOrElse { this }
+
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
collectorProvider().fold(
{
- logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ nettyInbound.withConnection { conn ->
+ logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
+ }
Mono.empty()
},
{
- logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
- val allocator = nettyInbound.context().channel().alloc()
- it.handleConnection(allocator, createDataStream(nettyInbound))
+ nettyInbound.withConnection { conn ->
+ logger.info { "Handling connection from ${conn.address()}" }
+ conn.configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ }
+ it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
}
)
-
- fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
+ private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
- private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+ private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
onReadIdle(timeout.toMillis()) {
logger.info {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
}
disconnectClient()
}
return this
}
- private fun NettyInbound.disconnectClient() {
- context().channel().close().addListener {
+ private fun Connection.disconnectClient() {
+ channel().close().addListener {
if (it.isSuccess)
- logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+ logger.debug { "Channel (${address()}) closed successfully." }
else
logger.warn("Channel close failed", it.cause())
}
}
- private fun NettyInbound.logConnectionClosed(): NettyInbound {
- context().onClose {
- logger.info("Connection from ${remoteAddress()} has been closed")
+ private fun Connection.logConnectionClosed(): Connection {
+ onTerminate().subscribe {
+ logger.info("Connection from ${address()} has been closed")
}
return this
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 7a7d9342..85117684 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
import java.time.Duration
/**
@@ -27,10 +28,10 @@ import java.time.Duration
* @since May 2018
*/
data class ServerConfiguration(
- val listenPort: Int,
+ val serverListenAddress: InetSocketAddress,
val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
- val healthCheckApiPort: Int,
+ val healthCheckApiListenAddress: InetSocketAddress,
val maximumPayloadSizeBytes: Int,
val dummyMode: Boolean = false)