diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-07 11:52:16 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-01 13:06:43 +0200 |
commit | 07bbbf71cd65b29f446a1b475add87f20365db83 (patch) | |
tree | e64fcf12c21e46358043744476d68765634d7f6f | |
parent | 767d0464a19e0949d2919e6df15c9653dec50503 (diff) |
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer
snapshots - not separate messages. That means that we need to join
incomming buffers and then split it into separate WireFrames.
Closes ONAP-312
Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
39 files changed, 1031 insertions, 299 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index 68bb3d0b..af8e0e0e 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -25,7 +25,7 @@ services: depends_on: - kafka volumes: - - /etc/ves-hv/:/etc/ves-hv/ + - ./ssl/:/etc/ves-hv/ xnf-simulator: build: context: hv-collector-client-simulator @@ -33,4 +33,4 @@ services: depends_on: - hv-collector volumes: - - /etc/ves-hv/:/etc/ves-hv/
\ No newline at end of file + - ./ssl/:/etc/ves-hv/
\ No newline at end of file diff --git a/hv-collector-client-simulator/Dockerfile b/hv-collector-client-simulator/Dockerfile index 159f900d..19c4c878 100644 --- a/hv-collector-client-simulator/Dockerfile +++ b/hv-collector-client-simulator/Dockerfile @@ -6,8 +6,8 @@ LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" LABEL maintainer="Nokia Wroclaw ONAP Team" WORKDIR /opt/ves-hv-client-simulator -ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] +ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"] CMD ["--ves-host", "hv-collector", "--ves-port", "6061"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ -COPY target/hv-collector-client-simulator-*.jar ./
\ No newline at end of file +COPY target/hv-collector-client-simulator-*.jar ./ diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml index e7a25855..012bda53 100644 --- a/hv-collector-client-simulator/pom.xml +++ b/hv-collector-client-simulator/pom.xml @@ -132,6 +132,12 @@ <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-tcnative-boringssl-static</artifactId> + <scope>runtime</scope> + <classifier>${os.detected.classifier}</classifier> + </dependency> + <dependency> <groupId>com.nhaarman</groupId> <artifactId>mockito-kotlin</artifactId> </dependency> diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt index 49653b57..b946689f 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt @@ -28,7 +28,7 @@ import java.io.File import java.nio.file.Paths internal object DefaultValues { - const val MESSAGES_AMOUNT = 1 + const val MESSAGES_AMOUNT = -1L const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key" const val CERT_FILE = "/etc/ves-hv/client.crt" const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt" @@ -98,7 +98,7 @@ internal object ArgBasedClientConfiguration { val cmdLine = parser.parse(options, args) val host = cmdLine.stringValue(OPT_VES_HOST) val port = cmdLine.intValue(OPT_VES_PORT) - val msgsAmount = cmdLine.intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT) + val msgsAmount = cmdLine.longValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT) return ClientConfiguration( host, port, @@ -121,8 +121,9 @@ internal object ArgBasedClientConfiguration { private fun stringPathToPath(path: String) = Paths.get(File(path).toURI()) - private fun CommandLine.intValueOrDefault(option: Option, default: Int) = - getOptionValue(option.opt)?.toInt() ?: default + + private fun CommandLine.longValueOrDefault(option: Option, default: Long) = + getOptionValue(option.opt)?.toLong() ?: default private fun CommandLine.intValue(option: Option) = getOptionValue(option.opt).toInt() diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt index e835ee95..83d6f7c0 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt @@ -27,4 +27,4 @@ data class ClientConfiguration( val vesHost: String, val vesPort: Int, val security: ClientSecurityConfiguration, - val messagesAmount: Int) + val messagesAmount: Long) diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt index 0c578b38..d5f7c7c8 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt @@ -38,8 +38,13 @@ class MessageFactory { const val DEFAULT_LAST_EPOCH: Long = 120034455 } - fun createMessageFlux(amount: Int = 1): Flux<WireFrame> = - Mono.just(createMessage()).repeat(amount.toLong()) + fun createMessageFlux(amount: Long = -1): Flux<WireFrame> = + Mono.fromCallable(this::createMessage).let { + if (amount < 0) + it.repeat() + else + it.repeat(amount) + } private fun createMessage(): WireFrame { @@ -57,14 +62,7 @@ class MessageFactory { .build() val payload = vesMessageBytes(commonHeader) - return WireFrame( - payload = payload, - mark = 0xFF, - majorVersion = 1, - minorVersion = 2, - payloadSize = payload.readableBytes()) - - + return WireFrame(payload) } private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf { diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index c911c533..29573e86 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.simulators.xnf.impl +import io.netty.buffer.Unpooled import io.netty.handler.ssl.ClientAuth import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder @@ -29,6 +30,7 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfig import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.reactivestreams.Publisher import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.ipc.netty.NettyInbound import reactor.ipc.netty.NettyOutbound import reactor.ipc.netty.tcp.TcpClient @@ -53,7 +55,6 @@ class VesHvClient(configuration: ClientConfiguration) { client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) }) } - // sending flux with multiple WireFrames not supported yet private fun handler(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound, messages: Flux<WireFrame>): Publisher<Void> { @@ -64,8 +65,8 @@ class VesHvClient(configuration: ClientConfiguration) { .subscribe { str -> logger.info("Server response: $str") } val frames = messages - .doOnNext { logger.info { "About to send message with ${it.payloadSize} B of payload" } } .map { it.encode(nettyOutbound.alloc()) } + .concatWith(Mono.just(Unpooled.EMPTY_BUFFER)) return nettyOutbound .options { it.flushOnEach() } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index ee7f49a6..68f999ef 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -36,12 +36,14 @@ fun main(args: Array<String>) { val clientConfig = ArgBasedClientConfiguration.parse(args) val messageFactory = MessageFactory() val client = VesHvClient(clientConfig) - client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) + client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount)) } catch (e: ArgBasedClientConfiguration.WrongArgumentException) { e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt") + System.exit(1) } catch (e: Exception) { logger.error(e.localizedMessage) logger.debug("An error occurred when starting ves client", e) + System.exit(2) } } diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt index edcec65f..405a15eb 100644 --- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt +++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt @@ -23,8 +23,7 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory -import kotlin.test.assertEquals +import reactor.test.test /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -35,16 +34,18 @@ object MessageFactoryTest : Spek({ val factory = MessageFactory() given("no parameters") { - it("should return flux with one message") { - val result = factory.createMessageFlux() - - assertEquals(1, result.count().block()) + it("should return infinite flux") { + val limit = 1000L + factory.createMessageFlux().take(limit).test() + .expectNextCount(limit) + .verifyComplete() } } given("messages amount") { it("should return message flux of specified size") { - val result = factory.createMessageFlux(5) - assertEquals(5, result.count().block()) + factory.createMessageFlux(5).test() + .expectNextCount(5) + .verifyComplete() } } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index dfbbdb56..ed686fe8 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -20,12 +20,13 @@ package org.onap.dcae.collectors.veshv.boundary import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.model.ServerConfiguration import reactor.core.publisher.Flux import reactor.core.publisher.Mono interface Collector { - fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> + fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> } typealias CollectorProvider = () -> Collector diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 850d3a84..913d8f50 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector -import org.onap.dcae.collectors.veshv.impl.WireDecoder +import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicReference @@ -48,7 +48,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - WireDecoder(), + { WireDecoder(it) }, VesDecoder(), MessageValidator(), Router(config.routing), diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt index b0a9da81..12e1c1e6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt @@ -39,7 +39,8 @@ internal class MessageValidator { fun isValid(message: VesMessage): Boolean { val header = message.header - return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS + val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS + return ret } private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index cdc70f82..60e7d70a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -19,7 +19,6 @@ */ package org.onap.dcae.collectors.veshv.impl -import com.google.protobuf.InvalidProtocolBufferException import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -31,18 +30,8 @@ import org.onap.ves.VesEventV5.VesEvent */ internal class VesDecoder { - fun decode(bb: ByteBuf): VesMessage? = - try { - val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader - VesMessage(decodedHeader, bb) - } catch (ex: InvalidProtocolBufferException) { - logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" } - logger.debug("Cause", ex) - null - } - - - companion object { - private val logger = Logger(VesDecoder::class) + fun decode(bb: ByteBuf): VesMessage { + val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader + return VesMessage(decodedHeader, bb) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 535fbe12..ac11b3e8 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -20,47 +20,43 @@ package org.onap.dcae.collectors.veshv.impl import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono +import java.util.concurrent.atomic.AtomicInteger /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ internal class VesHvCollector( - private val wireDecoder: WireDecoder, + private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder, private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, private val sink: Sink) : Collector { - override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> = - dataStream - .doOnNext(this::logIncomingMessage) - .flatMap(this::decodeWire) - .doOnNext(this::logDecodedWireMessage) - .flatMap(this::decodeProtobuf) - .filter(this::validate) - .flatMap(this::findRoute) - .compose(sink::send) - .doOnNext(this::releaseMemory) - .then() - private fun logIncomingMessage(wire: ByteBuf) { - logger.debug { "Got message with total ${wire.readableBytes()} B"} - } - - private fun logDecodedWireMessage(payload: ByteBuf) { - logger.debug { "Wire payload size: ${payload.readableBytes()} B"} - } - - private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode) - - private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode) + override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = + wireDecoderSupplier(alloc).let { wireDecoder -> + dataStream + .concatMap(wireDecoder::decode) + .filter(WireFrame::isValid) + .map(WireFrame::payload) + .map(protobufDecoder::decode) + .filter(this::validate) + .flatMap(this::findRoute) + .compose(sink::send) + .doOnNext(this::releaseMemory) + .doOnTerminate { releaseBuffersMemory(wireDecoder) } + .then() + } private fun validate(msg: VesMessage): Boolean { val valid = validator.isValid(msg) @@ -73,21 +69,16 @@ internal class VesHvCollector( private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination) private fun releaseMemory(msg: VesMessage) { + logger.trace { "Releasing memory from ${msg.rawMessage}" } msg.rawMessage.release() } - private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) - - private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> { - val result = mapper(input) - return if (result == null) { - input.release() - Mono.empty() - } else { - Mono.just(result) - } + private fun releaseBuffersMemory(wireDecoder: WireDecoder) { + wireDecoder.release() } + private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) + companion object { val logger = Logger(VesHvCollector::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 0aacb266..8e6db2af 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -19,24 +19,11 @@ */ package org.onap.dcae.collectors.veshv.impl.adapters -import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -import org.apache.kafka.common.serialization.ByteBufferSerializer -import org.apache.kafka.common.serialization.StringSerializer import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions -import java.nio.ByteBuffer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -51,33 +38,6 @@ object AdapterFactory { override fun invoke() = Flux.just(config) } - private class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - val sender = KafkaSender.create( - SenderOptions.create<CommonEventHeader, ByteBuffer>() - .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) - .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)) - return KafkaSink(sender) - } - } - - - private class LoggingSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return object : Sink { - private val logger = Logger(LoggingSinkProvider::class) - override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = - messages - .doOnNext { msg -> - logger.info { "Message routed to ${msg.topic}" } - } - .map { it.message } - - } - } - } - fun consulConfigurationProvider(url: String): ConfigurationProvider = ConsulConfigurationProvider(url, httpAdapter()) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt new file mode 100644 index 00000000..82452e1e --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt @@ -0,0 +1,48 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.ByteBufferSerializer +import org.apache.kafka.common.serialization.StringSerializer +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.ves.VesEventV5 +import reactor.kafka.sender.KafkaSender +import reactor.kafka.sender.SenderOptions +import java.nio.ByteBuffer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +internal class KafkaSinkProvider : SinkProvider { + override fun invoke(config: CollectorConfiguration): Sink { + return KafkaSink(KafkaSender.create(constructSenderOptions(config))) + } + + private fun constructSenderOptions(config: CollectorConfiguration) = + SenderOptions.create<VesEventV5.VesEvent.CommonEventHeader, ByteBuffer>() + .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java) + +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt new file mode 100644 index 00000000..62b6d1aa --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -0,0 +1,64 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters + +import org.onap.dcae.collectors.veshv.boundary.Sink +import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.RoutedMessage +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux +import java.util.concurrent.atomic.AtomicLong + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +internal class LoggingSinkProvider : SinkProvider { + + override fun invoke(config: CollectorConfiguration): Sink { + return object : Sink { + private val logger = Logger(LoggingSinkProvider::class) + private val totalMessages = AtomicLong() + private val totalBytes = AtomicLong() + + override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> = + messages + .doOnNext(this::logMessage) + .map { it.message } + + private fun logMessage(msg: RoutedMessage) { + val msgs = totalMessages.addAndGet(1) + val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong()) + val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } + if (msgs % INFO_LOGGING_FREQ == 0L) + logger.info(logMessageSupplier) + else + logger.trace(logMessageSupplier) + } + + } + } + + companion object { + const val INFO_LOGGING_FREQ = 100_000 + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt index 208b1ba0..564aa8df 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.impl.socket +import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.model.ServerConfiguration @@ -59,17 +60,18 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration, private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> { logger.debug("Got connection") + nettyOutbound.alloc() val sendHello = nettyOutbound .options { it.flushOnEach() } .sendString(Mono.just("ONAP_VES_HV/0.1\n")) .then() - val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive()) + val handleIncomingMessages = collectorProvider() + .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain()) return sendHello.then(handleIncomingMessages) } - companion object { private val logger = Logger(NettyTcpServer::class) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt new file mode 100644 index 00000000..e4dd7cf6 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt @@ -0,0 +1,74 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.CompositeByteBuf +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux +import reactor.core.publisher.FluxSink +import java.util.concurrent.atomic.AtomicBoolean +import java.util.function.Consumer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class StreamBufferEmitter( + private val streamBuffer: CompositeByteBuf, + private val newFrame: ByteBuf) + : Consumer<FluxSink<WireFrame>> { + + private val subscribed = AtomicBoolean(false) + + override fun accept(sink: FluxSink<WireFrame>) { + when { + + subscribed.getAndSet(true) -> + sink.error(IllegalStateException("Wire frame emitter supports only one subscriber")) + + newFrame.readableBytes() == 0 -> { + logger.trace { "Discarding empty buffer" } + newFrame.release() + sink.complete() + } + + else -> { + streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame) + sink.onDispose { + logger.debug("Disposing read components") + streamBuffer.discardReadComponents() + } + sink.onRequest { requestedFrameCount -> + WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber() + } + } + } + } + + companion object { + fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> = + Flux.create(StreamBufferEmitter(streamBuffer, newFrame)) + + private const val INCREASE_WRITER_INDEX = true + private val logger = Logger(StreamBufferEmitter::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt new file mode 100644 index 00000000..b701aaf2 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt @@ -0,0 +1,56 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.impl.VesHvCollector +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.Flux + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { + private val streamBuffer = alloc.compositeBuffer() + + fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(streamBuffer, byteBuf) + .doOnSubscribe { logIncomingMessage(byteBuf) } + .doOnNext(this::logDecodedWireMessage) + + fun release() { + streamBuffer.release() + } + + + private fun logIncomingMessage(wire: ByteBuf) { + logger.trace { "Got message with total size of ${wire.readableBytes()} B" } + } + + private fun logDecodedWireMessage(wire: WireFrame) { + logger.trace { "Wire payload size: ${wire.payloadSize} B." } + } + + companion object { + val logger = Logger(VesHvCollector::class) + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt new file mode 100644 index 00000000..bc9c8389 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -0,0 +1,92 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.publisher.FluxSink + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since May 2018 + */ +internal class WireFrameSink( + private val streamBuffer: ByteBuf, + private val sink: FluxSink<WireFrame>, + private val requestedFrameCount: Long) { + + fun handleSubscriber() { + logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" } + + try { + if (requestedFrameCount == Long.MAX_VALUE) { + logger.trace { "Push based strategy" } + pushAvailableFrames() + } else { + logger.trace { "Pull based strategy - req $requestedFrameCount" } + pushUpToNumberOfFrames() + } + } catch (ex: Exception) { + sink.error(ex) + } + + logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" } + + } + + private fun pushAvailableFrames() { + var nextFrame = decodeFirstFrameFromBuffer() + while (nextFrame != null && !sink.isCancelled) { + sink.next(nextFrame) + nextFrame = decodeFirstFrameFromBuffer() + } + sink.complete() + } + + private fun pushUpToNumberOfFrames() { + var nextFrame = decodeFirstFrameFromBuffer() + var remaining = requestedFrameCount + loop@ while (nextFrame != null && !sink.isCancelled) { + sink.next(nextFrame) + if (--remaining > 0) { + nextFrame = decodeFirstFrameFromBuffer() + } else { + break@loop + } + } + if (remaining > 0 && nextFrame == null) { + sink.complete() + } + } + + private fun decodeFirstFrameFromBuffer(): WireFrame? = + try { + WireFrame.decodeFirst(streamBuffer) + } catch (ex: MissingWireFrameBytesException) { + logger.debug { "${ex.message} - waiting for more data" } + null + } + + companion object { + private val logger = Logger(WireFrameSink::class) + } +} diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 8d9e4962..263ad441 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -20,8 +20,10 @@ package org.onap.dcae.collectors.veshv.impl import com.google.protobuf.ByteString +import com.google.protobuf.InvalidProtocolBufferException import io.netty.buffer.Unpooled.wrappedBuffer import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it @@ -60,8 +62,9 @@ internal object VesDecoderTest : Spek({ on("invalid ves hv message bytes") { val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset())) - it("should return empty result") { - assertThat(cut.decode(rawMessageBytes)).isNull() + it("should throw error") { + assertThatExceptionOfType(InvalidProtocolBufferException::class.java) + .isThrownBy { cut.decode(rawMessageBytes) } } } } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt deleted file mode 100644 index 81706ce4..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt +++ /dev/null @@ -1,104 +0,0 @@ -package org.onap.dcae.collectors.veshv.impl - -import io.netty.buffer.Unpooled -import io.netty.buffer.UnpooledByteBufAllocator -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrame - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> - * @since May 2018 - */ -internal object WireDecoderTest : Spek({ - describe("decoding wire protocol") { - val cut = WireDecoder() - - fun decode(frame: WireFrame) = - cut.decode( - frame.encode(UnpooledByteBufAllocator.DEFAULT)) - - given("empty input") { - val input = Unpooled.EMPTY_BUFFER - - it("should yield empty result") { - assertThat(cut.decode(input)).isNull() - } - } - - given("input without 0xFF first byte") { - val input = WireFrame( - payload = Unpooled.EMPTY_BUFFER, - mark = 0x10, - majorVersion = 1, - minorVersion = 2, - payloadSize = 0) - - it("should yield empty result") { - assertThat(decode(input)).isNull() - } - } - - given("input with unsupported major version") { - val input = WireFrame( - payload = Unpooled.EMPTY_BUFFER, - mark = 0xFF, - majorVersion = 100, - minorVersion = 2, - payloadSize = 0) - - it("should yield empty result") { - assertThat(decode(input)).isNull() - } - } - - given("input with too small payload size") { - val input = WireFrame( - payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), - mark = 0xFF, - majorVersion = 1, - minorVersion = 0, - payloadSize = 1) - - it("should yield empty result") { - assertThat(decode(input)).isNull() - } - } - - given("input with too big payload size") { - val input = WireFrame( - payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), - mark = 0xFF, - majorVersion = 1, - minorVersion = 0, - payloadSize = 8) - - it("should yield empty result") { - assertThat(decode(input)).isNull() - } - } - - given("valid input") { - val payload = byteArrayOf(6, 9, 8, 6) - val input = WireFrame( - payload = Unpooled.wrappedBuffer(payload), - mark = 0xFF, - majorVersion = 1, - minorVersion = 0, - payloadSize = payload.size) - - - it("should yield Google Protocol Buffers payload") { - val result = decode(input)!! - - val actualPayload = ByteArray(result.readableBytes()) - result.readBytes(actualPayload) - - assertThat(actualPayload).containsExactly(*payload) - } - } - } -}) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt new file mode 100644 index 00000000..0a10aa1f --- /dev/null +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt @@ -0,0 +1,233 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.wire + +import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import reactor.test.test + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> + * @since May 2018 + */ +internal object WireDecoderTest : Spek({ + val alloc = UnpooledByteBufAllocator.DEFAULT + val samplePayload = "konstantynopolitanczykowianeczka".toByteArray() + val anotherPayload = "ala ma kota a kot ma ale".toByteArray() + + fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc)) + + fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { + for (bb in byteBuffers) { + assertThat(bb.refCnt()) + .describedAs("should be released: $bb ref count") + .isEqualTo(0) + } + } + + fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) { + for (bb in byteBuffers) { + assertThat(bb.refCnt()) + .describedAs("should not be released: $bb ref count") + .isEqualTo(1) + } + } + + describe("decoding wire protocol") { + given("empty input") { + val input = Unpooled.EMPTY_BUFFER + + it("should yield empty result") { + WireDecoder().decode(input).test().verifyComplete() + } + } + + given("input with no readable bytes") { + val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1) + + it("should yield empty result") { + WireDecoder().decode(input).test().verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input) + } + } + + given("invalid input (not starting with marker)") { + val input = Unpooled.wrappedBuffer(samplePayload) + + it("should yield error") { + WireDecoder().decode(input).test() + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("valid input") { + val input = WireFrame(Unpooled.wrappedBuffer(samplePayload)) + + it("should yield decoded input frame") { + WireDecoder().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + } + } + + given("valid input with part of next frame") { + val input = Unpooled.buffer() + .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)) + .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3)) + + it("should yield decoded input frame") { + WireDecoder().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("valid input with garbage after it") { + val input = Unpooled.buffer() + .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)) + .writeBytes(Unpooled.wrappedBuffer(samplePayload)) + + it("should yield decoded input frame and error") { + WireDecoder().decode(input).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should leave memory unreleased") { + verifyMemoryNotReleased(input) + } + } + + given("two inputs containing two separate messages") { + val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + + it("should yield decoded input frames") { + val cut = WireDecoder() + cut.decode(input1).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + + given("1st input containing 1st frame and 2nd input containing garbage") { + val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val input2 = Unpooled.wrappedBuffer(anotherPayload) + + it("should yield decoded input frames") { + val cut = WireDecoder() + cut.decode(input1) + .doOnNext { + // releasing retained payload + it.payload.release() + } + .test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .verifyError(InvalidWireFrameMarkerException::class.java) + } + + it("should release memory for 1st input") { + verifyMemoryReleased(input1) + } + + it("should leave memory unreleased for 2nd input") { + verifyMemoryNotReleased(input2) + } + } + + + given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { + val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + + val input1 = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2, 3) + val input2 = Unpooled.buffer().writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = WireDecoder() + cut.decode(input1).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + + given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { + val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + + val input1 = Unpooled.buffer() + .writeBytes(frame1, 5) + val input2 = Unpooled.buffer() + .writeBytes(frame1) + .writeBytes(frame2) + + it("should yield decoded input frames") { + val cut = WireDecoder() + cut.decode(input1).test() + .verifyComplete() + cut.decode(input2).test() + .expectNextMatches { it.payloadSize == samplePayload.size } + .expectNextMatches { it.payloadSize == anotherPayload.size } + .verifyComplete() + } + + it("should release memory") { + verifyMemoryReleased(input1, input2) + } + } + } +}) diff --git a/hv-collector-core/src/test/resources/logback.xml b/hv-collector-core/src/test/resources/logback-test.xml index 809f62d4..84abc9d3 100644 --- a/hv-collector-core/src/test/resources/logback.xml +++ b/hv-collector-core/src/test/resources/logback-test.xml @@ -26,7 +26,7 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> <root level="INFO"> <appender-ref ref="CONSOLE"/> diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 1826bcd0..c4e9874f 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -20,12 +20,15 @@ package org.onap.dcae.collectors.veshv.tests.component import io.netty.buffer.ByteBuf +import io.netty.buffer.UnpooledByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import reactor.core.Exceptions import reactor.core.publisher.Flux import java.time.Duration @@ -36,6 +39,7 @@ import java.time.Duration internal class Sut { val configurationProvider = FakeConfigurationProvider() val sink = FakeSink() + val alloc = UnpooledByteBufAllocator.DEFAULT private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink)) val collectorProvider = collectorFactory.createVesHvCollectorProvider() @@ -43,8 +47,19 @@ internal class Sut { get() = collectorProvider() fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10)) - + collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) return sink.sentMessages } + + fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> = + try { + collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10)) + Pair(sink.sentMessages, null) + } catch (ex: Exception) { + Pair(sink.sentMessages, ex) + } + + companion object { + val logger = Logger(Sut::class) + } } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 26032ff9..fc4fb656 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -19,9 +19,12 @@ */ package org.onap.dcae.collectors.veshv.tests.component +import com.google.protobuf.InvalidProtocolBufferException import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.WireFrameDecodingException import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain @@ -40,29 +43,76 @@ object VesHvSpecification : Spek({ .describedAs("should send all events") .hasSize(2) } + } + + describe("Memory management") { - system("should release memory for each incoming message") { sut -> + system("should release memory for each handled and dropped message") { sut -> sut.configurationProvider.updateConfiguration(basicConfiguration) + val validMessage = vesMessage(Domain.HVRANMEAS) val msgWithInvalidDomain = vesMessage(Domain.OTHER) - val msgWithInvalidPayload = invalidVesMessage() val msgWithInvalidFrame = invalidWireFrame() - val validMessage = vesMessage(Domain.HVRANMEAS) - val refCntBeforeSending = msgWithInvalidDomain.refCnt() + val expectedRefCnt = 0 + + val (handledEvents, exception) = sut.handleConnectionReturningError( + validMessage, msgWithInvalidDomain, msgWithInvalidFrame) - sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage) + assertThat(handledEvents).hasSize(1) + assertThat(exception).isNull() + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) assertThat(msgWithInvalidDomain.refCnt()) .describedAs("message with invalid domain should be released") - .isEqualTo(refCntBeforeSending) - assertThat(msgWithInvalidPayload.refCnt()) - .describedAs("message with invalid payload should be released") - .isEqualTo(refCntBeforeSending) + .isEqualTo(expectedRefCnt) assertThat(msgWithInvalidFrame.refCnt()) .describedAs("message with invalid frame should be released") - .isEqualTo(refCntBeforeSending) + .isEqualTo(expectedRefCnt) + + } + + system("should release memory for each message with invalid payload") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val validMessage = vesMessage(Domain.HVRANMEAS) + val msgWithInvalidPayload = invalidVesMessage() + val expectedRefCnt = 0 + + val (handledEvents, exception) = sut.handleConnectionReturningError( + validMessage, msgWithInvalidPayload) + + assertThat(handledEvents).hasSize(1) + assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java) + + assertThat(validMessage.refCnt()) + .describedAs("handled message should be released") + .isEqualTo(expectedRefCnt) + assertThat(msgWithInvalidPayload.refCnt()) + .describedAs("message with invalid payload should be released") + .isEqualTo(expectedRefCnt) + + } + + system("should release memory for each message with garbage frame") { sut -> + sut.configurationProvider.updateConfiguration(basicConfiguration) + val validMessage = vesMessage(Domain.HVRANMEAS) + val msgWithGarbageFrame = garbageFrame() + val expectedRefCnt = 0 + + val (handledEvents, exception) = sut.handleConnectionReturningError( + validMessage, msgWithGarbageFrame) + + assertThat(handledEvents).hasSize(1) + assertThat(exception?.cause) + .isInstanceOf(InvalidWireFrameMarkerException::class.java) + assertThat(validMessage.refCnt()) .describedAs("handled message should be released") - .isEqualTo(refCntBeforeSending) + .isEqualTo(expectedRefCnt) + assertThat(msgWithGarbageFrame.refCnt()) + .describedAs("message with garbage frame should be released") + .isEqualTo(expectedRefCnt) + } } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt index b6342b11..998f3140 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt @@ -54,6 +54,10 @@ fun invalidVesMessage() = alocator.buffer().run { } +fun garbageFrame() = alocator.buffer().run { + writeBytes("the meaning of life is &@)(*_!".toByteArray()) +} + fun invalidWireFrame() = alocator.buffer().run { writeByte(0xFF) writeByte(1) @@ -65,6 +69,7 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr .setCommonEventHeader( CommonEventHeader.getDefaultInstance().toBuilder() .setVersion("1.0") + .setEventName("xyz") .setEventId(id) .setDomain(domain) .setEventName("Sample event name") @@ -76,6 +81,3 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr .setSequence(1)) .setHvRanMeasFields(ByteString.EMPTY) .build() - - - diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml index 809f62d4..84abc9d3 100644 --- a/hv-collector-ct/src/test/resources/logback-test.xml +++ b/hv-collector-ct/src/test/resources/logback-test.xml @@ -26,7 +26,7 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> <root level="INFO"> <appender-ref ref="CONSOLE"/> diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt index 5bd63d8b..8c8b4718 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.domain import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException /** * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). @@ -53,20 +56,20 @@ import io.netty.buffer.ByteBufAllocator * @since May 2018 */ data class WireFrame(val payload: ByteBuf, - val mark: Short, val majorVersion: Short, val minorVersion: Short, val payloadSize: Int) { + constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes()) + fun isValid(): Boolean = - mark == FF_BYTE - && majorVersion == SUPPORTED_MAJOR_VERSION + majorVersion == SUPPORTED_MAJOR_VERSION && payload.readableBytes() == payloadSize fun encode(allocator: ByteBufAllocator): ByteBuf { val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes()) - bb.writeByte(mark.toInt()) + bb.writeByte(MARKER_BYTE.toInt()) bb.writeByte(majorVersion.toInt()) bb.writeByte(minorVersion.toInt()) bb.writeInt(payloadSize) @@ -76,20 +79,58 @@ data class WireFrame(val payload: ByteBuf, } companion object { - fun decode(byteBuf: ByteBuf): WireFrame { - val mark = byteBuf.readUnsignedByte() + fun decodeFirst(byteBuf: ByteBuf): WireFrame { + verifyNotEmpty(byteBuf) + byteBuf.markReaderIndex() + + verifyMarker(byteBuf) + verifyMinimumSize(byteBuf) + val majorVersion = byteBuf.readUnsignedByte() val minorVersion = byteBuf.readUnsignedByte() - val payloadSize = byteBuf.readInt() - val payload = byteBuf.retainedSlice() + val payloadSize = verifyPayloadSize(byteBuf) + + val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize) + byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize) + + return WireFrame(payload, majorVersion, minorVersion, payloadSize) + } + + private fun verifyPayloadSize(byteBuf: ByteBuf): Int = + byteBuf.readInt().let { payloadSize -> + if (byteBuf.readableBytes() < payloadSize) { + byteBuf.resetReaderIndex() + throw MissingWireFrameBytesException("readable bytes < payload size") + } else { + payloadSize + } + } + + private fun verifyMinimumSize(byteBuf: ByteBuf) { + if (byteBuf.readableBytes() < HEADER_SIZE) { + byteBuf.resetReaderIndex() + throw MissingWireFrameBytesException("readable bytes < header size") + } + } + + private fun verifyMarker(byteBuf: ByteBuf) { + val mark = byteBuf.readUnsignedByte() + if (mark != MARKER_BYTE) { + byteBuf.resetReaderIndex() + throw InvalidWireFrameMarkerException(mark) + } + } - return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize) + private fun verifyNotEmpty(byteBuf: ByteBuf) { + if (byteBuf.readableBytes() < 1) { + throw EmptyWireFrameException() + } } - private const val HEADER_SIZE = + const val HEADER_SIZE = 3 * java.lang.Byte.BYTES + - 1 * java.lang.Integer.BYTES - private const val FF_BYTE: Short = 0xFF - private const val SUPPORTED_MAJOR_VERSION: Short = 1 + 1 * java.lang.Integer.BYTES + const val MARKER_BYTE: Short = 0xFF + const val SUPPORTED_MAJOR_VERSION: Short = 1 } } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt new file mode 100644 index 00000000..6e1ce935 --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.exceptions + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class EmptyWireFrameException : MissingWireFrameBytesException("wire frame was empty (readable bytes == 0)") diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt index 6f6ac2a7..ff452a7a 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt @@ -17,28 +17,13 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl +package org.onap.dcae.collectors.veshv.domain.exceptions -import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.utils.logging.Logger /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 + * @since June 2018 */ -internal class WireDecoder { - fun decode(byteBuf: ByteBuf): ByteBuf? = - try { - WireFrame.decode(byteBuf) - .takeIf { it.isValid() } - .let { it?.payload } - } catch (ex: IndexOutOfBoundsException) { - logger.debug { "Wire protocol frame could not be decoded - input is too small" } - null - } - - companion object { - private val logger = Logger(WireDecoder::class) - } -} +class InvalidWireFrameMarkerException(actualMarker: Short) : WireFrameDecodingException( + "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker)) diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt new file mode 100644 index 00000000..7e4b3cef --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.exceptions + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +open class MissingWireFrameBytesException(msg: String) : WireFrameDecodingException(msg) diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt new file mode 100644 index 00000000..11013834 --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt @@ -0,0 +1,26 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.exceptions + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +open class WireFrameDecodingException(msg: String) : Exception(msg) diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt index 5a923c4e..00113267 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt @@ -1,29 +1,113 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ package org.onap.dcae.collectors.veshv.domain -import io.netty.buffer.ByteBufAllocator import io.netty.buffer.Unpooled +import io.netty.buffer.UnpooledByteBufAllocator import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException +import java.nio.charset.Charset /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ object WireFrameTest : Spek({ - describe("Wire Frame codec") { - describe("encode-decode methods' compatibility") { - val payloadContent = "test" - val payload = Unpooled.wrappedBuffer(payloadContent.toByteArray(Charsets.US_ASCII)) - val frame = WireFrame(payload = payload, - majorVersion = 1, + val payloadAsString = "coffeebabe" + + fun createSampleFrame() = + WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset()))) + + fun encodeSampleFrame() = + createSampleFrame().let { + Unpooled.buffer() + .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT)) + + } + + describe("Wire Frame invariants") { + + given("input with unsupported major version") { + val input = WireFrame( + payload = Unpooled.EMPTY_BUFFER, + majorVersion = 100, minorVersion = 2, - mark = 0xFF, - payloadSize = payload.readableBytes()) + payloadSize = 0) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with too small payload size") { + val input = WireFrame( + payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), + majorVersion = 1, + minorVersion = 0, + payloadSize = 1) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("input with too big payload size") { + val input = WireFrame( + payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), + majorVersion = 1, + minorVersion = 0, + payloadSize = 8) + + it("should fail validation") { + assertThat(input.isValid()).isFalse() + } + } + + given("valid input") { + val payload = byteArrayOf(6, 9, 8, 6) + val input = WireFrame( + payload = Unpooled.wrappedBuffer(payload), + majorVersion = 1, + minorVersion = 0, + payloadSize = payload.size) + + it("should pass validation") { + assertThat(input.isValid()).isTrue() + } + } - val encoded = frame.encode(ByteBufAllocator.DEFAULT) - val decoded = WireFrame.decode(encoded) + + } + + describe("Wire Frame codec") { + + describe("encode-decode methods' compatibility") { + val frame = createSampleFrame() + val encoded = encodeSampleFrame() + val decoded = WireFrame.decodeFirst(encoded) it("should decode major version") { assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion) @@ -33,17 +117,13 @@ object WireFrameTest : Spek({ assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion) } - it("should decode mark") { - assertThat(decoded.mark).isEqualTo(frame.mark) - } - it("should decode payload size") { assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize) } it("should decode payload") { - assertThat(decoded.payload.toString(Charsets.US_ASCII)) - .isEqualTo(payloadContent) + assertThat(decoded.payload.toString(Charset.defaultCharset())) + .isEqualTo(payloadAsString) } it("should retain decoded payload") { @@ -51,5 +131,55 @@ object WireFrameTest : Spek({ assertThat(decoded.payload.refCnt()).isEqualTo(1) } } + + describe("TCP framing") { + // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11 + + it("should decode message leaving rest unread") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + .writeByte(0xAA) + val decoded = WireFrame.decodeFirst(buff) + + assertThat(decoded.isValid()).describedAs("should be valid").isTrue() + assertThat(buff.readableBytes()).isEqualTo(1) + } + + it("should throw exception when not even header fits") { + val buff = Unpooled.buffer() + .writeByte(0xFF) + + assertThatExceptionOfType(MissingWireFrameBytesException::class.java) + .isThrownBy { WireFrame.decodeFirst(buff) } + } + + it("should throw exception when first byte is not 0xFF but length looks ok") { + val buff = Unpooled.buffer() + .writeByte(0xAA) + .writeBytes("some garbage".toByteArray()) + + assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) + .isThrownBy { WireFrame.decodeFirst(buff) } + } + + it("should throw exception when first byte is not 0xFF and length is to short") { + val buff = Unpooled.buffer() + .writeByte(0xAA) + + assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) + .isThrownBy { WireFrame.decodeFirst(buff) } + } + + it("should throw exception when payload doesn't fit") { + val buff = Unpooled.buffer() + .writeBytes(encodeSampleFrame()) + buff.writerIndex(buff.writerIndex() - 2) + + assertThatExceptionOfType(MissingWireFrameBytesException::class.java) + .isThrownBy { WireFrame.decodeFirst(buff) } + } + + } } + })
\ No newline at end of file diff --git a/hv-collector-main/Dockerfile b/hv-collector-main/Dockerfile index ceb45ead..1367ff1c 100644 --- a/hv-collector-main/Dockerfile +++ b/hv-collector-main/Dockerfile @@ -12,4 +12,4 @@ ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"] CMD ["--listen-port", "6061"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ -COPY target/hv-collector-main-*.jar ./
\ No newline at end of file +COPY target/hv-collector-main-*.jar ./ diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 89b31b59..4438cf38 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -39,7 +39,7 @@ fun main(args: Array<String>) { val collectorProvider = CollectorFactory( resolveConfigurationProvider(serverConfiguration), - AdapterFactory.kafkaSink() + AdapterFactory.loggingSink() ).createVesHvCollectorProvider() ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() } catch (ex: WrongArgumentException) { diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml index 809f62d4..48da3b18 100644 --- a/hv-collector-main/src/main/resources/logback.xml +++ b/hv-collector-main/src/main/resources/logback.xml @@ -26,7 +26,8 @@ </rollingPolicy> </appender> - <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/> + <logger name="org.onap.dcae.collectors.veshv" level="INFO"/> + <!--<logger name="reactor.ipc.netty" level="DEBUG"/>--> <root level="INFO"> <appender-ref ref="CONSOLE"/> diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index b96a8b3a..eb52a866 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -26,6 +26,17 @@ class Logger(val logger: org.slf4j.Logger) { constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java)) // + // TRACE + // + + fun trace(messageProvider: () -> String) { + if (logger.isTraceEnabled) { + logger.trace(messageProvider()) + } + } + + + // // DEBUG // |