diff options
Diffstat (limited to 'hv-collector-core')
8 files changed, 90 insertions, 136 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml index ddfa3ed3..a8135292 100644 --- a/hv-collector-core/pom.xml +++ b/hv-collector-core/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -111,7 +111,7 @@ <artifactId>reactor-extra</artifactId> </dependency> <dependency> - <groupId>io.projectreactor.ipc</groupId> + <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> <dependency> @@ -127,36 +127,6 @@ <artifactId>javax.json</artifactId> <scope>runtime</scope> </dependency> - - - <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-test</artifactId> - </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 07b5c82e..78afe9fd 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -23,7 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import reactor.ipc.netty.http.client.HttpClient +import reactor.netty.http.client.HttpClient /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt index d08ad9e9..af4bbaa1 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt @@ -125,7 +125,7 @@ internal class ConsulConfigurationProvider(private val http: HttpAdapter, } companion object { - private const val MAX_RETRIES = 5 + private const val MAX_RETRIES = 5L private const val BACKOFF_INTERVAL_FACTOR = 30L private val logger = Logger(ConsulConfigurationProvider::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 4503955f..1672158e 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -19,10 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters +import io.netty.handler.codec.http.HttpStatusClass import org.slf4j.LoggerFactory import reactor.core.publisher.Mono -import reactor.core.publisher.toMono -import reactor.ipc.netty.http.client.HttpClient +import reactor.netty.http.client.HttpClient +import java.lang.IllegalStateException import java.nio.charset.Charset /** @@ -34,14 +35,18 @@ open class HttpAdapter(private val httpClient: HttpClient) { private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient - .get(url + createQueryString(queryParams)) + .get() + .uri(url + createQueryString(queryParams)) + .responseSingle { response, content -> + if (response.status().codeClass() == HttpStatusClass.SUCCESS) + content.asString() + else + Mono.error(IllegalStateException("$url ${response.status().code()} ${response.status().reasonPhrase()}")) + } .doOnError { logger.error("Failed to get resource on path: $url (${it.localizedMessage})") logger.debug("Nested exception:", it) } - .flatMap { it.receiveContent().toMono() } - .map { it.content().toString(Charset.defaultCharset()) } - private fun createQueryString(params: Map<String, Any>): String { if (params.isEmpty()) @@ -57,7 +62,7 @@ open class HttpAdapter(private val httpClient: HttpClient) { } - return builder.removeSuffix("&").toString() + return builder.removeSuffix("&").toString() } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 7a47cfc3..e535300a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,9 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl.socket -import arrow.core.Option +import arrow.core.getOrElse import arrow.effects.IO -import io.netty.handler.ssl.SslContext import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -29,15 +28,13 @@ import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher import reactor.core.publisher.Mono -import reactor.ipc.netty.ByteBufFlux -import reactor.ipc.netty.NettyInbound -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.options.ServerOptions -import reactor.ipc.netty.tcp.TcpServer +import reactor.netty.ByteBufFlux +import reactor.netty.Connection +import reactor.netty.NettyInbound +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpServer import java.time.Duration -import java.util.function.BiFunction /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -48,63 +45,65 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private val collectorProvider: CollectorProvider) : Server { override fun start(): IO<ServerHandle> = IO { - val ctx = TcpServer.builder() - .options(this::configureServer) - .build() - .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { input, _ -> - handleConnection(input) - }) - NettyServerHandle(ctx) - } + val tcpServer = TcpServer.create() + .addressSupplier { serverConfig.serverListenAddress } + .configureSsl() + .handle(this::handleConnection) - private fun configureServer(opts: ServerOptions.Builder<*>) { - val sslContext: Option<SslContext> = sslContextFactory.createSslContext(serverConfig.securityConfiguration) - if (sslContext.isDefined()) opts.sslContext(sslContext.orNull()) - opts.port(serverConfig.listenPort) + NettyServerHandle(tcpServer.bindNow()) } - private fun handleConnection(nettyInbound: NettyInbound): Mono<Void> = + private fun TcpServer.configureSsl() = + sslContextFactory + .createSslContext(serverConfig.securityConfiguration) + .map { sslContext -> + this.secure { b -> b.sslContext(sslContext) } + }.getOrElse { this } + + private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = collectorProvider().fold( { - logger.warn { "Collector not ready. Closing connection from ${nettyInbound.remoteAddress()}..." } + nettyInbound.withConnection { conn -> + logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } + } Mono.empty() }, { - logger.info { "Handling connection from ${nettyInbound.remoteAddress()}" } - val allocator = nettyInbound.context().channel().alloc() - it.handleConnection(allocator, createDataStream(nettyInbound)) + nettyInbound.withConnection { conn -> + logger.info { "Handling connection from ${conn.address()}" } + conn.configureIdleTimeout(serverConfig.idleTimeout) + .logConnectionClosed() + } + it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) } ) - - fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() + private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound .receive() .retain() - private fun NettyInbound.configureIdleTimeout(timeout: Duration): NettyInbound { + private fun Connection.configureIdleTimeout(timeout: Duration): Connection { onReadIdle(timeout.toMillis()) { logger.info { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${remoteAddress()}..." + "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." } disconnectClient() } return this } - private fun NettyInbound.disconnectClient() { - context().channel().close().addListener { + private fun Connection.disconnectClient() { + channel().close().addListener { if (it.isSuccess) - logger.debug { "Channel (${remoteAddress()}) closed successfully." } + logger.debug { "Channel (${address()}) closed successfully." } else logger.warn("Channel close failed", it.cause()) } } - private fun NettyInbound.logConnectionClosed(): NettyInbound { - context().onClose { - logger.info("Connection from ${remoteAddress()} has been closed") + private fun Connection.logConnectionClosed(): Connection { + onTerminate().subscribe { + logger.info("Connection from ${address()} has been closed") } return this } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index 7a7d9342..85117684 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.model import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress import java.time.Duration /** @@ -27,10 +28,10 @@ import java.time.Duration * @since May 2018 */ data class ServerConfiguration( - val listenPort: Int, + val serverListenAddress: InetSocketAddress, val configurationProviderParams: ConfigurationProviderParams, val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, - val healthCheckApiPort: Int, + val healthCheckApiListenAddress: InetSocketAddress, val maximumPayloadSizeBytes: Int, val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt index 9a6889c8..7a1a4cdc 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -19,9 +19,9 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters -import com.nhaarman.mockito_kotlin.eq -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.whenever +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt index 123d8f72..91457faf 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt @@ -19,21 +19,15 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.verify -import com.nhaarman.mockito_kotlin.whenever -import io.netty.buffer.Unpooled -import io.netty.handler.codec.http.HttpContent import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import reactor.ipc.netty.http.client.HttpClient -import reactor.ipc.netty.http.client.HttpClientResponse +import reactor.netty.http.client.HttpClient +import reactor.netty.http.server.HttpServer import reactor.test.StepVerifier -import java.nio.charset.Charset +import reactor.test.test /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -42,66 +36,51 @@ import java.nio.charset.Charset internal object HttpAdapterTest : Spek({ describe("HttpAdapter") { - val httpClientMock: HttpClient = mock() - val httpAdapter = HttpAdapter(httpClientMock) + val httpServer = HttpServer.create() + .host("127.0.0.1") + .route { routes -> + routes.get("/url") { req, resp -> + resp.sendString(Mono.just(req.uri())) + } + } + .bindNow() + val baseUrl = "http://${httpServer.host()}:${httpServer.port()}" + val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl)) + + afterGroup { + httpServer.disposeNow() + } given("url without query params") { - val initialUrl = "http://test-url" - whenever(httpClientMock.get(initialUrl)).thenReturn(Mono.empty()) + val url = "/url" it("should not append query string") { - httpAdapter.get(initialUrl) - verify(httpClientMock).get(initialUrl) + httpAdapter.get(url).test() + .expectNext(url) + .verifyComplete() } } given("url with query params") { - val queryParams = mapOf(Pair("key", "value")) - val initialUrl = "http://test-url" - val expectedUrl = "http://test-url?key=value" - whenever(httpClientMock.get(expectedUrl)).thenReturn(Mono.empty()) - - it("should parse them to query string and append to url") { - httpAdapter.get(initialUrl, queryParams) - verify(httpClientMock).get(expectedUrl) - } - } + val queryParams = mapOf(Pair("p", "the-value")) + val url = "/url" - given("valid resource url") { - val validUrl = "http://valid-url/" - val responseContent = """{"key1": "value1", "key2": "value2"}""" - val httpResponse = createHttpResponseMock(responseContent) - whenever(httpClientMock.get(validUrl)).thenReturn(Mono.just(httpResponse)) - - it("should return response string") { - StepVerifier - .create(httpAdapter.get(validUrl)) - .expectNext(responseContent) + it("should add them as query string to the url") { + httpAdapter.get(url, queryParams).test() + .expectNext("/url?p=the-value") + .verifyComplete() } } - given("invalid resource url") { - val invalidUrl = "http://invalid-url/" - val exceptionMessage = "Test exception" - whenever(httpClientMock.get(invalidUrl)).thenReturn(Mono.error(Exception(exceptionMessage))) + given("invalid url") { + val invalidUrl = "/wtf" it("should interrupt the flux") { StepVerifier .create(httpAdapter.get(invalidUrl)) - .verifyErrorMessage(exceptionMessage) + .verifyError() } } } -}) - -fun createHttpResponseMock(content: String): HttpClientResponse { - val responseMock: HttpClientResponse = mock() - val contentMock: HttpContent = mock() - val contentByteBuff = Unpooled.copiedBuffer(content, Charset.defaultCharset()) - - whenever(responseMock.receiveContent()).thenReturn(Flux.just(contentMock)) - whenever(contentMock.content()).thenReturn(contentByteBuff) - - return responseMock -} +})
\ No newline at end of file |