diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-11-28 15:46:50 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-11-29 14:41:42 +0100 |
commit | dde383a2aa75f94c26d7949665b79cc95486a223 (patch) | |
tree | 75f3e8f564067afd0e67dbe6254183e45ca26944 /sources/hv-collector-core/src/test/kotlin/org/onap | |
parent | 77f896523f2065b1da1be21545155a29edea5122 (diff) |
Custom detekt rule for logger usage check
Check if logger invocations don't use unoptimal invocations, eg.
concatenation `debug("a=" + a)` instead of lambda use `debug {"a=" + a}`
Unfortunately to avoid defining dependencies in many places and having
circural dependencies it was necessarry to reorganize the maven module
structure. The goal was to have `sources` module with production code and
`build` module with build-time tooling (detekt rules among them).
Issue-ID: DCAEGEN2-1002
Change-Id: I36e677b98972aaae6905d722597cbce5e863d201
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/test/kotlin/org/onap')
6 files changed, 791 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt new file mode 100644 index 00000000..3090042d --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -0,0 +1,128 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes +import org.onap.ves.VesEventOuterClass.CommonEventHeader.* + +internal object MessageValidatorTest : Spek({ + + given("Message validator") { + val cut = MessageValidator + + on("ves hv message including header with fully initialized fields") { + val commonHeader = commonHeader() + + it("should accept message with fully initialized message header") { + val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader)) + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue() + } + + VesEventDomain.values() + .forEach { domain -> + it("should accept message with $domain domain") { + val header = commonHeader(domain) + val vesMessage = VesMessage(header, vesEventBytes(header)) + assertThat(cut.isValid(vesMessage)) + .isTrue() + } + } + } + + on("ves hv message bytes") { + val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY) + it("should not accept message with default header") { + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + } + } + + val priorityTestCases = mapOf( + Priority.PRIORITY_NOT_PROVIDED to false, + Priority.HIGH to true + ) + + priorityTestCases.forEach { value, expectedResult -> + on("ves hv message including header with priority $value") { + val commonEventHeader = commonHeader(priority = value) + val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader)) + + it("should resolve validation result") { + assertThat(cut.isValid(vesMessage)).describedAs("message validation results") + .isEqualTo(expectedResult) + } + } + } + + on("ves hv message including header with not initialized fields") { + val commonHeader = newBuilder() + .setVersion("1.9") + .setEventName("Sample event name") + .setEventId("Sample event Id") + .setSourceName("Sample Source") + .build() + val rawMessageBytes = vesEventBytes(commonHeader) + + it("should not accept not fully initialized message header") { + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + } + } + + on("ves hv message including header.vesEventListenerVersion with non-string major part") { + val commonHeader = commonHeader(vesEventListenerVersion = "sample-version") + val rawMessageBytes = vesEventBytes(commonHeader) + + + it("should not accept message header") { + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + } + } + + on("ves hv message including header.vesEventListenerVersion with major part != 7") { + val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3") + val rawMessageBytes = vesEventBytes(commonHeader) + + it("should not accept message header") { + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + } + } + + on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") { + val commonHeader = commonHeader(vesEventListenerVersion = "7.test") + val rawMessageBytes = vesEventBytes(commonHeader) + + it("should not accept message header") { + val vesMessage = VesMessage(commonHeader, rawMessageBytes) + assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() + } + } + } +}) diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt new file mode 100644 index 00000000..e8a31231 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -0,0 +1,112 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import arrow.core.None +import arrow.core.Some +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.model.routing +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader + + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +object RouterTest : Spek({ + given("sample configuration") { + val config = routing { + + defineRoute { + fromDomain(PERF3GPP.domainName) + toTopic("ves_rtpm") + withFixedPartitioning(2) + } + + defineRoute { + fromDomain(SYSLOG.domainName) + toTopic("ves_trace") + withFixedPartitioning() + } + }.build() + val cut = Router(config) + + on("message with existing route (rtpm)") { + val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY) + val result = cut.findDestination(message) + + it("should have route available") { + assertThat(result).isNotNull() + } + + it("should be routed to proper partition") { + assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) + } + + it("should be routed to proper topic") { + assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) + } + + it("should be routed with a given message") { + assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) + } + } + + on("message with existing route (trace)") { + val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY) + val result = cut.findDestination(message) + + it("should have route available") { + assertThat(result).isNotNull() + } + + it("should be routed to proper partition") { + assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) + } + + it("should be routed to proper topic") { + assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) + } + + it("should be routed with a given message") { + assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) + } + } + + on("message with unknown route") { + val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY) + val result = cut.findDestination(message) + + it("should not have route available") { + assertThat(result).isEqualTo(None) + } + } + } +})
\ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt new file mode 100644 index 00000000..8950a557 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl + +import arrow.core.Option +import com.google.protobuf.ByteString +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader +import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes +import java.nio.charset.Charset +import kotlin.test.assertTrue +import kotlin.test.fail + + +internal object VesDecoderTest : Spek({ + + given("ves message decoder") { + val cut = VesDecoder() + + on("ves hv message bytes") { + val commonHeader = commonHeader(HEARTBEAT) + val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements")) + + it("should decode only header and pass it on along with raw message") { + val expectedMessage = VesMessage( + commonHeader, + rawMessageBytes + ) + + assertTrue { + cut.decode(rawMessageBytes).exists { + it == expectedMessage + } + } + } + } + + on("invalid ves hv message bytes") { + val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset())) + + it("should throw error") { + assertFailedWithError(cut.decode(rawMessageBytes)) + } + } + } +}) + +private fun <A> assertFailedWithError(option: Option<A>) = + option.exists { + fail("Error expected") + } diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt new file mode 100644 index 00000000..c6364f74 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt @@ -0,0 +1,157 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.mockito.Mockito +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT +import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState + +import reactor.core.publisher.Mono +import reactor.retry.Retry +import reactor.test.StepVerifier +import java.time.Duration +import java.util.* +import kotlin.test.assertEquals + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal object ConsulConfigurationProviderTest : Spek({ + + describe("Consul configuration provider") { + + val httpAdapterMock: HttpAdapter = mock() + val healthStateProvider = HealthState.INSTANCE + + given("valid resource url") { + val validUrl = "http://valid-url/" + val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider) + + on("call to consul") { + whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) + .thenReturn(Mono.just(constructConsulResponse())) + + it("should use received configuration") { + + StepVerifier.create(consulConfigProvider().take(1)) + .consumeNextWith { + + assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers) + + val route1 = it.routing.routes[0] + assertThat(FAULT.domainName) + .describedAs("routed domain 1") + .isEqualTo(route1.domain) + assertThat("test-topic-1") + .describedAs("target topic 1") + .isEqualTo(route1.targetTopic) + + val route2 = it.routing.routes[1] + assertThat(HEARTBEAT.domainName) + .describedAs("routed domain 2") + .isEqualTo(route2.domain) + assertThat("test-topic-2") + .describedAs("target topic 2") + .isEqualTo(route2.targetTopic) + + }.verifyComplete() + } + } + + } + given("invalid resource url") { + val invalidUrl = "http://invalid-url/" + + val iterationCount = 3L + val consulConfigProvider = constructConsulConfigProvider( + invalidUrl, httpAdapterMock, healthStateProvider, iterationCount + ) + + on("call to consul") { + whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) + .thenReturn(Mono.error(RuntimeException("Test exception"))) + + it("should interrupt the flux") { + + StepVerifier.create(consulConfigProvider()) + .verifyErrorMessage("Test exception") + } + + it("should update the health state") { + StepVerifier.create(healthStateProvider().take(iterationCount)) + .expectNextCount(iterationCount - 1) + .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) + .verifyComplete() + } + } + } + } + +}) + +private fun constructConsulConfigProvider(url: String, + httpAdapter: HttpAdapter, + healthState: HealthState, + iterationCount: Long = 1 +): ConsulConfigurationProvider { + + val firstRequestDelay = Duration.ofMillis(1) + val requestInterval = Duration.ofMillis(1) + val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) + + return ConsulConfigurationProvider( + httpAdapter, + url, + firstRequestDelay, + requestInterval, + healthState, + retry + ) +} + + +const val kafkaAddress = "message-router-kafka" + +fun constructConsulResponse(): String = + """{ + "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093", + "collector.routing": [ + { + "fromDomain": "fault", + "toTopic": "test-topic-1" + }, + { + "fromDomain": "heartbeat", + "toTopic": "test-topic-2" + } + ] + }""" diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt new file mode 100644 index 00000000..91457faf --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt @@ -0,0 +1,86 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import reactor.core.publisher.Mono +import reactor.netty.http.client.HttpClient +import reactor.netty.http.server.HttpServer +import reactor.test.StepVerifier +import reactor.test.test + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal object HttpAdapterTest : Spek({ + describe("HttpAdapter") { + + val httpServer = HttpServer.create() + .host("127.0.0.1") + .route { routes -> + routes.get("/url") { req, resp -> + resp.sendString(Mono.just(req.uri())) + } + } + .bindNow() + val baseUrl = "http://${httpServer.host()}:${httpServer.port()}" + val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl)) + + afterGroup { + httpServer.disposeNow() + } + + given("url without query params") { + val url = "/url" + + it("should not append query string") { + httpAdapter.get(url).test() + .expectNext(url) + .verifyComplete() + } + } + + given("url with query params") { + val queryParams = mapOf(Pair("p", "the-value")) + val url = "/url" + + it("should add them as query string to the url") { + httpAdapter.get(url, queryParams).test() + .expectNext("/url?p=the-value") + .verifyComplete() + } + } + + given("invalid url") { + val invalidUrl = "/wtf" + + it("should interrupt the flux") { + StepVerifier + .create(httpAdapter.get(invalidUrl)) + .verifyError() + } + } + } + +})
\ No newline at end of file diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt new file mode 100644 index 00000000..f06a0dc7 --- /dev/null +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -0,0 +1,234 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage +import reactor.test.test + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> + * @since May 2018 + */ +internal object WireChunkDecoderTest : Spek({ + val alloc = UnpooledByteBufAllocator.DEFAULT + val samplePayload = "konstantynopolitanczykowianeczka".toByteArray() + val anotherPayload = "ala ma kota a kot ma ale".toByteArray() + + val encoder = WireFrameEncoder(alloc) + + fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) + + fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc) + + fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { + for (bb in byteBuffers) { + assertThat(bb.refCnt()) + .describedAs("should be released: $bb ref count") + .isEqualTo(0) + } + } + + fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) { + for (bb in byteBuffers) { + assertThat(bb.refCnt()) + .describedAs("should not be released: $bb ref count") + .isEqualTo(1) + } + } + + describe("decoding wire protocol") { + given("empty input") { + val input = Unpooled.EMPTY_BUFFER + + it("should yield empty result") { + createInstance().decode(input).test().verifyComplete() + } + } + + given("input with no readable bytes") { + val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1) + + it("should yield empty result") { + createInstance().decode(input).test().verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input) + } + } + + given("invalid input (not starting with marker)") { + val input = Unpooled.wrappedBuffer(samplePayload) + + it("should yield error") { + createInstance().decode(input).test() + .verifyError(WireFrameException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("valid input") { + val input = WireFrameMessage(samplePayload) + + it("should yield decoded input frame") { + createInstance().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + } + } + + given("valid input with part of next frame") { + val input = Unpooled.buffer() + .writeBytes(encoder.encode(WireFrameMessage(samplePayload))) + .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3)) + + it("should yield decoded input frame") { + createInstance().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("valid input with garbage after it") { + val input = Unpooled.buffer() + .writeBytes(encoder.encode(WireFrameMessage(samplePayload))) + .writeBytes(Unpooled.wrappedBuffer(samplePayload)) + + it("should yield decoded input frame and error") { + createInstance().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyError(WireFrameException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("two inputs containing two separate messages") { + val input1 = encoder.encode(WireFrameMessage(samplePayload)) + val input2 = encoder.encode(WireFrameMessage(anotherPayload)) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input1).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + + given("1st input containing 1st frame and 2nd input containing garbage") { + val input1 = encoder.encode(WireFrameMessage(samplePayload)) + val input2 = Unpooled.wrappedBuffer(anotherPayload) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input1) + .test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .verifyError(WireFrameException::class.java) + } + + it("should release memory for 1st input") { + verifyMemoryReleased(input1) + } + + it("should leave memory unreleased for 2nd input") { + verifyMemoryNotReleased(input2) + } + } + + + given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { + val frame1 = encoder.encode(WireFrameMessage(samplePayload)) + val frame2 = encoder.encode(WireFrameMessage(anotherPayload)) + + val input1 = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2, 3) + val input2 = Unpooled.buffer().writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input1).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + + given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { + val frame1 = encoder.encode(WireFrameMessage(samplePayload)) + val frame2 = encoder.encode(WireFrameMessage(anotherPayload)) + + val input1 = Unpooled.buffer() + .writeBytes(frame1, 5) + val input2 = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input1).test() + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + } +})
\ No newline at end of file |