aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/test')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt128
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt112
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt74
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt157
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt86
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt234
-rw-r--r--sources/hv-collector-core/src/test/resources/logback-test.xml35
7 files changed, 826 insertions, 0 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
new file mode 100644
index 00000000..3090042d
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -0,0 +1,128 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+
+internal object MessageValidatorTest : Spek({
+
+ given("Message validator") {
+ val cut = MessageValidator
+
+ on("ves hv message including header with fully initialized fields") {
+ val commonHeader = commonHeader()
+
+ it("should accept message with fully initialized message header") {
+ val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
+ }
+
+ VesEventDomain.values()
+ .forEach { domain ->
+ it("should accept message with $domain domain") {
+ val header = commonHeader(domain)
+ val vesMessage = VesMessage(header, vesEventBytes(header))
+ assertThat(cut.isValid(vesMessage))
+ .isTrue()
+ }
+ }
+ }
+
+ on("ves hv message bytes") {
+ val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
+ it("should not accept message with default header") {
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+ val priorityTestCases = mapOf(
+ Priority.PRIORITY_NOT_PROVIDED to false,
+ Priority.HIGH to true
+ )
+
+ priorityTestCases.forEach { value, expectedResult ->
+ on("ves hv message including header with priority $value") {
+ val commonEventHeader = commonHeader(priority = value)
+ val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
+
+ it("should resolve validation result") {
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
+ .isEqualTo(expectedResult)
+ }
+ }
+ }
+
+ on("ves hv message including header with not initialized fields") {
+ val commonHeader = newBuilder()
+ .setVersion("1.9")
+ .setEventName("Sample event name")
+ .setEventId("Sample event Id")
+ .setSourceName("Sample Source")
+ .build()
+ val rawMessageBytes = vesEventBytes(commonHeader)
+
+ it("should not accept not fully initialized message header") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+ on("ves hv message including header.vesEventListenerVersion with non-string major part") {
+ val commonHeader = commonHeader(vesEventListenerVersion = "sample-version")
+ val rawMessageBytes = vesEventBytes(commonHeader)
+
+
+ it("should not accept message header") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+ on("ves hv message including header.vesEventListenerVersion with major part != 7") {
+ val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3")
+ val rawMessageBytes = vesEventBytes(commonHeader)
+
+ it("should not accept message header") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+ on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") {
+ val commonHeader = commonHeader(vesEventListenerVersion = "7.test")
+ val rawMessageBytes = vesEventBytes(commonHeader)
+
+ it("should not accept message header") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
new file mode 100644
index 00000000..e8a31231
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -0,0 +1,112 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.None
+import arrow.core.Some
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object RouterTest : Spek({
+ given("sample configuration") {
+ val config = routing {
+
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic("ves_rtpm")
+ withFixedPartitioning(2)
+ }
+
+ defineRoute {
+ fromDomain(SYSLOG.domainName)
+ toTopic("ves_trace")
+ withFixedPartitioning()
+ }
+ }.build()
+ val cut = Router(config)
+
+ on("message with existing route (rtpm)") {
+ val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
+ val result = cut.findDestination(message)
+
+ it("should have route available") {
+ assertThat(result).isNotNull()
+ }
+
+ it("should be routed to proper partition") {
+ assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
+ }
+
+ it("should be routed to proper topic") {
+ assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
+ }
+
+ it("should be routed with a given message") {
+ assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+ }
+ }
+
+ on("message with existing route (trace)") {
+ val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
+ val result = cut.findDestination(message)
+
+ it("should have route available") {
+ assertThat(result).isNotNull()
+ }
+
+ it("should be routed to proper partition") {
+ assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
+ }
+
+ it("should be routed to proper topic") {
+ assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
+ }
+
+ it("should be routed with a given message") {
+ assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+ }
+ }
+
+ on("message with unknown route") {
+ val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
+ val result = cut.findDestination(message)
+
+ it("should not have route available") {
+ assertThat(result).isEqualTo(None)
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
new file mode 100644
index 00000000..8950a557
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.Option
+import com.google.protobuf.ByteString
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import java.nio.charset.Charset
+import kotlin.test.assertTrue
+import kotlin.test.fail
+
+
+internal object VesDecoderTest : Spek({
+
+ given("ves message decoder") {
+ val cut = VesDecoder()
+
+ on("ves hv message bytes") {
+ val commonHeader = commonHeader(HEARTBEAT)
+ val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
+
+ it("should decode only header and pass it on along with raw message") {
+ val expectedMessage = VesMessage(
+ commonHeader,
+ rawMessageBytes
+ )
+
+ assertTrue {
+ cut.decode(rawMessageBytes).exists {
+ it == expectedMessage
+ }
+ }
+ }
+ }
+
+ on("invalid ves hv message bytes") {
+ val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
+
+ it("should throw error") {
+ assertFailedWithError(cut.decode(rawMessageBytes))
+ }
+ }
+ }
+})
+
+private fun <A> assertFailedWithError(option: Option<A>) =
+ option.exists {
+ fail("Error expected")
+ }
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
new file mode 100644
index 00000000..c6364f74
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -0,0 +1,157 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+
+import reactor.core.publisher.Mono
+import reactor.retry.Retry
+import reactor.test.StepVerifier
+import java.time.Duration
+import java.util.*
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object ConsulConfigurationProviderTest : Spek({
+
+ describe("Consul configuration provider") {
+
+ val httpAdapterMock: HttpAdapter = mock()
+ val healthStateProvider = HealthState.INSTANCE
+
+ given("valid resource url") {
+ val validUrl = "http://valid-url/"
+ val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
+
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+ .thenReturn(Mono.just(constructConsulResponse()))
+
+ it("should use received configuration") {
+
+ StepVerifier.create(consulConfigProvider().take(1))
+ .consumeNextWith {
+
+ assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
+
+ val route1 = it.routing.routes[0]
+ assertThat(FAULT.domainName)
+ .describedAs("routed domain 1")
+ .isEqualTo(route1.domain)
+ assertThat("test-topic-1")
+ .describedAs("target topic 1")
+ .isEqualTo(route1.targetTopic)
+
+ val route2 = it.routing.routes[1]
+ assertThat(HEARTBEAT.domainName)
+ .describedAs("routed domain 2")
+ .isEqualTo(route2.domain)
+ assertThat("test-topic-2")
+ .describedAs("target topic 2")
+ .isEqualTo(route2.targetTopic)
+
+ }.verifyComplete()
+ }
+ }
+
+ }
+ given("invalid resource url") {
+ val invalidUrl = "http://invalid-url/"
+
+ val iterationCount = 3L
+ val consulConfigProvider = constructConsulConfigProvider(
+ invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+ )
+
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+ .thenReturn(Mono.error(RuntimeException("Test exception")))
+
+ it("should interrupt the flux") {
+
+ StepVerifier.create(consulConfigProvider())
+ .verifyErrorMessage("Test exception")
+ }
+
+ it("should update the health state") {
+ StepVerifier.create(healthStateProvider().take(iterationCount))
+ .expectNextCount(iterationCount - 1)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ .verifyComplete()
+ }
+ }
+ }
+ }
+
+})
+
+private fun constructConsulConfigProvider(url: String,
+ httpAdapter: HttpAdapter,
+ healthState: HealthState,
+ iterationCount: Long = 1
+): ConsulConfigurationProvider {
+
+ val firstRequestDelay = Duration.ofMillis(1)
+ val requestInterval = Duration.ofMillis(1)
+ val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+ return ConsulConfigurationProvider(
+ httpAdapter,
+ url,
+ firstRequestDelay,
+ requestInterval,
+ healthState,
+ retry
+ )
+}
+
+
+const val kafkaAddress = "message-router-kafka"
+
+fun constructConsulResponse(): String =
+ """{
+ "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
+ "collector.routing": [
+ {
+ "fromDomain": "fault",
+ "toTopic": "test-topic-1"
+ },
+ {
+ "fromDomain": "heartbeat",
+ "toTopic": "test-topic-2"
+ }
+ ]
+ }"""
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
new file mode 100644
index 00000000..91457faf
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -0,0 +1,86 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import reactor.core.publisher.Mono
+import reactor.netty.http.client.HttpClient
+import reactor.netty.http.server.HttpServer
+import reactor.test.StepVerifier
+import reactor.test.test
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object HttpAdapterTest : Spek({
+ describe("HttpAdapter") {
+
+ val httpServer = HttpServer.create()
+ .host("127.0.0.1")
+ .route { routes ->
+ routes.get("/url") { req, resp ->
+ resp.sendString(Mono.just(req.uri()))
+ }
+ }
+ .bindNow()
+ val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
+ val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl))
+
+ afterGroup {
+ httpServer.disposeNow()
+ }
+
+ given("url without query params") {
+ val url = "/url"
+
+ it("should not append query string") {
+ httpAdapter.get(url).test()
+ .expectNext(url)
+ .verifyComplete()
+ }
+ }
+
+ given("url with query params") {
+ val queryParams = mapOf(Pair("p", "the-value"))
+ val url = "/url"
+
+ it("should add them as query string to the url") {
+ httpAdapter.get(url, queryParams).test()
+ .expectNext("/url?p=the-value")
+ .verifyComplete()
+ }
+ }
+
+ given("invalid url") {
+ val invalidUrl = "/wtf"
+
+ it("should interrupt the flux") {
+ StepVerifier
+ .create(httpAdapter.get(invalidUrl))
+ .verifyError()
+ }
+ }
+ }
+
+}) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
new file mode 100644
index 00000000..f06a0dc7
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -0,0 +1,234 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import reactor.test.test
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since May 2018
+ */
+internal object WireChunkDecoderTest : Spek({
+ val alloc = UnpooledByteBufAllocator.DEFAULT
+ val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
+ val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
+
+ val encoder = WireFrameEncoder(alloc)
+
+ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
+
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
+
+ fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
+ for (bb in byteBuffers) {
+ assertThat(bb.refCnt())
+ .describedAs("should be released: $bb ref count")
+ .isEqualTo(0)
+ }
+ }
+
+ fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
+ for (bb in byteBuffers) {
+ assertThat(bb.refCnt())
+ .describedAs("should not be released: $bb ref count")
+ .isEqualTo(1)
+ }
+ }
+
+ describe("decoding wire protocol") {
+ given("empty input") {
+ val input = Unpooled.EMPTY_BUFFER
+
+ it("should yield empty result") {
+ createInstance().decode(input).test().verifyComplete()
+ }
+ }
+
+ given("input with no readable bytes") {
+ val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
+
+ it("should yield empty result") {
+ createInstance().decode(input).test().verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input)
+ }
+ }
+
+ given("invalid input (not starting with marker)") {
+ val input = Unpooled.wrappedBuffer(samplePayload)
+
+ it("should yield error") {
+ createInstance().decode(input).test()
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("valid input") {
+ val input = WireFrameMessage(samplePayload)
+
+ it("should yield decoded input frame") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ }
+ }
+
+ given("valid input with part of next frame") {
+ val input = Unpooled.buffer()
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
+
+ it("should yield decoded input frame") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("valid input with garbage after it") {
+ val input = Unpooled.buffer()
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
+ .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded input frame and error") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("two inputs containing two separate messages") {
+ val input1 = encoder.encode(WireFrameMessage(samplePayload))
+ val input2 = encoder.encode(WireFrameMessage(anotherPayload))
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input1).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+
+ given("1st input containing 1st frame and 2nd input containing garbage") {
+ val input1 = encoder.encode(WireFrameMessage(samplePayload))
+ val input2 = Unpooled.wrappedBuffer(anotherPayload)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input1)
+ .test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should release memory for 1st input") {
+ verifyMemoryReleased(input1)
+ }
+
+ it("should leave memory unreleased for 2nd input") {
+ verifyMemoryNotReleased(input2)
+ }
+ }
+
+
+ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
+ val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
+
+ val input1 = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2, 3)
+ val input2 = Unpooled.buffer().writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input1).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+
+ given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
+ val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
+
+ val input1 = Unpooled.buffer()
+ .writeBytes(frame1, 5)
+ val input2 = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input1).test()
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/resources/logback-test.xml b/sources/hv-collector-core/src/test/resources/logback-test.xml
new file mode 100644
index 00000000..9a4eacfe
--- /dev/null
+++ b/sources/hv-collector-core/src/test/resources/logback-test.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration>