summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-08 16:29:31 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-02 07:06:19 +0200
commit7c3b59560f015b65882a56db585b7d4bdd10d434 (patch)
tree4c15d3657e373d3a681fdd2ab865623aeecc82e7
parent07bbbf71cd65b29f446a1b475add87f20365db83 (diff)
Implement Kafka Sink
Closes ONAP-146 Change-Id: I119a8abe70a9042f65a43909e5aa2fbed439e26f Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
-rw-r--r--docker-compose.yml11
-rw-r--r--hv-collector-client-simulator/Dockerfile2
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt6
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt10
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt9
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt27
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt1
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt2
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt (renamed from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt)9
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt (renamed from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt)15
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt40
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt37
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt8
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt (renamed from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt)6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt2
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt22
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt29
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt7
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt (renamed from hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt)56
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt3
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt58
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt75
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt98
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt (renamed from hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt)37
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt10
28 files changed, 385 insertions, 208 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index af8e0e0e..0f0cca2d 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -9,14 +9,15 @@ services:
ports:
- "9092:9092"
environment:
- HOSTNAME_COMMAND: "route -n | awk '/UG[ \t]/{print $$2}'"
+ KAFKA_ADVERTISED_HOST_NAME: "kafka"
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+ KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
+ KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
depends_on:
- zookeeper
- hv-collector:
+ veshvcollector:
build:
context: hv-collector-main
dockerfile: Dockerfile
@@ -26,11 +27,11 @@ services:
- kafka
volumes:
- ./ssl/:/etc/ves-hv/
- xnf-simulator:
+ xnfsimulator:
build:
context: hv-collector-client-simulator
dockerfile: Dockerfile
depends_on:
- - hv-collector
+ - veshvcollector
volumes:
- ./ssl/:/etc/ves-hv/ \ No newline at end of file
diff --git a/hv-collector-client-simulator/Dockerfile b/hv-collector-client-simulator/Dockerfile
index 19c4c878..58cfa448 100644
--- a/hv-collector-client-simulator/Dockerfile
+++ b/hv-collector-client-simulator/Dockerfile
@@ -7,7 +7,7 @@ LABEL maintainer="Nokia Wroclaw ONAP Team"
WORKDIR /opt/ves-hv-client-simulator
ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"]
-CMD ["--ves-host", "hv-collector", "--ves-port", "6061"]
+CMD ["--ves-host", "veshvcollector", "--ves-port", "6061"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
COPY target/hv-collector-client-simulator-*.jar ./
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
index d5f7c7c8..87a238a8 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
@@ -20,8 +20,6 @@
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
import com.google.protobuf.ByteString
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.Unpooled
import org.onap.dcae.collectors.veshv.domain.WireFrame
import org.onap.ves.VesEventV5
import reactor.core.publisher.Flux
@@ -65,12 +63,12 @@ class MessageFactory {
return WireFrame(payload)
}
- private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
+ private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteArray {
val msg = VesEventV5.VesEvent.newBuilder()
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
.build()
- return Unpooled.wrappedBuffer(msg.toByteArray())
+ return msg.toByteArray()
}
}
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index 29573e86..cb56db91 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -25,6 +25,7 @@ import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslProvider
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -64,8 +65,10 @@ class VesHvClient(configuration: ClientConfiguration) {
.asString(Charsets.UTF_8)
.subscribe { str -> logger.info("Server response: $str") }
+ val encoder = WireFrameEncoder(nettyOutbound.alloc())
+
val frames = messages
- .map { it.encode(nettyOutbound.alloc()) }
+ .map(encoder::encode)
.concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
return nettyOutbound
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 913d8f50..73f4d09d 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -23,12 +23,13 @@ import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.impl.MessageValidator
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
-import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
+import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicReference
@@ -36,7 +37,8 @@ import java.util.concurrent.atomic.AtomicReference
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvider: SinkProvider) {
+class CollectorFactory(val configuration: ConfigurationProvider,
+ private val sinkProvider: SinkProvider) {
fun createVesHvCollectorProvider(): CollectorProvider {
val collector: AtomicReference<Collector> = AtomicReference()
createVesHvCollector().subscribe(collector::set)
@@ -48,7 +50,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
- { WireDecoder(it) },
+ { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
VesDecoder(),
MessageValidator(),
Router(config.routing),
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index 60e7d70a..591a48b7 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -19,9 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.ves.VesEventV5.VesEvent
/**
@@ -30,8 +29,8 @@ import org.onap.ves.VesEventV5.VesEvent
*/
internal class VesDecoder {
- fun decode(bb: ByteBuf): VesMessage {
- val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
- return VesMessage(decodedHeader, bb)
+ fun decode(bytes: ByteData): VesMessage {
+ val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+ return VesMessage(decodedHeader, bytes)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index ac11b3e8..965943f6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -24,57 +24,42 @@ import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
+import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
-import java.util.concurrent.atomic.AtomicInteger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
internal class VesHvCollector(
- private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder,
+ private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
private val protobufDecoder: VesDecoder,
private val validator: MessageValidator,
private val router: Router,
private val sink: Sink) : Collector {
override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
- wireDecoderSupplier(alloc).let { wireDecoder ->
+ wireChunkDecoderSupplier(alloc).let { wireDecoder ->
dataStream
.concatMap(wireDecoder::decode)
.filter(WireFrame::isValid)
.map(WireFrame::payload)
.map(protobufDecoder::decode)
- .filter(this::validate)
+ .filter(validator::isValid)
.flatMap(this::findRoute)
.compose(sink::send)
- .doOnNext(this::releaseMemory)
.doOnTerminate { releaseBuffersMemory(wireDecoder) }
.then()
}
- private fun validate(msg: VesMessage): Boolean {
- val valid = validator.isValid(msg)
- if (!valid) {
- msg.rawMessage.release()
- }
- return valid
- }
-
private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
- private fun releaseMemory(msg: VesMessage) {
- logger.trace { "Releasing memory from ${msg.rawMessage}" }
- msg.rawMessage.release()
- }
-
- private fun releaseBuffersMemory(wireDecoder: WireDecoder) {
- wireDecoder.release()
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) {
+ wireChunkDecoder.release()
}
private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 8e6db2af..358be108 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
import reactor.core.publisher.Flux
import reactor.ipc.netty.http.client.HttpClient
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
index 62b6d1aa..b943e4e5 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -47,7 +47,7 @@ internal class LoggingSinkProvider : SinkProvider {
private fun logMessage(msg: RoutedMessage) {
val msgs = totalMessages.addAndGet(1)
- val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong())
+ val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
if (msgs % INFO_LOGGING_FREQ == 0L)
logger.info(logMessageSupplier)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
index db7845c7..6142fa3c 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.RoutedMessage
@@ -28,13 +28,12 @@ import reactor.core.publisher.Flux
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderRecord
import reactor.kafka.sender.SenderResult
-import java.nio.ByteBuffer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, ByteBuffer>) : Sink {
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
val records = messages.map(this::vesToKafkaRecord)
@@ -44,13 +43,13 @@ internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, Byte
.map { it.correlationMetadata() }
}
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, ByteBuffer, VesMessage> {
+ private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, VesMessage> {
return SenderRecord.create(
msg.topic,
msg.partition,
System.currentTimeMillis(),
msg.message.header,
- msg.message.rawMessage.nioBuffer(),
+ msg.message,
msg.message)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
index 82452e1e..a00a02d2 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -17,18 +17,16 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization.ByteBufferSerializer
-import org.apache.kafka.common.serialization.StringSerializer
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.ves.VesEventV5
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.kafka.sender.KafkaSender
import reactor.kafka.sender.SenderOptions
-import java.nio.ByteBuffer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -40,9 +38,8 @@ internal class KafkaSinkProvider : SinkProvider {
}
private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<VesEventV5.VesEvent.CommonEventHeader, ByteBuffer>()
+ SenderOptions.create<CommonEventHeader, VesMessage>()
.producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
- .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)
-
+ .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
new file mode 100644
index 00000000..9753d9e5
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import com.google.protobuf.MessageLite
+import org.apache.kafka.common.serialization.Serializer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ProtobufSerializer :Serializer<MessageLite> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ // no configuration
+ }
+
+ override fun serialize(topic: String?, data: MessageLite?): ByteArray? =
+ data?.toByteArray()
+
+ override fun close() {
+ // cleanup not needed
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
new file mode 100644
index 00000000..7a6ac7c8
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.apache.kafka.common.serialization.Serializer
+import org.onap.dcae.collectors.veshv.model.VesMessage
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class VesMessageSerializer : Serializer<VesMessage> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
+
+ override fun close() {
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
index e4dd7cf6..34a8b928 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
import io.netty.buffer.ByteBuf
import io.netty.buffer.CompositeByteBuf
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.FluxSink
@@ -33,6 +34,7 @@ import java.util.function.Consumer
* @since May 2018
*/
internal class StreamBufferEmitter(
+ private val decoder: WireFrameDecoder,
private val streamBuffer: CompositeByteBuf,
private val newFrame: ByteBuf)
: Consumer<FluxSink<WireFrame>> {
@@ -58,15 +60,15 @@ internal class StreamBufferEmitter(
streamBuffer.discardReadComponents()
}
sink.onRequest { requestedFrameCount ->
- WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber()
+ WireFrameSink(decoder, streamBuffer, sink, requestedFrameCount).handleSubscriber()
}
}
}
}
companion object {
- fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
- Flux.create(StreamBufferEmitter(streamBuffer, newFrame))
+ fun createFlux(decoder: WireFrameDecoder, streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
+ Flux.create(StreamBufferEmitter(decoder, streamBuffer, newFrame))
private const val INCREASE_WRITER_INDEX = true
private val logger = Logger(StreamBufferEmitter::class)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
index b701aaf2..580d36c5 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -22,6 +22,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
@@ -30,10 +31,10 @@ import reactor.core.publisher.Flux
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+internal class WireChunkDecoder(private val decoder: WireFrameDecoder, alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
private val streamBuffer = alloc.compositeBuffer()
- fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(streamBuffer, byteBuf)
+ fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(decoder, streamBuffer, byteBuf)
.doOnSubscribe { logIncomingMessage(byteBuf) }
.doOnNext(this::logDecodedWireMessage)
@@ -41,7 +42,6 @@ internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
streamBuffer.release()
}
-
private fun logIncomingMessage(wire: ByteBuf) {
logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
index bc9c8389..a576dc65 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.wire
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.FluxSink
@@ -30,6 +31,7 @@ import reactor.core.publisher.FluxSink
* @since May 2018
*/
internal class WireFrameSink(
+ private val decoder: WireFrameDecoder,
private val streamBuffer: ByteBuf,
private val sink: FluxSink<WireFrame>,
private val requestedFrameCount: Long) {
@@ -80,7 +82,7 @@ internal class WireFrameSink(
private fun decodeFirstFrameFromBuffer(): WireFrame? =
try {
- WireFrame.decodeFirst(streamBuffer)
+ decoder.decodeFirst(streamBuffer)
} catch (ex: MissingWireFrameBytesException) {
logger.debug { "${ex.message} - waiting for more data" }
null
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
index 38256896..03c53e10 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -19,11 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.model
-import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteBuf)
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
index 10e79156..bc030587 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -73,7 +73,7 @@ class RouteBuilder {
this.targetTopic = targetTopic
}
- fun withFixedPartitioning(num: Int = 1) {
+ fun withFixedPartitioning(num: Int = 0) {
partitioning = { _ -> num }
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
index df2840b9..017187a4 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -20,27 +20,29 @@
package org.onap.dcae.collectors.veshv.impl
import com.google.protobuf.ByteString
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.Unpooled
-import io.netty.buffer.Unpooled.wrappedBuffer
+import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.toByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import org.onap.ves.VesEventV5.VesEvent
-import org.assertj.core.api.Assertions.assertThat
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.*
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Priority
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.getDefaultInstance
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.newBuilder
internal object MessageValidatorTest : Spek({
- fun vesMessageBytes(commonHeader: CommonEventHeader): ByteBuf {
+ fun vesMessageBytes(commonHeader: CommonEventHeader): ByteData {
val msg = VesEvent.newBuilder()
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
.build()
- return wrappedBuffer(msg.toByteArray())
+ return msg.toByteData()
}
given("Message validator") {
@@ -79,7 +81,7 @@ internal object MessageValidatorTest : Spek({
}
on("ves hv message bytes") {
- val vesMessage = VesMessage(getDefaultInstance(), Unpooled.EMPTY_BUFFER)
+ val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
it("should not accept message with default header") {
assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
}
@@ -97,7 +99,7 @@ internal object MessageValidatorTest : Spek({
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!"))
.build()
- val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+ val rawMessageBytes = msg.toByteData()
it("should not accept not fully initialized message header ") {
val vesMessage = VesMessage(commonHeader, rawMessageBytes)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 3812db58..c852f5f4 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -1,11 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
package org.onap.dcae.collectors.veshv.impl
-import io.netty.buffer.Unpooled
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.model.routing
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -34,7 +53,7 @@ object RouterTest : Spek({
val cut = Router(config)
on("message with existing route (rtpm)") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), Unpooled.EMPTY_BUFFER)
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -55,7 +74,7 @@ object RouterTest : Spek({
}
on("message with existing route (trace)") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), Unpooled.EMPTY_BUFFER)
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should have route available") {
@@ -63,7 +82,7 @@ object RouterTest : Spek({
}
it("should be routed to proper partition") {
- assertThat(result?.partition).isEqualTo(1)
+ assertThat(result?.partition).isEqualTo(0)
}
it("should be routed to proper topic") {
@@ -76,7 +95,7 @@ object RouterTest : Spek({
}
on("message with unknown route") {
- val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), Unpooled.EMPTY_BUFFER)
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), ByteData.EMPTY)
val result = cut.findDestination(message)
it("should not have route available") {
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 263ad441..90b34b1c 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -21,13 +21,14 @@ package org.onap.dcae.collectors.veshv.impl
import com.google.protobuf.ByteString
import com.google.protobuf.InvalidProtocolBufferException
-import io.netty.buffer.Unpooled.wrappedBuffer
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.toByteData
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.ves.VesEventV5.VesEvent
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
@@ -45,7 +46,7 @@ internal object VesDecoderTest : Spek({
.setCommonEventHeader(commonHeader)
.setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements"))
.build()
- val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+ val rawMessageBytes = msg.toByteData()
it("should decode only header and pass it on along with raw message") {
@@ -60,7 +61,7 @@ internal object VesDecoderTest : Spek({
}
on("invalid ves hv message bytes") {
- val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset()))
+ val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
it("should throw error") {
assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index 0a10aa1f..1ddcc3dc 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -28,6 +28,8 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
import reactor.test.test
@@ -35,13 +37,17 @@ import reactor.test.test
* @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
* @since May 2018
*/
-internal object WireDecoderTest : Spek({
+internal object WireChunkDecoderTest : Spek({
val alloc = UnpooledByteBufAllocator.DEFAULT
val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
- fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc))
+ val encoder = WireFrameEncoder(alloc)
+
+ fun WireChunkDecoder.decode(frame: WireFrame) = decode(encoder.encode(frame))
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
+
fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
for (bb in byteBuffers) {
assertThat(bb.refCnt())
@@ -63,7 +69,7 @@ internal object WireDecoderTest : Spek({
val input = Unpooled.EMPTY_BUFFER
it("should yield empty result") {
- WireDecoder().decode(input).test().verifyComplete()
+ createInstance().decode(input).test().verifyComplete()
}
}
@@ -71,7 +77,7 @@ internal object WireDecoderTest : Spek({
val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
it("should yield empty result") {
- WireDecoder().decode(input).test().verifyComplete()
+ createInstance().decode(input).test().verifyComplete()
}
it("should release memory") {
@@ -83,7 +89,7 @@ internal object WireDecoderTest : Spek({
val input = Unpooled.wrappedBuffer(samplePayload)
it("should yield error") {
- WireDecoder().decode(input).test()
+ createInstance().decode(input).test()
.verifyError(InvalidWireFrameMarkerException::class.java)
}
@@ -93,10 +99,10 @@ internal object WireDecoderTest : Spek({
}
given("valid input") {
- val input = WireFrame(Unpooled.wrappedBuffer(samplePayload))
+ val input = WireFrame(samplePayload)
it("should yield decoded input frame") {
- WireDecoder().decode(input).test()
+ createInstance().decode(input).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -104,11 +110,11 @@ internal object WireDecoderTest : Spek({
given("valid input with part of next frame") {
val input = Unpooled.buffer()
- .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
- .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3))
+ .writeBytes(encoder.encode(WireFrame(samplePayload)))
+ .writeBytes(encoder.encode(WireFrame(samplePayload)).slice(0, 3))
it("should yield decoded input frame") {
- WireDecoder().decode(input).test()
+ createInstance().decode(input).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
}
@@ -120,11 +126,11 @@ internal object WireDecoderTest : Spek({
given("valid input with garbage after it") {
val input = Unpooled.buffer()
- .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+ .writeBytes(encoder.encode(WireFrame(samplePayload)))
.writeBytes(Unpooled.wrappedBuffer(samplePayload))
it("should yield decoded input frame and error") {
- WireDecoder().decode(input).test()
+ createInstance().decode(input).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyError(InvalidWireFrameMarkerException::class.java)
}
@@ -135,11 +141,11 @@ internal object WireDecoderTest : Spek({
}
given("two inputs containing two separate messages") {
- val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
- val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+ val input1 = encoder.encode(WireFrame(samplePayload))
+ val input2 = encoder.encode(WireFrame(anotherPayload))
it("should yield decoded input frames") {
- val cut = WireDecoder()
+ val cut = createInstance()
cut.decode(input1).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
@@ -154,16 +160,12 @@ internal object WireDecoderTest : Spek({
}
given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val input1 = encoder.encode(WireFrame(samplePayload))
val input2 = Unpooled.wrappedBuffer(anotherPayload)
it("should yield decoded input frames") {
- val cut = WireDecoder()
+ val cut = createInstance()
cut.decode(input1)
- .doOnNext {
- // releasing retained payload
- it.payload.release()
- }
.test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
@@ -182,8 +184,8 @@ internal object WireDecoderTest : Spek({
given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
- val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+ val frame1 = encoder.encode(WireFrame(samplePayload))
+ val frame2 = encoder.encode(WireFrame(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1)
@@ -191,7 +193,7 @@ internal object WireDecoderTest : Spek({
val input2 = Unpooled.buffer().writeBytes(frame2)
it("should yield decoded input frames") {
- val cut = WireDecoder()
+ val cut = createInstance()
cut.decode(input1).test()
.expectNextMatches { it.payloadSize == samplePayload.size }
.verifyComplete()
@@ -206,8 +208,8 @@ internal object WireDecoderTest : Spek({
}
given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
- val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+ val frame1 = encoder.encode(WireFrame(samplePayload))
+ val frame2 = encoder.encode(WireFrame(anotherPayload))
val input1 = Unpooled.buffer()
.writeBytes(frame1, 5)
@@ -216,7 +218,7 @@ internal object WireDecoderTest : Spek({
.writeBytes(frame2)
it("should yield decoded input frames") {
- val cut = WireDecoder()
+ val cut = createInstance()
cut.decode(input1).test()
.verifyComplete()
cut.decode(input2).test()
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index fc4fb656..08b6382d 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -24,7 +24,6 @@ import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.WireFrameDecodingException
import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -125,7 +124,7 @@ object VesHvSpecification : Spek({
val msg = messages[0]
assertThat(msg.topic).describedAs("routed message topic").isEqualTo(HVRANMEAS_TOPIC)
- assertThat(msg.partition).describedAs("routed message partition").isEqualTo(1)
+ assertThat(msg.partition).describedAs("routed message partition").isEqualTo(0)
}
system("should drop message if route was not found") { sut ->
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt
new file mode 100644
index 00000000..2b84e3f1
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ByteData.kt
@@ -0,0 +1,58 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import com.google.protobuf.MessageLite
+import io.netty.buffer.ByteBuf
+import java.nio.charset.Charset
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ByteData(private val data: ByteArray) {
+
+ fun size() = data.size
+
+ /**
+ * This will expose mutable state of the data.
+ *
+ * @return wrapped data buffer (NOT a copy)
+ */
+ fun unsafeAsArray() = data
+
+ fun writeTo(byteBuf: ByteBuf) {
+ byteBuf.writeBytes(data)
+ }
+
+ fun asString(charset: Charset = Charset.defaultCharset()) = String(data, charset)
+
+ companion object {
+ val EMPTY = ByteData(byteArrayOf())
+
+ fun readFrom(byteBuf: ByteBuf, length: Int): ByteData {
+ val dataArray = ByteArray(length)
+ byteBuf.readBytes(dataArray)
+ return ByteData(dataArray)
+ }
+ }
+}
+
+fun MessageLite.toByteData(): ByteData = ByteData(toByteArray())
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
index 8c8b4718..caf13c53 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
@@ -19,12 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
-import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
-import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
-
/**
* Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
*
@@ -55,82 +49,25 @@ import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesExc
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-data class WireFrame(val payload: ByteBuf,
+data class WireFrame(val payload: ByteData,
val majorVersion: Short,
val minorVersion: Short,
val payloadSize: Int) {
- constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes())
+ constructor(payload: ByteArray) : this(ByteData(payload), 1, 0, payload.size)
fun isValid(): Boolean =
majorVersion == SUPPORTED_MAJOR_VERSION
- && payload.readableBytes() == payloadSize
-
- fun encode(allocator: ByteBufAllocator): ByteBuf {
- val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes())
-
- bb.writeByte(MARKER_BYTE.toInt())
- bb.writeByte(majorVersion.toInt())
- bb.writeByte(minorVersion.toInt())
- bb.writeInt(payloadSize)
- bb.writeBytes(payload)
-
- return bb
- }
+ && payload.size() == payloadSize
companion object {
- fun decodeFirst(byteBuf: ByteBuf): WireFrame {
- verifyNotEmpty(byteBuf)
- byteBuf.markReaderIndex()
-
- verifyMarker(byteBuf)
- verifyMinimumSize(byteBuf)
-
- val majorVersion = byteBuf.readUnsignedByte()
- val minorVersion = byteBuf.readUnsignedByte()
- val payloadSize = verifyPayloadSize(byteBuf)
-
- val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize)
- byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize)
-
- return WireFrame(payload, majorVersion, minorVersion, payloadSize)
- }
-
- private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
- byteBuf.readInt().let { payloadSize ->
- if (byteBuf.readableBytes() < payloadSize) {
- byteBuf.resetReaderIndex()
- throw MissingWireFrameBytesException("readable bytes < payload size")
- } else {
- payloadSize
- }
- }
-
- private fun verifyMinimumSize(byteBuf: ByteBuf) {
- if (byteBuf.readableBytes() < HEADER_SIZE) {
- byteBuf.resetReaderIndex()
- throw MissingWireFrameBytesException("readable bytes < header size")
- }
- }
-
- private fun verifyMarker(byteBuf: ByteBuf) {
- val mark = byteBuf.readUnsignedByte()
- if (mark != MARKER_BYTE) {
- byteBuf.resetReaderIndex()
- throw InvalidWireFrameMarkerException(mark)
- }
- }
-
- private fun verifyNotEmpty(byteBuf: ByteBuf) {
- if (byteBuf.readableBytes() < 1) {
- throw EmptyWireFrameException()
- }
- }
+ const val SUPPORTED_MAJOR_VERSION: Short = 1
const val HEADER_SIZE =
3 * java.lang.Byte.BYTES +
1 * java.lang.Integer.BYTES
const val MARKER_BYTE: Short = 0xFF
- const val SUPPORTED_MAJOR_VERSION: Short = 1
+
}
+
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
new file mode 100644
index 00000000..d6804c7d
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -0,0 +1,98 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameEncoder(val allocator: ByteBufAllocator) {
+
+ fun encode(frame: WireFrame): ByteBuf {
+ val bb = allocator.buffer(WireFrame.HEADER_SIZE + frame.payload.size())
+
+ bb.writeByte(WireFrame.MARKER_BYTE.toInt())
+ bb.writeByte(frame.majorVersion.toInt())
+ bb.writeByte(frame.minorVersion.toInt())
+ bb.writeInt(frame.payloadSize)
+ frame.payload.writeTo(bb)
+
+ return bb
+ }
+}
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameDecoder {
+
+ fun decodeFirst(byteBuf: ByteBuf): WireFrame {
+ verifyNotEmpty(byteBuf)
+ byteBuf.markReaderIndex()
+
+ verifyMarker(byteBuf)
+ verifyMinimumSize(byteBuf)
+
+ val majorVersion = byteBuf.readUnsignedByte()
+ val minorVersion = byteBuf.readUnsignedByte()
+ val payloadSize = verifyPayloadSize(byteBuf)
+ val payload = ByteData.readFrom(byteBuf, payloadSize)
+
+ return WireFrame(payload, majorVersion, minorVersion, payloadSize)
+ }
+
+ private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
+ byteBuf.readInt().let { payloadSize ->
+ if (byteBuf.readableBytes() < payloadSize) {
+ byteBuf.resetReaderIndex()
+ throw MissingWireFrameBytesException("readable bytes < payload size")
+ } else {
+ payloadSize
+ }
+ }
+
+ private fun verifyMinimumSize(byteBuf: ByteBuf) {
+ if (byteBuf.readableBytes() < WireFrame.HEADER_SIZE) {
+ byteBuf.resetReaderIndex()
+ throw MissingWireFrameBytesException("readable bytes < header size")
+ }
+ }
+
+ private fun verifyMarker(byteBuf: ByteBuf) {
+ val mark = byteBuf.readUnsignedByte()
+ if (mark != WireFrame.MARKER_BYTE) {
+ byteBuf.resetReaderIndex()
+ throw InvalidWireFrameMarkerException(mark)
+ }
+ }
+
+ private fun verifyNotEmpty(byteBuf: ByteBuf) {
+ if (byteBuf.readableBytes() < 1) {
+ throw EmptyWireFrameException()
+ }
+ }
+}
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 00113267..ed64f3b3 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -35,24 +35,24 @@ import java.nio.charset.Charset
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-object WireFrameTest : Spek({
+object WireFrameCodecsTest : Spek({
val payloadAsString = "coffeebabe"
+ val encoder = WireFrameEncoder(UnpooledByteBufAllocator.DEFAULT)
+ val decoder = WireFrameDecoder()
fun createSampleFrame() =
- WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset())))
+ WireFrame(payloadAsString.toByteArray(Charset.defaultCharset()))
fun encodeSampleFrame() =
createSampleFrame().let {
- Unpooled.buffer()
- .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT))
-
+ encoder.encode(it)
}
describe("Wire Frame invariants") {
given("input with unsupported major version") {
val input = WireFrame(
- payload = Unpooled.EMPTY_BUFFER,
+ payload = ByteData.EMPTY,
majorVersion = 100,
minorVersion = 2,
payloadSize = 0)
@@ -64,7 +64,7 @@ object WireFrameTest : Spek({
given("input with too small payload size") {
val input = WireFrame(
- payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+ payload = ByteData(byteArrayOf(1, 2, 3)),
majorVersion = 1,
minorVersion = 0,
payloadSize = 1)
@@ -76,7 +76,7 @@ object WireFrameTest : Spek({
given("input with too big payload size") {
val input = WireFrame(
- payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+ payload = ByteData(byteArrayOf(1, 2, 3)),
majorVersion = 1,
minorVersion = 0,
payloadSize = 8)
@@ -89,7 +89,7 @@ object WireFrameTest : Spek({
given("valid input") {
val payload = byteArrayOf(6, 9, 8, 6)
val input = WireFrame(
- payload = Unpooled.wrappedBuffer(payload),
+ payload = ByteData(payload),
majorVersion = 1,
minorVersion = 0,
payloadSize = payload.size)
@@ -107,7 +107,7 @@ object WireFrameTest : Spek({
describe("encode-decode methods' compatibility") {
val frame = createSampleFrame()
val encoded = encodeSampleFrame()
- val decoded = WireFrame.decodeFirst(encoded)
+ val decoded = decoder.decodeFirst(encoded)
it("should decode major version") {
assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion)
@@ -122,14 +122,9 @@ object WireFrameTest : Spek({
}
it("should decode payload") {
- assertThat(decoded.payload.toString(Charset.defaultCharset()))
+ assertThat(decoded.payload.asString())
.isEqualTo(payloadAsString)
}
-
- it("should retain decoded payload") {
- encoded.release()
- assertThat(decoded.payload.refCnt()).isEqualTo(1)
- }
}
describe("TCP framing") {
@@ -139,7 +134,7 @@ object WireFrameTest : Spek({
val buff = Unpooled.buffer()
.writeBytes(encodeSampleFrame())
.writeByte(0xAA)
- val decoded = WireFrame.decodeFirst(buff)
+ val decoded = decoder.decodeFirst(buff)
assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
assertThat(buff.readableBytes()).isEqualTo(1)
@@ -150,7 +145,7 @@ object WireFrameTest : Spek({
.writeByte(0xFF)
assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
- .isThrownBy { WireFrame.decodeFirst(buff) }
+ .isThrownBy { decoder.decodeFirst(buff) }
}
it("should throw exception when first byte is not 0xFF but length looks ok") {
@@ -159,7 +154,7 @@ object WireFrameTest : Spek({
.writeBytes("some garbage".toByteArray())
assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
- .isThrownBy { WireFrame.decodeFirst(buff) }
+ .isThrownBy { decoder.decodeFirst(buff) }
}
it("should throw exception when first byte is not 0xFF and length is to short") {
@@ -167,7 +162,7 @@ object WireFrameTest : Spek({
.writeByte(0xAA)
assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
- .isThrownBy { WireFrame.decodeFirst(buff) }
+ .isThrownBy { decoder.decodeFirst(buff) }
}
it("should throw exception when payload doesn't fit") {
@@ -176,7 +171,7 @@ object WireFrameTest : Spek({
buff.writerIndex(buff.writerIndex() - 2)
assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
- .isThrownBy { WireFrame.decodeFirst(buff) }
+ .isThrownBy { decoder.decodeFirst(buff) }
}
}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 4438cf38..b2f4633a 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -20,13 +20,13 @@
package org.onap.dcae.collectors.veshv.main
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.model.routing
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.main.ArgBasedServerConfiguration.WrongArgumentException
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.model.routing
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
import org.slf4j.LoggerFactory
import kotlin.system.exitProcess
@@ -39,7 +39,7 @@ fun main(args: Array<String>) {
val collectorProvider = CollectorFactory(
resolveConfigurationProvider(serverConfiguration),
- AdapterFactory.loggingSink()
+ AdapterFactory.kafkaSink()
).createVesHvCollectorProvider()
ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
} catch (ex: WrongArgumentException) {
@@ -55,7 +55,7 @@ private fun resolveConfigurationProvider(serverConfiguration: ServerConfiguratio
if (serverConfiguration.configurationUrl.isEmpty()) {
logger.info("Configuration url not specified - using default config")
val sampleConfig = CollectorConfiguration(
- kafkaBootstrapServers = "dmaap.cluster.local:9969",
+ kafkaBootstrapServers = "kafka:9092",
routing = routing {
defineRoute {
fromDomain(Domain.HVRANMEAS)