diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-06-08 16:29:31 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-08-02 07:06:19 +0200 |
commit | 7c3b59560f015b65882a56db585b7d4bdd10d434 (patch) | |
tree | 4c15d3657e373d3a681fdd2ab865623aeecc82e7 | |
parent | 07bbbf71cd65b29f446a1b475add87f20365db83 (diff) |
Implement Kafka Sink
Closes ONAP-146
Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Issue-ID: DCAEGEN2-601
28 files changed, 385 insertions, 208 deletions
diff --git a/docker-compose.yml b/docker-compose.yml index af8e0e0e..0f0cca2d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,14 +9,15 @@ services: ports: - "9092:9092" environment: - HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'" + KAFKA_ADVERTISED_HOST_NAME: "kafka" KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181" + KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092" volumes: - /var/run/docker.sock:/var/run/docker.sock depends_on: - zookeeper - hv-collector: + veshvcollector: build: context: hv-collector-main dockerfile: Dockerfile @@ -26,11 +27,11 @@ services: - kafka volumes: - ./ssl/:/etc/ves-hv/ - xnf-simulator: + xnfsimulator: build: context: hv-collector-client-simulator dockerfile: Dockerfile depends_on: - - hv-collector + - veshvcollector volumes: - ./ssl/:/etc/ves-hv/
\ No newline at end of file diff --git a/hv-collector-client-simulator/Dockerfile b/hv-collector-client-simulator/Dockerfile index 19c4c878..58cfa448 100644 --- a/hv-collector-client-simulator/Dockerfile +++ b/hv-collector-client-simulator/Dockerfile @@ -7,7 +7,7 @@ LABEL maintainer="Nokia Wroclaw ONAP Team" WORKDIR /opt/ves-hv-client-simulator ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"] -CMD ["--ves-host", "hv-collector", "--ves-port", "6061"] +CMD ["--ves-host", "veshvcollector", "--ves-port", "6061"] COPY target/libs/external/* ./ COPY target/libs/internal/* ./ COPY target/hv-collector-client-simulator-*.jar ./ diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt index d5f7c7c8..87a238a8 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt @@ -20,8 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl import com.google.protobuf.ByteString -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled import org.onap.dcae.collectors.veshv.domain.WireFrame import org.onap.ves.VesEventV5 import reactor.core.publisher.Flux @@ -65,12 +63,12 @@ class MessageFactory { return WireFrame(payload) } - private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf { + private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray { val msg = VesEventV5.VesEvent.newBuilder() .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) .build() - return Unpooled.wrappedBuffer(msg.toByteArray()) + return msg.toByteArray() } } diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt index 29573e86..cb56db91 100644 --- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt +++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt @@ -25,6 +25,7 @@ import io.netty.handler.ssl.SslContext import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslProvider import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -64,8 +65,10 @@ class VesHvClient(configuration: ClientConfiguration) { .asString(Charsets.UTF_8) .subscribe { str -> logger.info("Server response: $str") } + val encoder = WireFrameEncoder(nettyOutbound.alloc()) + val frames = messages - .map { it.encode(nettyOutbound.alloc()) } + .map(encoder::encode) .concatWith(Mono.just(Unpooled.EMPTY_BUFFER)) return nettyOutbound diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index 913d8f50..73f4d09d 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorProvider import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.MessageValidator import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector -import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder +import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import reactor.core.publisher.Flux import java.util.concurrent.atomic.AtomicReference @@ -36,7 +37,8 @@ import java.util.concurrent.atomic.AtomicReference * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvider: SinkProvider) { +class CollectorFactory(val configuration: ConfigurationProvider, + private val sinkProvider: SinkProvider) { fun createVesHvCollectorProvider(): CollectorProvider { val collector: AtomicReference<Collector> = AtomicReference() createVesHvCollector().subscribe(collector::set) @@ -48,7 +50,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - { WireDecoder(it) }, + { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, VesDecoder(), MessageValidator(), Router(config.routing), diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt index 60e7d70a..591a48b7 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt @@ -19,9 +19,8 @@ */ package org.onap.dcae.collectors.veshv.impl -import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.ves.VesEventV5.VesEvent /** @@ -30,8 +29,8 @@ import org.onap.ves.VesEventV5.VesEvent */ internal class VesDecoder { - fun decode(bb: ByteBuf): VesMessage { - val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader - return VesMessage(decodedHeader, bb) + fun decode(bytes: ByteData): VesMessage { + val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader + return VesMessage(decodedHeader, bytes) } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index ac11b3e8..965943f6 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -24,57 +24,42 @@ import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.WireFrame -import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder +import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder import org.onap.dcae.collectors.veshv.model.RoutedMessage import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import java.util.concurrent.atomic.AtomicInteger /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ internal class VesHvCollector( - private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder, + private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, private val protobufDecoder: VesDecoder, private val validator: MessageValidator, private val router: Router, private val sink: Sink) : Collector { override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = - wireDecoderSupplier(alloc).let { wireDecoder -> + wireChunkDecoderSupplier(alloc).let { wireDecoder -> dataStream .concatMap(wireDecoder::decode) .filter(WireFrame::isValid) .map(WireFrame::payload) .map(protobufDecoder::decode) - .filter(this::validate) + .filter(validator::isValid) .flatMap(this::findRoute) .compose(sink::send) - .doOnNext(this::releaseMemory) .doOnTerminate { releaseBuffersMemory(wireDecoder) } .then() } - private fun validate(msg: VesMessage): Boolean { - val valid = validator.isValid(msg) - if (!valid) { - msg.rawMessage.release() - } - return valid - } - private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination) - private fun releaseMemory(msg: VesMessage) { - logger.trace { "Releasing memory from ${msg.rawMessage}" } - msg.rawMessage.release() - } - - private fun releaseBuffersMemory(wireDecoder: WireDecoder) { - wireDecoder.release() + private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) { + wireChunkDecoder.release() } private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input)) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt index 8e6db2af..358be108 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider import org.onap.dcae.collectors.veshv.boundary.SinkProvider +import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration import reactor.core.publisher.Flux import reactor.ipc.netty.http.client.HttpClient diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt index 62b6d1aa..b943e4e5 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt @@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider { private fun logMessage(msg: RoutedMessage) { val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong()) + val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } if (msgs % INFO_LOGGING_FREQ == 0L) logger.info(logMessageSupplier) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt index db7845c7..6142fa3c 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.adapters +package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.model.RoutedMessage @@ -28,13 +28,12 @@ import reactor.core.publisher.Flux import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderRecord import reactor.kafka.sender.SenderResult -import java.nio.ByteBuffer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, ByteBuffer>) : Sink { +internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> { val records = messages.map(this::vesToKafkaRecord) @@ -44,13 +43,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, Byte .map { it.correlationMetadata() } } - private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, ByteBuffer, VesMessage> { + private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, VesMessage> { return SenderRecord.create( msg.topic, msg.partition, System.currentTimeMillis(), msg.message.header, - msg.message.rawMessage.nioBuffer(), + msg.message, msg.message) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt index 82452e1e..a00a02d2 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt @@ -17,18 +17,16 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.adapters +package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.ByteBufferSerializer -import org.apache.kafka.common.serialization.StringSerializer import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkProvider import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.ves.VesEventV5 +import org.onap.dcae.collectors.veshv.model.VesMessage +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import reactor.kafka.sender.KafkaSender import reactor.kafka.sender.SenderOptions -import java.nio.ByteBuffer /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -40,9 +38,8 @@ internal class KafkaSinkProvider : SinkProvider { } private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create<VesEventV5.VesEvent.CommonEventHeader, ByteBuffer>() + SenderOptions.create<CommonEventHeader, VesMessage>() .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java) - .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java) - + .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) + .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt new file mode 100644 index 00000000..9753d9e5 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt @@ -0,0 +1,40 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import com.google.protobuf.MessageLite +import org.apache.kafka.common.serialization.Serializer + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class ProtobufSerializer :Serializer<MessageLite> { + override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + // no configuration + } + + override fun serialize(topic: String?, data: MessageLite?): ByteArray? = + data?.toByteArray() + + override fun close() { + // cleanup not needed + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt new file mode 100644 index 00000000..7a6ac7c8 --- /dev/null +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt @@ -0,0 +1,37 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.impl.adapters.kafka + +import org.apache.kafka.common.serialization.Serializer +import org.onap.dcae.collectors.veshv.model.VesMessage + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class VesMessageSerializer : Serializer<VesMessage> { + override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { + } + + override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray() + + override fun close() { + } +} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt index e4dd7cf6..34a8b928 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire import io.netty.buffer.ByteBuf import io.netty.buffer.CompositeByteBuf import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.FluxSink @@ -33,6 +34,7 @@ import java.util.function.Consumer * @since May 2018 */ internal class StreamBufferEmitter( + private val decoder: WireFrameDecoder, private val streamBuffer: CompositeByteBuf, private val newFrame: ByteBuf) : Consumer<FluxSink<WireFrame>> { @@ -58,15 +60,15 @@ internal class StreamBufferEmitter( streamBuffer.discardReadComponents() } sink.onRequest { requestedFrameCount -> - WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber() + WireFrameSink(decoder, streamBuffer, sink, requestedFrameCount).handleSubscriber() } } } } companion object { - fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> = - Flux.create(StreamBufferEmitter(streamBuffer, newFrame)) + fun createFlux(decoder: WireFrameDecoder, streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> = + Flux.create(StreamBufferEmitter(decoder, streamBuffer, newFrame)) private const val INCREASE_WRITER_INDEX = true private val logger = Logger(StreamBufferEmitter::class) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index b701aaf2..580d36c5 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.impl.VesHvCollector import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux @@ -30,10 +31,10 @@ import reactor.core.publisher.Flux * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { +internal class WireChunkDecoder(private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { private val streamBuffer = alloc.compositeBuffer() - fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(streamBuffer, byteBuf) + fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(decoder, streamBuffer, byteBuf) .doOnSubscribe { logIncomingMessage(byteBuf) } .doOnNext(this::logDecodedWireMessage) @@ -41,7 +42,6 @@ internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { streamBuffer.release() } - private fun logIncomingMessage(wire: ByteBuf) { logger.trace { "Got message with total size of ${wire.readableBytes()} B" } } diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt index bc9c8389..a576dc65 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.wire import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.FluxSink @@ -30,6 +31,7 @@ import reactor.core.publisher.FluxSink * @since May 2018 */ internal class WireFrameSink( + private val decoder: WireFrameDecoder, private val streamBuffer: ByteBuf, private val sink: FluxSink<WireFrame>, private val requestedFrameCount: Long) { @@ -80,7 +82,7 @@ internal class WireFrameSink( private fun decodeFirstFrameFromBuffer(): WireFrame? = try { - WireFrame.decodeFirst(streamBuffer) + decoder.decodeFirst(streamBuffer) } catch (ex: MissingWireFrameBytesException) { logger.debug { "${ex.message} - waiting for more data" } null diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt index 38256896..03c53e10 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt @@ -19,11 +19,11 @@ */ package org.onap.dcae.collectors.veshv.model -import io.netty.buffer.ByteBuf +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteBuf) +data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt index 10e79156..bc030587 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt @@ -73,7 +73,7 @@ class RouteBuilder { this.targetTopic = targetTopic } - fun withFixedPartitioning(num: Int = 1) { + fun withFixedPartitioning(num: Int = 0) { partitioning = { _ -> num } } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt index df2840b9..017187a4 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt @@ -20,27 +20,29 @@ package org.onap.dcae.collectors.veshv.impl import com.google.protobuf.ByteString -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled -import io.netty.buffer.Unpooled.wrappedBuffer +import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.toByteData import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader import org.onap.ves.VesEventV5.VesEvent -import org.assertj.core.api.Assertions.assertThat -import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.* +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance +import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder internal object MessageValidatorTest : Spek({ - fun vesMessageBytes(commonHeader: CommonEventHeader): ByteBuf { + fun vesMessageBytes(commonHeader: CommonEventHeader): ByteData { val msg = VesEvent.newBuilder() .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data")) .build() - return wrappedBuffer(msg.toByteArray()) + return msg.toByteData() } given("Message validator") { @@ -79,7 +81,7 @@ internal object MessageValidatorTest : Spek({ } on("ves hv message bytes") { - val vesMessage = VesMessage(getDefaultInstance(), Unpooled.EMPTY_BUFFER) + val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY) it("should not accept message with default header") { assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() } @@ -97,7 +99,7 @@ internal object MessageValidatorTest : Spek({ .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!")) .build() - val rawMessageBytes = wrappedBuffer(msg.toByteArray()) + val rawMessageBytes = msg.toByteData() it("should not accept not fully initialized message header ") { val vesMessage = VesMessage(commonHeader, rawMessageBytes) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 3812db58..c852f5f4 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -1,11 +1,30 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ package org.onap.dcae.collectors.veshv.impl -import io.netty.buffer.Unpooled import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.dcae.collectors.veshv.model.routing import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader @@ -34,7 +53,7 @@ object RouterTest : Spek({ val cut = Router(config) on("message with existing route (rtpm)") { - val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), Unpooled.EMPTY_BUFFER) + val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), ByteData.EMPTY) val result = cut.findDestination(message) it("should have route available") { @@ -55,7 +74,7 @@ object RouterTest : Spek({ } on("message with existing route (trace)") { - val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), Unpooled.EMPTY_BUFFER) + val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), ByteData.EMPTY) val result = cut.findDestination(message) it("should have route available") { @@ -63,7 +82,7 @@ object RouterTest : Spek({ } it("should be routed to proper partition") { - assertThat(result?.partition).isEqualTo(1) + assertThat(result?.partition).isEqualTo(0) } it("should be routed to proper topic") { @@ -76,7 +95,7 @@ object RouterTest : Spek({ } on("message with unknown route") { - val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), Unpooled.EMPTY_BUFFER) + val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), ByteData.EMPTY) val result = cut.findDestination(message) it("should not have route available") { diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt index 263ad441..90b34b1c 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt @@ -21,13 +21,14 @@ package org.onap.dcae.collectors.veshv.impl import com.google.protobuf.ByteString import com.google.protobuf.InvalidProtocolBufferException -import io.netty.buffer.Unpooled.wrappedBuffer import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.assertThatExceptionOfType import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.domain.ByteData +import org.onap.dcae.collectors.veshv.domain.toByteData import org.onap.dcae.collectors.veshv.model.VesMessage import org.onap.ves.VesEventV5.VesEvent import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader @@ -45,7 +46,7 @@ internal object VesDecoderTest : Spek({ .setCommonEventHeader(commonHeader) .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements")) .build() - val rawMessageBytes = wrappedBuffer(msg.toByteArray()) + val rawMessageBytes = msg.toByteData() it("should decode only header and pass it on along with raw message") { @@ -60,7 +61,7 @@ internal object VesDecoderTest : Spek({ } on("invalid ves hv message bytes") { - val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset())) + val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset())) it("should throw error") { assertThatExceptionOfType(InvalidProtocolBufferException::class.java) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index 0a10aa1f..1ddcc3dc 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -28,6 +28,8 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrame +import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder +import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException import reactor.test.test @@ -35,13 +37,17 @@ import reactor.test.test * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> * @since May 2018 */ -internal object WireDecoderTest : Spek({ +internal object WireChunkDecoderTest : Spek({ val alloc = UnpooledByteBufAllocator.DEFAULT val samplePayload = "konstantynopolitanczykowianeczka".toByteArray() val anotherPayload = "ala ma kota a kot ma ale".toByteArray() - fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc)) + val encoder = WireFrameEncoder(alloc) + + fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame)) + fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) + fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { assertThat(bb.refCnt()) @@ -63,7 +69,7 @@ internal object WireDecoderTest : Spek({ val input = Unpooled.EMPTY_BUFFER it("should yield empty result") { - WireDecoder().decode(input).test().verifyComplete() + createInstance().decode(input).test().verifyComplete() } } @@ -71,7 +77,7 @@ internal object WireDecoderTest : Spek({ val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1) it("should yield empty result") { - WireDecoder().decode(input).test().verifyComplete() + createInstance().decode(input).test().verifyComplete() } it("should release memory") { @@ -83,7 +89,7 @@ internal object WireDecoderTest : Spek({ val input = Unpooled.wrappedBuffer(samplePayload) it("should yield error") { - WireDecoder().decode(input).test() + createInstance().decode(input).test() .verifyError(InvalidWireFrameMarkerException::class.java) } @@ -93,10 +99,10 @@ internal object WireDecoderTest : Spek({ } given("valid input") { - val input = WireFrame(Unpooled.wrappedBuffer(samplePayload)) + val input = WireFrame(samplePayload) it("should yield decoded input frame") { - WireDecoder().decode(input).test() + createInstance().decode(input).test() .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() } @@ -104,11 +110,11 @@ internal object WireDecoderTest : Spek({ given("valid input with part of next frame") { val input = Unpooled.buffer() - .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)) - .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3)) + .writeBytes(encoder.encode(WireFrame(samplePayload))) + .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3)) it("should yield decoded input frame") { - WireDecoder().decode(input).test() + createInstance().decode(input).test() .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() } @@ -120,11 +126,11 @@ internal object WireDecoderTest : Spek({ given("valid input with garbage after it") { val input = Unpooled.buffer() - .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)) + .writeBytes(encoder.encode(WireFrame(samplePayload))) .writeBytes(Unpooled.wrappedBuffer(samplePayload)) it("should yield decoded input frame and error") { - WireDecoder().decode(input).test() + createInstance().decode(input).test() .expectNextMatches { it.payloadSize == samplePayload.size } .verifyError(InvalidWireFrameMarkerException::class.java) } @@ -135,11 +141,11 @@ internal object WireDecoderTest : Spek({ } given("two inputs containing two separate messages") { - val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) - val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + val input1 = encoder.encode(WireFrame(samplePayload)) + val input2 = encoder.encode(WireFrame(anotherPayload)) it("should yield decoded input frames") { - val cut = WireDecoder() + val cut = createInstance() cut.decode(input1).test() .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() @@ -154,16 +160,12 @@ internal object WireDecoderTest : Spek({ } given("1st input containing 1st frame and 2nd input containing garbage") { - val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) + val input1 = encoder.encode(WireFrame(samplePayload)) val input2 = Unpooled.wrappedBuffer(anotherPayload) it("should yield decoded input frames") { - val cut = WireDecoder() + val cut = createInstance() cut.decode(input1) - .doOnNext { - // releasing retained payload - it.payload.release() - } .test() .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() @@ -182,8 +184,8 @@ internal object WireDecoderTest : Spek({ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { - val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) - val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + val frame1 = encoder.encode(WireFrame(samplePayload)) + val frame2 = encoder.encode(WireFrame(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1) @@ -191,7 +193,7 @@ internal object WireDecoderTest : Spek({ val input2 = Unpooled.buffer().writeBytes(frame2) it("should yield decoded input frames") { - val cut = WireDecoder() + val cut = createInstance() cut.decode(input1).test() .expectNextMatches { it.payloadSize == samplePayload.size } .verifyComplete() @@ -206,8 +208,8 @@ internal object WireDecoderTest : Spek({ } given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { - val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc) - val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc) + val frame1 = encoder.encode(WireFrame(samplePayload)) + val frame2 = encoder.encode(WireFrame(anotherPayload)) val input1 = Unpooled.buffer() .writeBytes(frame1, 5) @@ -216,7 +218,7 @@ internal object WireDecoderTest : Spek({ .writeBytes(frame2) it("should yield decoded input frames") { - val cut = WireDecoder() + val cut = createInstance() cut.decode(input1).test() .verifyComplete() cut.decode(input2).test() diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index fc4fb656..08b6382d 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -24,7 +24,6 @@ import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException -import org.onap.dcae.collectors.veshv.domain.exceptions.WireFrameDecodingException import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain @@ -125,7 +124,7 @@ object VesHvSpecification : Spek({ val msg = messages[0] assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC) - assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1) + assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0) } system("should drop message if route was not found") { sut -> diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt new file mode 100644 index 00000000..2b84e3f1 --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt @@ -0,0 +1,58 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import com.google.protobuf.MessageLite +import io.netty.buffer.ByteBuf +import java.nio.charset.Charset + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class ByteData(private val data: ByteArray) { + + fun size() = data.size + + /** + * This will expose mutable state of the data. + * + * @return wrapped data buffer (NOT a copy) + */ + fun unsafeAsArray() = data + + fun writeTo(byteBuf: ByteBuf) { + byteBuf.writeBytes(data) + } + + fun asString(charset: Charset = Charset.defaultCharset()) = String(data, charset) + + companion object { + val EMPTY = ByteData(byteArrayOf()) + + fun readFrom(byteBuf: ByteBuf, length: Int): ByteData { + val dataArray = ByteArray(length) + byteBuf.readBytes(dataArray) + return ByteData(dataArray) + } + } +} + +fun MessageLite.toByteData(): ByteData = ByteData(toByteArray()) diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt index 8c8b4718..caf13c53 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt @@ -19,12 +19,6 @@ */ package org.onap.dcae.collectors.veshv.domain -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException -import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException -import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException - /** * Wire frame structure is presented bellow. All fields are in network byte order (big-endian). * @@ -55,82 +49,25 @@ import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesExc * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -data class WireFrame(val payload: ByteBuf, +data class WireFrame(val payload: ByteData, val majorVersion: Short, val minorVersion: Short, val payloadSize: Int) { - constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes()) + constructor(payload: ByteArray) : this(ByteData(payload), 1, 0, payload.size) fun isValid(): Boolean = majorVersion == SUPPORTED_MAJOR_VERSION - && payload.readableBytes() == payloadSize - - fun encode(allocator: ByteBufAllocator): ByteBuf { - val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes()) - - bb.writeByte(MARKER_BYTE.toInt()) - bb.writeByte(majorVersion.toInt()) - bb.writeByte(minorVersion.toInt()) - bb.writeInt(payloadSize) - bb.writeBytes(payload) - - return bb - } + && payload.size() == payloadSize companion object { - fun decodeFirst(byteBuf: ByteBuf): WireFrame { - verifyNotEmpty(byteBuf) - byteBuf.markReaderIndex() - - verifyMarker(byteBuf) - verifyMinimumSize(byteBuf) - - val majorVersion = byteBuf.readUnsignedByte() - val minorVersion = byteBuf.readUnsignedByte() - val payloadSize = verifyPayloadSize(byteBuf) - - val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize) - byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize) - - return WireFrame(payload, majorVersion, minorVersion, payloadSize) - } - - private fun verifyPayloadSize(byteBuf: ByteBuf): Int = - byteBuf.readInt().let { payloadSize -> - if (byteBuf.readableBytes() < payloadSize) { - byteBuf.resetReaderIndex() - throw MissingWireFrameBytesException("readable bytes < payload size") - } else { - payloadSize - } - } - - private fun verifyMinimumSize(byteBuf: ByteBuf) { - if (byteBuf.readableBytes() < HEADER_SIZE) { - byteBuf.resetReaderIndex() - throw MissingWireFrameBytesException("readable bytes < header size") - } - } - - private fun verifyMarker(byteBuf: ByteBuf) { - val mark = byteBuf.readUnsignedByte() - if (mark != MARKER_BYTE) { - byteBuf.resetReaderIndex() - throw InvalidWireFrameMarkerException(mark) - } - } - - private fun verifyNotEmpty(byteBuf: ByteBuf) { - if (byteBuf.readableBytes() < 1) { - throw EmptyWireFrameException() - } - } + const val SUPPORTED_MAJOR_VERSION: Short = 1 const val HEADER_SIZE = 3 * java.lang.Byte.BYTES + 1 * java.lang.Integer.BYTES const val MARKER_BYTE: Short = 0xFF - const val SUPPORTED_MAJOR_VERSION: Short = 1 + } + } diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt new file mode 100644 index 00000000..d6804c7d --- /dev/null +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -0,0 +1,98 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain + +import io.netty.buffer.ByteBuf +import io.netty.buffer.ByteBufAllocator +import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException +import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException +import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class WireFrameEncoder(val allocator: ByteBufAllocator) { + + fun encode(frame: WireFrame): ByteBuf { + val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size()) + + bb.writeByte(WireFrame.MARKER_BYTE.toInt()) + bb.writeByte(frame.majorVersion.toInt()) + bb.writeByte(frame.minorVersion.toInt()) + bb.writeInt(frame.payloadSize) + frame.payload.writeTo(bb) + + return bb + } +} + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since June 2018 + */ +class WireFrameDecoder { + + fun decodeFirst(byteBuf: ByteBuf): WireFrame { + verifyNotEmpty(byteBuf) + byteBuf.markReaderIndex() + + verifyMarker(byteBuf) + verifyMinimumSize(byteBuf) + + val majorVersion = byteBuf.readUnsignedByte() + val minorVersion = byteBuf.readUnsignedByte() + val payloadSize = verifyPayloadSize(byteBuf) + val payload = ByteData.readFrom(byteBuf, payloadSize) + + return WireFrame(payload, majorVersion, minorVersion, payloadSize) + } + + private fun verifyPayloadSize(byteBuf: ByteBuf): Int = + byteBuf.readInt().let { payloadSize -> + if (byteBuf.readableBytes() < payloadSize) { + byteBuf.resetReaderIndex() + throw MissingWireFrameBytesException("readable bytes < payload size") + } else { + payloadSize + } + } + + private fun verifyMinimumSize(byteBuf: ByteBuf) { + if (byteBuf.readableBytes() < WireFrame.HEADER_SIZE) { + byteBuf.resetReaderIndex() + throw MissingWireFrameBytesException("readable bytes < header size") + } + } + + private fun verifyMarker(byteBuf: ByteBuf) { + val mark = byteBuf.readUnsignedByte() + if (mark != WireFrame.MARKER_BYTE) { + byteBuf.resetReaderIndex() + throw InvalidWireFrameMarkerException(mark) + } + } + + private fun verifyNotEmpty(byteBuf: ByteBuf) { + if (byteBuf.readableBytes() < 1) { + throw EmptyWireFrameException() + } + } +} diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 00113267..ed64f3b3 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -35,24 +35,24 @@ import java.nio.charset.Charset * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -object WireFrameTest : Spek({ +object WireFrameCodecsTest : Spek({ val payloadAsString = "coffeebabe" + val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT) + val decoder = WireFrameDecoder() fun createSampleFrame() = - WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset()))) + WireFrame(payloadAsString.toByteArray(Charset.defaultCharset())) fun encodeSampleFrame() = createSampleFrame().let { - Unpooled.buffer() - .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT)) - + encoder.encode(it) } describe("Wire Frame invariants") { given("input with unsupported major version") { val input = WireFrame( - payload = Unpooled.EMPTY_BUFFER, + payload = ByteData.EMPTY, majorVersion = 100, minorVersion = 2, payloadSize = 0) @@ -64,7 +64,7 @@ object WireFrameTest : Spek({ given("input with too small payload size") { val input = WireFrame( - payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), + payload = ByteData(byteArrayOf(1, 2, 3)), majorVersion = 1, minorVersion = 0, payloadSize = 1) @@ -76,7 +76,7 @@ object WireFrameTest : Spek({ given("input with too big payload size") { val input = WireFrame( - payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)), + payload = ByteData(byteArrayOf(1, 2, 3)), majorVersion = 1, minorVersion = 0, payloadSize = 8) @@ -89,7 +89,7 @@ object WireFrameTest : Spek({ given("valid input") { val payload = byteArrayOf(6, 9, 8, 6) val input = WireFrame( - payload = Unpooled.wrappedBuffer(payload), + payload = ByteData(payload), majorVersion = 1, minorVersion = 0, payloadSize = payload.size) @@ -107,7 +107,7 @@ object WireFrameTest : Spek({ describe("encode-decode methods' compatibility") { val frame = createSampleFrame() val encoded = encodeSampleFrame() - val decoded = WireFrame.decodeFirst(encoded) + val decoded = decoder.decodeFirst(encoded) it("should decode major version") { assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion) @@ -122,14 +122,9 @@ object WireFrameTest : Spek({ } it("should decode payload") { - assertThat(decoded.payload.toString(Charset.defaultCharset())) + assertThat(decoded.payload.asString()) .isEqualTo(payloadAsString) } - - it("should retain decoded payload") { - encoded.release() - assertThat(decoded.payload.refCnt()).isEqualTo(1) - } } describe("TCP framing") { @@ -139,7 +134,7 @@ object WireFrameTest : Spek({ val buff = Unpooled.buffer() .writeBytes(encodeSampleFrame()) .writeByte(0xAA) - val decoded = WireFrame.decodeFirst(buff) + val decoded = decoder.decodeFirst(buff) assertThat(decoded.isValid()).describedAs("should be valid").isTrue() assertThat(buff.readableBytes()).isEqualTo(1) @@ -150,7 +145,7 @@ object WireFrameTest : Spek({ .writeByte(0xFF) assertThatExceptionOfType(MissingWireFrameBytesException::class.java) - .isThrownBy { WireFrame.decodeFirst(buff) } + .isThrownBy { decoder.decodeFirst(buff) } } it("should throw exception when first byte is not 0xFF but length looks ok") { @@ -159,7 +154,7 @@ object WireFrameTest : Spek({ .writeBytes("some garbage".toByteArray()) assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) - .isThrownBy { WireFrame.decodeFirst(buff) } + .isThrownBy { decoder.decodeFirst(buff) } } it("should throw exception when first byte is not 0xFF and length is to short") { @@ -167,7 +162,7 @@ object WireFrameTest : Spek({ .writeByte(0xAA) assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java) - .isThrownBy { WireFrame.decodeFirst(buff) } + .isThrownBy { decoder.decodeFirst(buff) } } it("should throw exception when payload doesn't fit") { @@ -176,7 +171,7 @@ object WireFrameTest : Spek({ buff.writerIndex(buff.writerIndex() - 2) assertThatExceptionOfType(MissingWireFrameBytesException::class.java) - .isThrownBy { WireFrame.decodeFirst(buff) } + .isThrownBy { decoder.decodeFirst(buff) } } } diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 4438cf38..b2f4633a 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -20,13 +20,13 @@ package org.onap.dcae.collectors.veshv.main import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.model.routing import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.ArgBasedServerConfiguration.WrongArgumentException +import org.onap.dcae.collectors.veshv.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.model.routing import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain import org.slf4j.LoggerFactory import kotlin.system.exitProcess @@ -39,7 +39,7 @@ fun main(args: Array<String>) { val collectorProvider = CollectorFactory( resolveConfigurationProvider(serverConfiguration), - AdapterFactory.loggingSink() + AdapterFactory.kafkaSink() ).createVesHvCollectorProvider() ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block() } catch (ex: WrongArgumentException) { @@ -55,7 +55,7 @@ private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguratio if (serverConfiguration.configurationUrl.isEmpty()) { logger.info("Configuration url not specified - using default config") val sampleConfig = CollectorConfiguration( - kafkaBootstrapServers = "dmaap.cluster.local:9969", + kafkaBootstrapServers = "kafka:9092", routing = routing { defineRoute { fromDomain(Domain.HVRANMEAS) |