From 7c3b59560f015b65882a56db585b7d4bdd10d434 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 8 Jun 2018 16:29:31 +0200 Subject: Implement Kafka Sink Closes ONAP-146 Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f Signed-off-by: Piotr Jaszczyk Issue-ID: DCAEGEN2-601 --- .../collectors/veshv/factory/CollectorFactory.kt | 10 +- .../onap/dcae/collectors/veshv/impl/VesDecoder.kt | 9 +- .../dcae/collectors/veshv/impl/VesHvCollector.kt | 27 +-- .../veshv/impl/adapters/AdapterFactory.kt | 1 + .../collectors/veshv/impl/adapters/KafkaSink.kt | 68 ------ .../veshv/impl/adapters/KafkaSinkProvider.kt | 48 ----- .../veshv/impl/adapters/LoggingSinkProvider.kt | 2 +- .../veshv/impl/adapters/kafka/KafkaSink.kt | 67 ++++++ .../veshv/impl/adapters/kafka/KafkaSinkProvider.kt | 45 ++++ .../impl/adapters/kafka/ProtobufSerializer.kt | 40 ++++ .../impl/adapters/kafka/VesMessageSerializer.kt | 37 ++++ .../veshv/impl/wire/StreamBufferEmitter.kt | 8 +- .../collectors/veshv/impl/wire/WireChunkDecoder.kt | 56 +++++ .../dcae/collectors/veshv/impl/wire/WireDecoder.kt | 56 ----- .../collectors/veshv/impl/wire/WireFrameSink.kt | 4 +- .../onap/dcae/collectors/veshv/model/VesMessage.kt | 4 +- .../onap/dcae/collectors/veshv/model/routing.kt | 2 +- .../collectors/veshv/impl/MessageValidatorTest.kt | 22 +- .../onap/dcae/collectors/veshv/impl/RouterTest.kt | 29 ++- .../dcae/collectors/veshv/impl/VesDecoderTest.kt | 7 +- .../veshv/impl/wire/WireChunkDecoderTest.kt | 235 +++++++++++++++++++++ .../collectors/veshv/impl/wire/WireDecoderTest.kt | 233 -------------------- 22 files changed, 549 insertions(+), 461 deletions(-) delete mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt delete mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt create mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt delete mode 100644 hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt create mode 100644 hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt delete mode 100644 hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt (limited to 'hv-collector-core') diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 913d8f50..73f4d09d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector -import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder +import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicReference @@ -36,7 +37,8 @@ import java.util.concurrent.atomic.AtomicReference * @author Piotr Jaszczyk * @since May 2018 */ -class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvider: SinkProvider) { +class CollectorFactory(val configuration: ConfigurationProvider, + private val sinkProvider: SinkProvider) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference = AtomicReference() createVesHvCollector().subscribe(collector::set) @@ -48,7 +50,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - { WireDecoder(it) }, + { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, VesDecoder(), MessageValidator(), Router(config.routing), diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index 60e7d70a..591a48b7 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -19,9 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl -import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent /** @@ -30,8 +29,8 @@ import org.onap.ves.VesEventV5.VesEvent */ internal class VesDecoder { - fun decode(bb: ByteBuf): VesMessage { - val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader - return VesMessage(decodedHeader, bb) + fun decode(bytes: ByteData): VesMessage { + val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader + return VesMessage(decodedHeader, bytes) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index ac11b3e8..965943f6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -24,57 +24,42 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder +import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import java.util.concurrent.atomic.AtomicInteger /** * @author Piotr Jaszczyk * @since May 2018 */ internal class VesHvCollector( - private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder, + private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, private val sink: Sink) : Collector { override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux): Mono = - wireDecoderSupplier(alloc).let { wireDecoder -> + wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .concatMap(wireDecoder::decode) .filter(WireFrame::isValid) .map(WireFrame::payload) .map(protobufDecoder::decode) - .filter(this::validate) + .filter(validator::isValid) .flatMap(this::findRoute) .compose(sink::send) - .doOnNext(this::releaseMemory) .doOnTerminate { releaseBuffersMemory(wireDecoder) } .then() } - private fun validate(msg: VesMessage): Boolean { - val valid = validator.isValid(msg) - if (!valid) { - msg.rawMessage.release() - } - return valid - } - private fun findRoute(msg: VesMessage): Mono = omitWhenNull(msg, router::findDestination) - private fun releaseMemory(msg: VesMessage) { - logger.trace { "Releasing memory from ${msg.rawMessage}" } - msg.rawMessage.release() - } - - private fun releaseBuffersMemory(wireDecoder: WireDecoder) { - wireDecoder.release() + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { + wireChunkDecoder.release() } private fun omitWhenNull(input: T, mapper: (T) -> V?): Mono = Mono.justOrEmpty(mapper(input)) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 8e6db2af..358be108 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt deleted file mode 100644 index db7845c7..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader -import reactor.core.publisher.Flux -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderRecord -import reactor.kafka.sender.SenderResult -import java.nio.ByteBuffer - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class KafkaSink(private val sender: KafkaSender) : Sink { - - override fun send(messages: Flux): Flux { - val records = messages.map(this::vesToKafkaRecord) - return sender.send(records) - .doOnNext(::logException) - .filter(::isSuccessful) - .map { it.correlationMetadata() } - } - - private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord { - return SenderRecord.create( - msg.topic, - msg.partition, - System.currentTimeMillis(), - msg.message.header, - msg.message.rawMessage.nioBuffer(), - msg.message) - } - - private fun logException(senderResult: SenderResult) { - if (senderResult.exception() != null) { - logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } - } - } - - private fun isSuccessful(senderResult: SenderResult) = senderResult.exception() == null - - companion object { - val logger = Logger(KafkaSink::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt deleted file mode 100644 index 82452e1e..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt +++ /dev/null @@ -1,48 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.ByteBufferSerializer -import org.apache.kafka.common.serialization.StringSerializer -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.ves.VesEventV5 -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.nio.ByteBuffer - -/** - * @author Piotr Jaszczyk - * @since June 2018 - */ -internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config))) - } - - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create() - .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) - .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java) - -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 62b6d1aa..b943e4e5 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider { private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong()) + val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) logger.info(logMessageSupplier) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt new file mode 100644 index 00000000..6142fa3c --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -0,0 +1,67 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.core.publisher.Flux +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderRecord +import reactor.kafka.sender.SenderResult + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class KafkaSink(private val sender: KafkaSender) : Sink { + + override fun send(messages: Flux): Flux { + val records = messages.map(this::vesToKafkaRecord) + return sender.send(records) + .doOnNext(::logException) + .filter(::isSuccessful) + .map { it.correlationMetadata() } + } + + private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord { + return SenderRecord.create( + msg.topic, + msg.partition, + System.currentTimeMillis(), + msg.message.header, + msg.message, + msg.message) + } + + private fun logException(senderResult: SenderResult) { + if (senderResult.exception() != null) { + logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } + } + } + + private fun isSuccessful(senderResult: SenderResult) = senderResult.exception() == null + + companion object { + val logger = Logger(KafkaSink::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt new file mode 100644 index 00000000..a00a02d2 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -0,0 +1,45 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import org.apache.kafka.clients.producer.ProducerConfig +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +internal class KafkaSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + return KafkaSink(KafkaSender.create(constructSenderOptions(config))) + } + + private fun constructSenderOptions(config: CollectorConfiguration) = + SenderOptions.create() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt new file mode 100644 index 00000000..9753d9e5 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import com.google.protobuf.MessageLite +import org.apache.kafka.common.serialization.Serializer + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class ProtobufSerializer :Serializer { + override fun configure(configs: MutableMap?, isKey: Boolean) { + // no configuration + } + + override fun serialize(topic: String?, data: MessageLite?): ByteArray? = + data?.toByteArray() + + override fun close() { + // cleanup not needed + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt new file mode 100644 index 00000000..7a6ac7c8 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import org.apache.kafka.common.serialization.Serializer +import org.onap.dcae.collectors.veshv.model.VesMessage + +/** + * @author Piotr Jaszczyk + * @since June 2018 + */ +class VesMessageSerializer : Serializer { + override fun configure(configs: MutableMap?, isKey: Boolean) { + } + + override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray() + + override fun close() { + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt index e4dd7cf6..34a8b928 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire import io.netty.buffer.ByteBuf import io.netty.buffer.CompositeByteBuf import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink @@ -33,6 +34,7 @@ import java.util.function.Consumer * @since May 2018 */ internal class StreamBufferEmitter( + private val decoder: WireFrameDecoder, private val streamBuffer: CompositeByteBuf, private val newFrame: ByteBuf) : Consumer> { @@ -58,15 +60,15 @@ internal class StreamBufferEmitter( streamBuffer.discardReadComponents() } sink.onRequest { requestedFrameCount -> - WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber() + WireFrameSink(decoder, streamBuffer, sink, requestedFrameCount).handleSubscriber() } } } } companion object { - fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux = - Flux.create(StreamBufferEmitter(streamBuffer, newFrame)) + fun createFlux(decoder: WireFrameDecoder, streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux = + Flux.create(StreamBufferEmitter(decoder, streamBuffer, newFrame)) private const val INCREASE_WRITER_INDEX = true private val logger = Logger(StreamBufferEmitter::class) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt new file mode 100644 index 00000000..580d36c5 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux + +/** + * @author Piotr Jaszczyk + * @since May 2018 + */ +internal class WireChunkDecoder(private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { + private val streamBuffer = alloc.compositeBuffer() + + fun decode(byteBuf: ByteBuf): Flux = StreamBufferEmitter.createFlux(decoder, streamBuffer, byteBuf) + .doOnSubscribe { logIncomingMessage(byteBuf) } + .doOnNext(this::logDecodedWireMessage) + + fun release() { + streamBuffer.release() + } + + private fun logIncomingMessage(wire: ByteBuf) { + logger.trace { "Got message with total size of ${wire.readableBytes()} B" } + } + + private fun logDecodedWireMessage(wire: WireFrame) { + logger.trace { "Wire payload size: ${wire.payloadSize} B." } + } + + companion object { + val logger = Logger(VesHvCollector::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt deleted file mode 100644 index b701aaf2..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt +++ /dev/null @@ -1,56 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.impl.VesHvCollector -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux - -/** - * @author Piotr Jaszczyk - * @since May 2018 - */ -internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - private val streamBuffer = alloc.compositeBuffer() - - fun decode(byteBuf: ByteBuf): Flux = StreamBufferEmitter.createFlux(streamBuffer, byteBuf) - .doOnSubscribe { logIncomingMessage(byteBuf) } - .doOnNext(this::logDecodedWireMessage) - - fun release() { - streamBuffer.release() - } - - - private fun logIncomingMessage(wire: ByteBuf) { - logger.trace { "Got message with total size of ${wire.readableBytes()} B" } - } - - private fun logDecodedWireMessage(wire: WireFrame) { - logger.trace { "Wire payload size: ${wire.payloadSize} B." } - } - - companion object { - val logger = Logger(VesHvCollector::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index bc9c8389..a576dc65 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.wire import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.FluxSink @@ -30,6 +31,7 @@ import reactor.core.publisher.FluxSink * @since May 2018 */ internal class WireFrameSink( + private val decoder: WireFrameDecoder, private val streamBuffer: ByteBuf, private val sink: FluxSink, private val requestedFrameCount: Long) { @@ -80,7 +82,7 @@ internal class WireFrameSink( private fun decodeFirstFrameFromBuffer(): WireFrame? = try { - WireFrame.decodeFirst(streamBuffer) + decoder.decodeFirst(streamBuffer) } catch (ex: MissingWireFrameBytesException) { logger.debug { "${ex.message} - waiting for more data" } null diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index 38256896..03c53e10 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -19,11 +19,11 @@ */ package org.onap.dcae.collectors.veshv.model -import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader /** * @author Piotr Jaszczyk * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteBuf) +data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index 10e79156..bc030587 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -73,7 +73,7 @@ class RouteBuilder { this.targetTopic = targetTopic } - fun withFixedPartitioning(num: Int = 1) { + fun withFixedPartitioning(num: Int = 0) { partitioning = { _ -> num } } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index df2840b9..017187a4 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -20,27 +20,29 @@ package org.onap.dcae.collectors.veshv.impl import com.google.protobuf.ByteString -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled -import io.netty.buffer.Unpooled.wrappedBuffer +import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.toByteData import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent -import org.assertj.core.api.Assertions.assertThat -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.* +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder internal object MessageValidatorTest : Spek({ - fun vesMessageBytes(commonHeader: CommonEventHeader): ByteBuf { + fun vesMessageBytes(commonHeader: CommonEventHeader): ByteData { val msg = VesEvent.newBuilder() .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) .build() - return wrappedBuffer(msg.toByteArray()) + return msg.toByteData() } given("Message validator") { @@ -79,7 +81,7 @@ internal object MessageValidatorTest : Spek({ } on("ves hv message bytes") { - val vesMessage = VesMessage(getDefaultInstance(), Unpooled.EMPTY_BUFFER) + val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY) it("should not accept message with default header") { assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() } @@ -97,7 +99,7 @@ internal object MessageValidatorTest : Spek({ .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!")) .build() - val rawMessageBytes = wrappedBuffer(msg.toByteArray()) + val rawMessageBytes = msg.toByteData() it("should not accept not fully initialized message header ") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 3812db58..c852f5f4 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -1,11 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ package org.onap.dcae.collectors.veshv.impl -import io.netty.buffer.Unpooled import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader @@ -34,7 +53,7 @@ object RouterTest : Spek({ val cut = Router(config) on("message with existing route (rtpm)") { - val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), Unpooled.EMPTY_BUFFER) + val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), ByteData.EMPTY) val result = cut.findDestination(message) it("should have route available") { @@ -55,7 +74,7 @@ object RouterTest : Spek({ } on("message with existing route (trace)") { - val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), Unpooled.EMPTY_BUFFER) + val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), ByteData.EMPTY) val result = cut.findDestination(message) it("should have route available") { @@ -63,7 +82,7 @@ object RouterTest : Spek({ } it("should be routed to proper partition") { - assertThat(result?.partition).isEqualTo(1) + assertThat(result?.partition).isEqualTo(0) } it("should be routed to proper topic") { @@ -76,7 +95,7 @@ object RouterTest : Spek({ } on("message with unknown route") { - val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), Unpooled.EMPTY_BUFFER) + val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), ByteData.EMPTY) val result = cut.findDestination(message) it("should not have route available") { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 263ad441..90b34b1c 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -21,13 +21,14 @@ package org.onap.dcae.collectors.veshv.impl import com.google.protobuf.ByteString import com.google.protobuf.InvalidProtocolBufferException -import io.netty.buffer.Unpooled.wrappedBuffer import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.toByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader @@ -45,7 +46,7 @@ internal object VesDecoderTest : Spek({ .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements")) .build() - val rawMessageBytes = wrappedBuffer(msg.toByteArray()) + val rawMessageBytes = msg.toByteData() it("should decode only header and pass it on along with raw message") { @@ -60,7 +61,7 @@ internal object VesDecoderTest : Spek({ } on("invalid ves hv message bytes") { - val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset())) + val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset())) it("should throw error") { assertThatExceptionOfType(InvalidProtocolBufferException::class.java) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt new file mode 100644 index 00000000..1ddcc3dc --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -0,0 +1,235 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import reactor.test.test + +/** + * @author Piotr Jaszczyk @nokia.com> + * @since May 2018 + */ +internal object WireChunkDecoderTest : Spek({ + val alloc = UnpooledByteBufAllocator.DEFAULT + val samplePayload = "konstantynopolitanczykowianeczka".toByteArray() + val anotherPayload = "ala ma kota a kot ma ale".toByteArray() + + val encoder = WireFrameEncoder(alloc) + + fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame)) + + fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) + + fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { + for (bb in byteBuffers) { + assertThat(bb.refCnt()) + .describedAs("should be released: $bb ref count") + .isEqualTo(0) + } + } + + fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) { + for (bb in byteBuffers) { + assertThat(bb.refCnt()) + .describedAs("should not be released: $bb ref count") + .isEqualTo(1) + } + } + + describe("decoding wire protocol") { + given("empty input") { + val input = Unpooled.EMPTY_BUFFER + + it("should yield empty result") { + createInstance().decode(input).test().verifyComplete() + } + } + + given("input with no readable bytes") { + val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1) + + it("should yield empty result") { + createInstance().decode(input).test().verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input) + } + } + + given("invalid input (not starting with marker)") { + val input = Unpooled.wrappedBuffer(samplePayload) + + it("should yield error") { + createInstance().decode(input).test() + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("valid input") { + val input = WireFrame(samplePayload) + + it("should yield decoded input frame") { + createInstance().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + } + } + + given("valid input with part of next frame") { + val input = Unpooled.buffer() + .writeBytes(encoder.encode(WireFrame(samplePayload))) + .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3)) + + it("should yield decoded input frame") { + createInstance().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("valid input with garbage after it") { + val input = Unpooled.buffer() + .writeBytes(encoder.encode(WireFrame(samplePayload))) + .writeBytes(Unpooled.wrappedBuffer(samplePayload)) + + it("should yield decoded input frame and error") { + createInstance().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("two inputs containing two separate messages") { + val input1 = encoder.encode(WireFrame(samplePayload)) + val input2 = encoder.encode(WireFrame(anotherPayload)) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input1).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + + given("1st input containing 1st frame and 2nd input containing garbage") { + val input1 = encoder.encode(WireFrame(samplePayload)) + val input2 = Unpooled.wrappedBuffer(anotherPayload) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input1) + .test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should release memory for 1st input") { + verifyMemoryReleased(input1) + } + + it("should leave memory unreleased for 2nd input") { + verifyMemoryNotReleased(input2) + } + } + + + given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { + val frame1 = encoder.encode(WireFrame(samplePayload)) + val frame2 = encoder.encode(WireFrame(anotherPayload)) + + val input1 = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2, 3) + val input2 = Unpooled.buffer().writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input1).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + + given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { + val frame1 = encoder.encode(WireFrame(samplePayload)) + val frame2 = encoder.encode(WireFrame(anotherPayload)) + + val input1 = Unpooled.buffer() + .writeBytes(frame1, 5) + val input2 = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = createInstance() + cut.decode(input1).test() + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + } +}) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt deleted file mode 100644 index 0a10aa1f..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt +++ /dev/null @@ -1,233 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled -import io.netty.buffer.UnpooledByteBufAllocator -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException -import reactor.test.test - -/** - * @author Piotr Jaszczyk @nokia.com> - * @since May 2018 - */ -internal object WireDecoderTest : Spek({ - val alloc = UnpooledByteBufAllocator.DEFAULT - val samplePayload = "konstantynopolitanczykowianeczka".toByteArray() - val anotherPayload = "ala ma kota a kot ma ale".toByteArray() - - fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc)) - - fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { - for (bb in byteBuffers) { - assertThat(bb.refCnt()) - .describedAs("should be released: $bb ref count") - .isEqualTo(0) - } - } - - fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) { - for (bb in byteBuffers) { - assertThat(bb.refCnt()) - .describedAs("should not be released: $bb ref count") - .isEqualTo(1) - } - } - - describe("decoding wire protocol") { - given("empty input") { - val input = Unpooled.EMPTY_BUFFER - - it("should yield empty result") { - WireDecoder().decode(input).test().verifyComplete() - } - } - - given("input with no readable bytes") { - val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1) - - it("should yield empty result") { - WireDecoder().decode(input).test().verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input) - } - } - - given("invalid input (not starting with marker)") { - val input = Unpooled.wrappedBuffer(samplePayload) - - it("should yield error") { - WireDecoder().decode(input).test() - .verifyError(InvalidWireFrameMarkerException::class.java) - } - - it("should leave memory unreleased") { - verifyMemoryNotReleased(input) - } - } - - given("valid input") { - val input = WireFrame(Unpooled.wrappedBuffer(samplePayload)) - - it("should yield decoded input frame") { - WireDecoder().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - } - } - - given("valid input with part of next frame") { - val input = Unpooled.buffer() - .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)) - .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3)) - - it("should yield decoded input frame") { - WireDecoder().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - } - - it("should leave memory unreleased") { - verifyMemoryNotReleased(input) - } - } - - given("valid input with garbage after it") { - val input = Unpooled.buffer() - .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)) - .writeBytes(Unpooled.wrappedBuffer(samplePayload)) - - it("should yield decoded input frame and error") { - WireDecoder().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyError(InvalidWireFrameMarkerException::class.java) - } - - it("should leave memory unreleased") { - verifyMemoryNotReleased(input) - } - } - - given("two inputs containing two separate messages") { - val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) - val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) - - it("should yield decoded input frames") { - val cut = WireDecoder() - cut.decode(input1).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - cut.decode(input2).test() - .expectNextMatches { it.payloadSize == anotherPayload.size } - .verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input1, input2) - } - } - - given("1st input containing 1st frame and 2nd input containing garbage") { - val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) - val input2 = Unpooled.wrappedBuffer(anotherPayload) - - it("should yield decoded input frames") { - val cut = WireDecoder() - cut.decode(input1) - .doOnNext { - // releasing retained payload - it.payload.release() - } - .test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - cut.decode(input2).test() - .verifyError(InvalidWireFrameMarkerException::class.java) - } - - it("should release memory for 1st input") { - verifyMemoryReleased(input1) - } - - it("should leave memory unreleased for 2nd input") { - verifyMemoryNotReleased(input2) - } - } - - - given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { - val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) - val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) - - val input1 = Unpooled.buffer() - .writeBytes(frame1) - .writeBytes(frame2, 3) - val input2 = Unpooled.buffer().writeBytes(frame2) - - it("should yield decoded input frames") { - val cut = WireDecoder() - cut.decode(input1).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - cut.decode(input2).test() - .expectNextMatches { it.payloadSize == anotherPayload.size } - .verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input1, input2) - } - } - - given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { - val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) - val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) - - val input1 = Unpooled.buffer() - .writeBytes(frame1, 5) - val input2 = Unpooled.buffer() - .writeBytes(frame1) - .writeBytes(frame2) - - it("should yield decoded input frames") { - val cut = WireDecoder() - cut.decode(input1).test() - .verifyComplete() - cut.decode(input2).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .expectNextMatches { it.payloadSize == anotherPayload.size } - .verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input1, input2) - } - } - } -}) -- cgit 1.2.3-korg