aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/test/kotlin
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-08 16:29:31 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 07:06:19 +0200
commit7c3b59560f015b65882a56db585b7d4bdd10d434 (patch)
tree4c15d3657e373d3a681fdd2ab865623aeecc82e7 /hv-collector-core/src/test/kotlin
parent07bbbf71cd65b29f446a1b475add87f20365db83 (diff)
Implement Kafka Sink
Closes ONAP-146 Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
Diffstat (limited to 'hv-collector-core/src/test/kotlin')
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt22
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt29
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt7
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt (renamed from hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt)56
4 files changed, 69 insertions, 45 deletions
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index df2840b9..017187a4 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -20,27 +20,29 @@
package org.onap.dcae.collectors.veshv.impl
import com.google.protobuf.ByteString
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.Unpooled
-import io.netty.buffer.Unpooled.wrappedBuffer
+import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.toByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent
-import org.assertj.core.api.Assertions.assertThat
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.*
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
internal object MessageValidatorTest : Spek({
- fun vesMessageBytes(commonHeader: CommonEventHeader): ByteBuf {
+ fun vesMessageBytes(commonHeader: CommonEventHeader): ByteData {
val msg = VesEvent.newBuilder()
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
.build()
- return wrappedBuffer(msg.toByteArray())
+ return msg.toByteData()
}
given("Message validator") {
@@ -79,7 +81,7 @@ internal object MessageValidatorTest : Spek({
}
on("ves hv message bytes") {
- val vesMessage = VesMessage(getDefaultInstance(), Unpooled.EMPTY_BUFFER)
+ val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
it("should not accept message with default header") {
assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
}
@@ -97,7 +99,7 @@ internal object MessageValidatorTest : Spek({
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!"))
.build()
- val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+ val rawMessageBytes = msg.toByteData()
it("should not accept not fully initialized message header ") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 3812db58..c852f5f4 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -1,11 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
package org.onap.dcae.collectors.veshv.impl
-import io.netty.buffer.Unpooled
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -34,7 +53,7 @@ object RouterTest : Spek({
val cut = Router(config)
on("message with existing route (rtpm)") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), Unpooled.EMPTY_BUFFER)
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -55,7 +74,7 @@ object RouterTest : Spek({
}
on("message with existing route (trace)") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), Unpooled.EMPTY_BUFFER)
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -63,7 +82,7 @@ object RouterTest : Spek({
}
it("should be routed to proper partition") {
- assertThat(result?.partition).isEqualTo(1)
+ assertThat(result?.partition).isEqualTo(0)
}
it("should be routed to proper topic") {
@@ -76,7 +95,7 @@ object RouterTest : Spek({
}
on("message with unknown route") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), Unpooled.EMPTY_BUFFER)
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should not have route available") {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 263ad441..90b34b1c 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -21,13 +21,14 @@ package org.onap.dcae.collectors.veshv.impl
import com.google.protobuf.ByteString
import com.google.protobuf.InvalidProtocolBufferException
-import io.netty.buffer.Unpooled.wrappedBuffer
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.toByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -45,7 +46,7 @@ internal object VesDecoderTest : Spek({
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements"))
.build()
- val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+ val rawMessageBytes = msg.toByteData()
it("should decode only header and pass it on along with raw message") {
@@ -60,7 +61,7 @@ internal object VesDecoderTest : Spek({
}
on("invalid ves hv message bytes") {
- val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset()))
+ val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
it("should throw error") {
assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index 0a10aa1f..1ddcc3dc 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -28,6 +28,8 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
import reactor.test.test
@@ -35,13 +37,17 @@ import reactor.test.test
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
* @since May 2018
*/
-internal object WireDecoderTest : Spek({
+internal object WireChunkDecoderTest : Spek({
val alloc = UnpooledByteBufAllocator.DEFAULT
val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
- fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc))
+ val encoder = WireFrameEncoder(alloc)
+
+ fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
+
fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
for (bb in byteBuffers) {
assertThat(bb.refCnt())
@@ -63,7 +69,7 @@ internal object WireDecoderTest : Spek({
val input = Unpooled.EMPTY_BUFFER
it("should yield empty result") {
- WireDecoder().decode(input).test().verifyComplete()
+ createInstance().decode(input).test().verifyComplete()
}
}
@@ -71,7 +77,7 @@ internal object WireDecoderTest : Spek({
val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
it("should yield empty result") {
- WireDecoder().decode(input).test().verifyComplete()
+ createInstance().decode(input).test().verifyComplete()
}
it("should release memory") {
@@ -83,7 +89,7 @@ internal object WireDecoderTest : Spek({
val input = Unpooled.wrappedBuffer(samplePayload)
it("should yield error") {
- WireDecoder().decode(input).test()
+ createInstance().decode(input).test()
.verifyError(InvalidWireFrameMarkerException::class.java)
}
@@ -93,10 +99,10 @@ internal object WireDecoderTest : Spek({
}
given("valid input") {
- val input = WireFrame(Unpooled.wrappedBuffer(samplePayload))
+ val input = WireFrame(samplePayload)
it("should yield decoded input frame") {
- WireDecoder().decode(input).test()
+ createInstance().decode(input).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -104,11 +110,11 @@ internal object WireDecoderTest : Spek({
given("valid input with part of next frame") {
val input = Unpooled.buffer()
- .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
- .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3))
+ .writeBytes(encoder.encode(WireFrame(samplePayload)))
+ .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3))
it("should yield decoded input frame") {
- WireDecoder().decode(input).test()
+ createInstance().decode(input).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -120,11 +126,11 @@ internal object WireDecoderTest : Spek({
given("valid input with garbage after it") {
val input = Unpooled.buffer()
- .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+ .writeBytes(encoder.encode(WireFrame(samplePayload)))
.writeBytes(Unpooled.wrappedBuffer(samplePayload))
it("should yield decoded input frame and error") {
- WireDecoder().decode(input).test()
+ createInstance().decode(input).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyError(InvalidWireFrameMarkerException::class.java)
}
@@ -135,11 +141,11 @@ internal object WireDecoderTest : Spek({
}
given("two inputs containing two separate messages") {
- val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
- val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+ val input1 = encoder.encode(WireFrame(samplePayload))
+ val input2 = encoder.encode(WireFrame(anotherPayload))
it("should yield decoded input frames") {
- val cut = WireDecoder()
+ val cut = createInstance()
cut.decode(input1).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
@@ -154,16 +160,12 @@ internal object WireDecoderTest : Spek({
}
given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val input1 = encoder.encode(WireFrame(samplePayload))
val input2 = Unpooled.wrappedBuffer(anotherPayload)
it("should yield decoded input frames") {
- val cut = WireDecoder()
+ val cut = createInstance()
cut.decode(input1)
- .doOnNext {
- // releasing retained payload
- it.payload.release()
- }
.test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
@@ -182,8 +184,8 @@ internal object WireDecoderTest : Spek({
given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
- val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+ val frame1 = encoder.encode(WireFrame(samplePayload))
+ val frame2 = encoder.encode(WireFrame(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1)
@@ -191,7 +193,7 @@ internal object WireDecoderTest : Spek({
val input2 = Unpooled.buffer().writeBytes(frame2)
it("should yield decoded input frames") {
- val cut = WireDecoder()
+ val cut = createInstance()
cut.decode(input1).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
@@ -206,8 +208,8 @@ internal object WireDecoderTest : Spek({
}
given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
- val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+ val frame1 = encoder.encode(WireFrame(samplePayload))
+ val frame2 = encoder.encode(WireFrame(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1, 5)
@@ -216,7 +218,7 @@ internal object WireDecoderTest : Spek({
.writeBytes(frame2)
it("should yield decoded input frames") {
- val cut = WireDecoder()
+ val cut = createInstance()
cut.decode(input1).test()
.verifyComplete()
cut.decode(input2).test()