summaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/pom.xml34
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt19
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt75
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt5
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt6
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt83
8 files changed, 90 insertions, 136 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
index ddfa3ed3..a8135292 100644
--- a/hv-collector-core/pom.xml
+++ b/hv-collector-core/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -111,7 +111,7 @@
<artifactId>reactor-extra</artifactId>
</dependency>
<dependency>
- <groupId>io.projectreactor.ipc</groupId>
+ <groupId>io.projectreactor.netty</groupId>
<artifactId>reactor-netty</artifactId>
</dependency>
<dependency>
@@ -127,36 +127,6 @@
<artifactId>javax.json</artifactId>
<scope>runtime</scope>
</dependency>
-
-
- <dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- </dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 07b5c82e..78afe9fd 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import reactor.ipc.netty.http.client.HttpClient
+import reactor.netty.http.client.HttpClient
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
index d08ad9e9..af4bbaa1 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -125,7 +125,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter,
}
companion object {
- private const val MAX_RETRIES = 5
+ private const val MAX_RETRIES = 5L
private const val BACKOFF_INTERVAL_FACTOR = 30L
private val logger = Logger(ConsulConfigurationProvider::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
index 4503955f..1672158e 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -19,10 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
+import io.netty.handler.codec.http.HttpStatusClass
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
-import reactor.core.publisher.toMono
-import reactor.ipc.netty.http.client.HttpClient
+import reactor.netty.http.client.HttpClient
+import java.lang.IllegalStateException
import java.nio.charset.Charset
/**
@@ -34,14 +35,18 @@ open class HttpAdapter(private val httpClient: HttpClient) {
private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get(url + createQueryString(queryParams))
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else
+ Mono.error(IllegalStateException("$url ${response.status().code()} ${response.status().reasonPhrase()}"))
+ }
.doOnError {
logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
logger.debug("Nested exception:", it)
}
- .flatMap { it.receiveContent().toMono() }
- .map { it.content().toString(Charset.defaultCharset()) }
-
private fun createQueryString(params: Map<String, Any>): String {
if (params.isEmpty())
@@ -57,7 +62,7 @@ open class HttpAdapter(private val httpClient: HttpClient) {
}
- return builder.removeSuffix("&").toString()
+ return builder.removeSuffix("&").toString()
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 7a47cfc3..e535300a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,9 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
-import arrow.core.Option
+import arrow.core.getOrElse
import arrow.effects.IO
-import io.netty.handler.ssl.SslContext
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -29,15 +28,13 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
import reactor.core.publisher.Mono
-import reactor.ipc.netty.ByteBufFlux
-import reactor.ipc.netty.NettyInbound
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.options.ServerOptions
-import reactor.ipc.netty.tcp.TcpServer
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpServer
import java.time.Duration
-import java.util.function.BiFunction
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -48,63 +45,65 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private val collectorProvider: CollectorProvider) : Server {
override fun start(): IO<ServerHandle> = IO {
- val ctx = TcpServer.builder()
- .options(this::configureServer)
- .build()
- .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ ->
- handleConnection(input)
- })
- NettyServerHandle(ctx)
- }
+ val tcpServer = TcpServer.create()
+ .addressSupplier { serverConfig.serverListenAddress }
+ .configureSsl()
+ .handle(this::handleConnection)
- private fun configureServer(opts: ServerOptions.Builder<*>) {
- val sslContext: Option<SslContext> = sslContextFactory.createSslContext(serverConfig.securityConfiguration)
- if (sslContext.isDefined()) opts.sslContext(sslContext.orNull())
- opts.port(serverConfig.listenPort)
+ NettyServerHandle(tcpServer.bindNow())
}
- private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> =
+ private fun TcpServer.configureSsl() =
+ sslContextFactory
+ .createSslContext(serverConfig.securityConfiguration)
+ .map { sslContext ->
+ this.secure { b -> b.sslContext(sslContext) }
+ }.getOrElse { this }
+
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
collectorProvider().fold(
{
- logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." }
+ nettyInbound.withConnection { conn ->
+ logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
+ }
Mono.empty()
},
{
- logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" }
- val allocator = nettyInbound.context().channel().alloc()
- it.handleConnection(allocator, createDataStream(nettyInbound))
+ nettyInbound.withConnection { conn ->
+ logger.info { "Handling connection from ${conn.address()}" }
+ conn.configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ }
+ it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
}
)
-
- fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
+ private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
.receive()
.retain()
- private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound {
+ private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
onReadIdle(timeout.toMillis()) {
logger.info {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..."
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
}
disconnectClient()
}
return this
}
- private fun NettyInbound.disconnectClient() {
- context().channel().close().addListener {
+ private fun Connection.disconnectClient() {
+ channel().close().addListener {
if (it.isSuccess)
- logger.debug { "Channel (${remoteAddress()}) closed successfully." }
+ logger.debug { "Channel (${address()}) closed successfully." }
else
logger.warn("Channel close failed", it.cause())
}
}
- private fun NettyInbound.logConnectionClosed(): NettyInbound {
- context().onClose {
- logger.info("Connection from ${remoteAddress()} has been closed")
+ private fun Connection.logConnectionClosed(): Connection {
+ onTerminate().subscribe {
+ logger.info("Connection from ${address()} has been closed")
}
return this
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index 7a7d9342..85117684 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.model
import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
import java.time.Duration
/**
@@ -27,10 +28,10 @@ import java.time.Duration
* @since May 2018
*/
data class ServerConfiguration(
- val listenPort: Int,
+ val serverListenAddress: InetSocketAddress,
val configurationProviderParams: ConfigurationProviderParams,
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
- val healthCheckApiPort: Int,
+ val healthCheckApiListenAddress: InetSocketAddress,
val maximumPayloadSizeBytes: Int,
val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
index 9a6889c8..7a1a4cdc 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -19,9 +19,9 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
-import com.nhaarman.mockito_kotlin.eq
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
index 123d8f72..91457faf 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -19,21 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.verify
-import com.nhaarman.mockito_kotlin.whenever
-import io.netty.buffer.Unpooled
-import io.netty.handler.codec.http.HttpContent
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import reactor.ipc.netty.http.client.HttpClient
-import reactor.ipc.netty.http.client.HttpClientResponse
+import reactor.netty.http.client.HttpClient
+import reactor.netty.http.server.HttpServer
import reactor.test.StepVerifier
-import java.nio.charset.Charset
+import reactor.test.test
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -42,66 +36,51 @@ import java.nio.charset.Charset
internal object HttpAdapterTest : Spek({
describe("HttpAdapter") {
- val httpClientMock: HttpClient = mock()
- val httpAdapter = HttpAdapter(httpClientMock)
+ val httpServer = HttpServer.create()
+ .host("127.0.0.1")
+ .route { routes ->
+ routes.get("/url") { req, resp ->
+ resp.sendString(Mono.just(req.uri()))
+ }
+ }
+ .bindNow()
+ val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
+ val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl))
+
+ afterGroup {
+ httpServer.disposeNow()
+ }
given("url without query params") {
- val initialUrl = "http://test-url"
- whenever(httpClientMock.get(initialUrl)).thenReturn(Mono.empty())
+ val url = "/url"
it("should not append query string") {
- httpAdapter.get(initialUrl)
- verify(httpClientMock).get(initialUrl)
+ httpAdapter.get(url).test()
+ .expectNext(url)
+ .verifyComplete()
}
}
given("url with query params") {
- val queryParams = mapOf(Pair("key", "value"))
- val initialUrl = "http://test-url"
- val expectedUrl = "http://test-url?key=value"
- whenever(httpClientMock.get(expectedUrl)).thenReturn(Mono.empty())
-
- it("should parse them to query string and append to url") {
- httpAdapter.get(initialUrl, queryParams)
- verify(httpClientMock).get(expectedUrl)
- }
- }
+ val queryParams = mapOf(Pair("p", "the-value"))
+ val url = "/url"
- given("valid resource url") {
- val validUrl = "http://valid-url/"
- val responseContent = """{"key1": "value1", "key2": "value2"}"""
- val httpResponse = createHttpResponseMock(responseContent)
- whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse))
-
- it("should return response string") {
- StepVerifier
- .create(httpAdapter.get(validUrl))
- .expectNext(responseContent)
+ it("should add them as query string to the url") {
+ httpAdapter.get(url, queryParams).test()
+ .expectNext("/url?p=the-value")
+ .verifyComplete()
}
}
- given("invalid resource url") {
- val invalidUrl = "http://invalid-url/"
- val exceptionMessage = "Test exception"
- whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage)))
+ given("invalid url") {
+ val invalidUrl = "/wtf"
it("should interrupt the flux") {
StepVerifier
.create(httpAdapter.get(invalidUrl))
- .verifyErrorMessage(exceptionMessage)
+ .verifyError()
}
}
}
-})
-
-fun createHttpResponseMock(content: String): HttpClientResponse {
- val responseMock: HttpClientResponse = mock()
- val contentMock: HttpContent = mock()
- val contentByteBuff = Unpooled.copiedBuffer(content, Charset.defaultCharset())
-
- whenever(responseMock.receiveContent()).thenReturn(Flux.just(contentMock))
- whenever(contentMock.content()).thenReturn(contentByteBuff)
-
- return responseMock
-}
+}) \ No newline at end of file