aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-06-07 11:52:16 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-08-01 13:06:43 +0200
commit07bbbf71cd65b29f446a1b475add87f20365db83 (patch)
treee64fcf12c21e46358043744476d68765634d7f6f
parent767d0464a19e0949d2919e6df15c9653dec50503 (diff)
Fix TCP stream framing issue
Because of the nature of TCP protocol we receive consecutive IO buffer snapshots - not separate messages. That means that we need to join incomming buffers and then split it into separate WireFrames. Closes ONAP-312 Change-Id: I84ba0ec58a41ff9026f2fca24d2b15f3adcf0a19 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Issue-ID: DCAEGEN2-601
-rw-r--r--docker-compose.yml4
-rw-r--r--hv-collector-client-simulator/Dockerfile4
-rw-r--r--hv-collector-client-simulator/pom.xml6
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt9
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt2
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt18
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt5
-rw-r--r--hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt4
-rw-r--r--hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt17
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt4
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt3
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt17
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt57
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt40
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt48
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt64
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt6
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt74
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt56
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt92
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt7
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt104
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt233
-rw-r--r--hv-collector-core/src/test/resources/logback-test.xml (renamed from hv-collector-core/src/test/resources/logback.xml)2
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt19
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt72
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt8
-rw-r--r--hv-collector-ct/src/test/resources/logback-test.xml2
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt67
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt26
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt (renamed from hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt)23
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt26
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt26
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt164
-rw-r--r--hv-collector-main/Dockerfile2
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt2
-rw-r--r--hv-collector-main/src/main/resources/logback.xml3
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt11
39 files changed, 1031 insertions, 299 deletions
diff --git a/docker-compose.yml b/docker-compose.yml
index 68bb3d0b..af8e0e0e 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -25,7 +25,7 @@ services:
depends_on:
- kafka
volumes:
- - /etc/ves-hv/:/etc/ves-hv/
+ - ./ssl/:/etc/ves-hv/
xnf-simulator:
build:
context: hv-collector-client-simulator
@@ -33,4 +33,4 @@ services:
depends_on:
- hv-collector
volumes:
- - /etc/ves-hv/:/etc/ves-hv/ \ No newline at end of file
+ - ./ssl/:/etc/ves-hv/ \ No newline at end of file
diff --git a/hv-collector-client-simulator/Dockerfile b/hv-collector-client-simulator/Dockerfile
index 159f900d..19c4c878 100644
--- a/hv-collector-client-simulator/Dockerfile
+++ b/hv-collector-client-simulator/Dockerfile
@@ -6,8 +6,8 @@ LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
LABEL maintainer="Nokia Wroclaw ONAP Team"
WORKDIR /opt/ves-hv-client-simulator
-ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
+ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.simulators.xnf.MainKt"]
CMD ["--ves-host", "hv-collector", "--ves-port", "6061"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
-COPY target/hv-collector-client-simulator-*.jar ./ \ No newline at end of file
+COPY target/hv-collector-client-simulator-*.jar ./
diff --git a/hv-collector-client-simulator/pom.xml b/hv-collector-client-simulator/pom.xml
index e7a25855..012bda53 100644
--- a/hv-collector-client-simulator/pom.xml
+++ b/hv-collector-client-simulator/pom.xml
@@ -132,6 +132,12 @@
<artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-tcnative-boringssl-static</artifactId>
+ <scope>runtime</scope>
+ <classifier>${os.detected.classifier}</classifier>
+ </dependency>
+ <dependency>
<groupId>com.nhaarman</groupId>
<artifactId>mockito-kotlin</artifactId>
</dependency>
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
index 49653b57..b946689f 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ArgBasedClientConfiguration.kt
@@ -28,7 +28,7 @@ import java.io.File
import java.nio.file.Paths
internal object DefaultValues {
- const val MESSAGES_AMOUNT = 1
+ const val MESSAGES_AMOUNT = -1L
const val PRIVATE_KEY_FILE = "/etc/ves-hv/client.key"
const val CERT_FILE = "/etc/ves-hv/client.crt"
const val TRUST_CERT_FILE = "/etc/ves-hv/trust.crt"
@@ -98,7 +98,7 @@ internal object ArgBasedClientConfiguration {
val cmdLine = parser.parse(options, args)
val host = cmdLine.stringValue(OPT_VES_HOST)
val port = cmdLine.intValue(OPT_VES_PORT)
- val msgsAmount = cmdLine.intValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
+ val msgsAmount = cmdLine.longValueOrDefault(OPT_MESSAGES_AMOUNT, DefaultValues.MESSAGES_AMOUNT)
return ClientConfiguration(
host,
port,
@@ -121,8 +121,9 @@ internal object ArgBasedClientConfiguration {
private fun stringPathToPath(path: String) = Paths.get(File(path).toURI())
- private fun CommandLine.intValueOrDefault(option: Option, default: Int) =
- getOptionValue(option.opt)?.toInt() ?: default
+
+ private fun CommandLine.longValueOrDefault(option: Option, default: Long) =
+ getOptionValue(option.opt)?.toLong() ?: default
private fun CommandLine.intValue(option: Option) =
getOptionValue(option.opt).toInt()
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
index e835ee95..83d6f7c0 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/config/ClientConfiguration.kt
@@ -27,4 +27,4 @@ data class ClientConfiguration(
val vesHost: String,
val vesPort: Int,
val security: ClientSecurityConfiguration,
- val messagesAmount: Int)
+ val messagesAmount: Long)
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
index 0c578b38..d5f7c7c8 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactory.kt
@@ -38,8 +38,13 @@ class MessageFactory {
const val DEFAULT_LAST_EPOCH: Long = 120034455
}
- fun createMessageFlux(amount: Int = 1): Flux<WireFrame> =
- Mono.just(createMessage()).repeat(amount.toLong())
+ fun createMessageFlux(amount: Long = -1): Flux<WireFrame> =
+ Mono.fromCallable(this::createMessage).let {
+ if (amount < 0)
+ it.repeat()
+ else
+ it.repeat(amount)
+ }
private fun createMessage(): WireFrame {
@@ -57,14 +62,7 @@ class MessageFactory {
.build()
val payload = vesMessageBytes(commonHeader)
- return WireFrame(
- payload = payload,
- mark = 0xFF,
- majorVersion = 1,
- minorVersion = 2,
- payloadSize = payload.readableBytes())
-
-
+ return WireFrame(payload)
}
private fun vesMessageBytes(commonHeader: VesEventV5.VesEvent.CommonEventHeader): ByteBuf {
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
index c911c533..29573e86 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/VesHvClient.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.simulators.xnf.impl
+import io.netty.buffer.Unpooled
import io.netty.handler.ssl.ClientAuth
import io.netty.handler.ssl.SslContext
import io.netty.handler.ssl.SslContextBuilder
@@ -29,6 +30,7 @@ import org.onap.dcae.collectors.veshv.simulators.xnf.config.ClientSecurityConfig
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.ipc.netty.NettyInbound
import reactor.ipc.netty.NettyOutbound
import reactor.ipc.netty.tcp.TcpClient
@@ -53,7 +55,6 @@ class VesHvClient(configuration: ClientConfiguration) {
client.startAndAwait(BiFunction { i, o -> handler(i, o, messages) })
}
- // sending flux with multiple WireFrames not supported yet
private fun handler(nettyInbound: NettyInbound,
nettyOutbound: NettyOutbound,
messages: Flux<WireFrame>): Publisher<Void> {
@@ -64,8 +65,8 @@ class VesHvClient(configuration: ClientConfiguration) {
.subscribe { str -> logger.info("Server response: $str") }
val frames = messages
- .doOnNext { logger.info { "About to send message with ${it.payloadSize} B of payload" } }
.map { it.encode(nettyOutbound.alloc()) }
+ .concatWith(Mono.just(Unpooled.EMPTY_BUFFER))
return nettyOutbound
.options { it.flushOnEach() }
diff --git a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index ee7f49a6..68f999ef 100644
--- a/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-client-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -36,12 +36,14 @@ fun main(args: Array<String>) {
val clientConfig = ArgBasedClientConfiguration.parse(args)
val messageFactory = MessageFactory()
val client = VesHvClient(clientConfig)
- client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
+ client.send(messageFactory.createMessageFlux(clientConfig.messagesAmount))
} catch (e: ArgBasedClientConfiguration.WrongArgumentException) {
e.printHelp("java org.onap.dcae.collectors.veshv.main.MainKt")
+ System.exit(1)
} catch (e: Exception) {
logger.error(e.localizedMessage)
logger.debug("An error occurred when starting ves client", e)
+ System.exit(2)
}
}
diff --git a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
index edcec65f..405a15eb 100644
--- a/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
+++ b/hv-collector-client-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/MessageFactoryTest.kt
@@ -23,8 +23,7 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.MessageFactory
-import kotlin.test.assertEquals
+import reactor.test.test
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -35,16 +34,18 @@ object MessageFactoryTest : Spek({
val factory = MessageFactory()
given("no parameters") {
- it("should return flux with one message") {
- val result = factory.createMessageFlux()
-
- assertEquals(1, result.count().block())
+ it("should return infinite flux") {
+ val limit = 1000L
+ factory.createMessageFlux().take(limit).test()
+ .expectNextCount(limit)
+ .verifyComplete()
}
}
given("messages amount") {
it("should return message flux of specified size") {
- val result = factory.createMessageFlux(5)
- assertEquals(5, result.count().block())
+ factory.createMessageFlux(5).test()
+ .expectNextCount(5)
+ .verifyComplete()
}
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index dfbbdb56..ed686fe8 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -20,12 +20,13 @@
package org.onap.dcae.collectors.veshv.boundary
import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
interface Collector {
- fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
+ fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
}
typealias CollectorProvider = () -> Collector
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 850d3a84..913d8f50 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -28,7 +28,7 @@ import org.onap.dcae.collectors.veshv.impl.MessageValidator
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
-import org.onap.dcae.collectors.veshv.impl.WireDecoder
+import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
import reactor.core.publisher.Flux
import java.util.concurrent.atomic.AtomicReference
@@ -48,7 +48,7 @@ class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvide
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
- WireDecoder(),
+ { WireDecoder(it) },
VesDecoder(),
MessageValidator(),
Router(config.routing),
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
index b0a9da81..12e1c1e6 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -39,7 +39,8 @@ internal class MessageValidator {
fun isValid(message: VesMessage): Boolean {
val header = message.header
- return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
+ val ret = allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
+ return ret
}
private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
index cdc70f82..60e7d70a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.impl
-import com.google.protobuf.InvalidProtocolBufferException
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -31,18 +30,8 @@ import org.onap.ves.VesEventV5.VesEvent
*/
internal class VesDecoder {
- fun decode(bb: ByteBuf): VesMessage? =
- try {
- val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
- VesMessage(decodedHeader, bb)
- } catch (ex: InvalidProtocolBufferException) {
- logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" }
- logger.debug("Cause", ex)
- null
- }
-
-
- companion object {
- private val logger = Logger(VesDecoder::class)
+ fun decode(bb: ByteBuf): VesMessage {
+ val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
+ return VesMessage(decodedHeader, bb)
}
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 535fbe12..ac11b3e8 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -20,47 +20,43 @@
package org.onap.dcae.collectors.veshv.impl
import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.impl.wire.WireDecoder
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.model.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
+import java.util.concurrent.atomic.AtomicInteger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
internal class VesHvCollector(
- private val wireDecoder: WireDecoder,
+ private val wireDecoderSupplier: (ByteBufAllocator) -> WireDecoder,
private val protobufDecoder: VesDecoder,
private val validator: MessageValidator,
private val router: Router,
private val sink: Sink) : Collector {
- override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
- dataStream
- .doOnNext(this::logIncomingMessage)
- .flatMap(this::decodeWire)
- .doOnNext(this::logDecodedWireMessage)
- .flatMap(this::decodeProtobuf)
- .filter(this::validate)
- .flatMap(this::findRoute)
- .compose(sink::send)
- .doOnNext(this::releaseMemory)
- .then()
- private fun logIncomingMessage(wire: ByteBuf) {
- logger.debug { "Got message with total ${wire.readableBytes()} B"}
- }
-
- private fun logDecodedWireMessage(payload: ByteBuf) {
- logger.debug { "Wire payload size: ${payload.readableBytes()} B"}
- }
-
- private fun decodeWire(wire: ByteBuf) = omitWhenNull(wire, wireDecoder::decode)
-
- private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
+ override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
+ wireDecoderSupplier(alloc).let { wireDecoder ->
+ dataStream
+ .concatMap(wireDecoder::decode)
+ .filter(WireFrame::isValid)
+ .map(WireFrame::payload)
+ .map(protobufDecoder::decode)
+ .filter(this::validate)
+ .flatMap(this::findRoute)
+ .compose(sink::send)
+ .doOnNext(this::releaseMemory)
+ .doOnTerminate { releaseBuffersMemory(wireDecoder) }
+ .then()
+ }
private fun validate(msg: VesMessage): Boolean {
val valid = validator.isValid(msg)
@@ -73,21 +69,16 @@ internal class VesHvCollector(
private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNull(msg, router::findDestination)
private fun releaseMemory(msg: VesMessage) {
+ logger.trace { "Releasing memory from ${msg.rawMessage}" }
msg.rawMessage.release()
}
- private fun <T, V>omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
-
- private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
- val result = mapper(input)
- return if (result == null) {
- input.release()
- Mono.empty()
- } else {
- Mono.just(result)
- }
+ private fun releaseBuffersMemory(wireDecoder: WireDecoder) {
+ wireDecoder.release()
}
+ private fun <T, V> omitWhenNull(input: T, mapper: (T) -> V?): Mono<V> = Mono.justOrEmpty(mapper(input))
+
companion object {
val logger = Logger(VesHvCollector::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
index 0aacb266..8e6db2af 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -19,24 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.impl.adapters
-import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
-import org.apache.kafka.common.serialization.ByteBufferSerializer
-import org.apache.kafka.common.serialization.StringSerializer
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
import reactor.core.publisher.Flux
import reactor.ipc.netty.http.client.HttpClient
-import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-import java.nio.ByteBuffer
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -51,33 +38,6 @@ object AdapterFactory {
override fun invoke() = Flux.just(config)
}
- private class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- val sender = KafkaSender.create(
- SenderOptions.create<CommonEventHeader, ByteBuffer>()
- .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
- .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java))
- return KafkaSink(sender)
- }
- }
-
-
- private class LoggingSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- return object : Sink {
- private val logger = Logger(LoggingSinkProvider::class)
- override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
- messages
- .doOnNext { msg ->
- logger.info { "Message routed to ${msg.topic}" }
- }
- .map { it.message }
-
- }
- }
- }
-
fun consulConfigurationProvider(url: String): ConfigurationProvider =
ConsulConfigurationProvider(url, httpAdapter())
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt
new file mode 100644
index 00000000..82452e1e
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSinkProvider.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteBufferSerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.ves.VesEventV5
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+import java.nio.ByteBuffer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class KafkaSinkProvider : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+ }
+
+ private fun constructSenderOptions(config: CollectorConfiguration) =
+ SenderOptions.create<VesEventV5.VesEvent.CommonEventHeader, ByteBuffer>()
+ .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
+ .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
+ .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java)
+
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
new file mode 100644
index 00000000..62b6d1aa
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -0,0 +1,64 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class LoggingSinkProvider : SinkProvider {
+
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return object : Sink {
+ private val logger = Logger(LoggingSinkProvider::class)
+ private val totalMessages = AtomicLong()
+ private val totalBytes = AtomicLong()
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> =
+ messages
+ .doOnNext(this::logMessage)
+ .map { it.message }
+
+ private fun logMessage(msg: RoutedMessage) {
+ val msgs = totalMessages.addAndGet(1)
+ val bytes = totalBytes.addAndGet(msg.message.rawMessage.readableBytes().toLong())
+ val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
+ if (msgs % INFO_LOGGING_FREQ == 0L)
+ logger.info(logMessageSupplier)
+ else
+ logger.trace(logMessageSupplier)
+ }
+
+ }
+ }
+
+ companion object {
+ const val INFO_LOGGING_FREQ = 100_000
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index 208b1ba0..564aa8df 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.impl.socket
+import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
@@ -59,17 +60,18 @@ internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
logger.debug("Got connection")
+ nettyOutbound.alloc()
val sendHello = nettyOutbound
.options { it.flushOnEach() }
.sendString(Mono.just("ONAP_VES_HV/0.1\n"))
.then()
- val handleIncomingMessages = collectorProvider().handleConnection(nettyInbound.receive())
+ val handleIncomingMessages = collectorProvider()
+ .handleConnection(nettyInbound.context().channel().alloc(), nettyInbound.receive().retain())
return sendHello.then(handleIncomingMessages)
}
-
companion object {
private val logger = Logger(NettyTcpServer::class)
}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
new file mode 100644
index 00000000..e4dd7cf6
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/StreamBufferEmitter.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.CompositeByteBuf
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import reactor.core.publisher.FluxSink
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.function.Consumer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class StreamBufferEmitter(
+ private val streamBuffer: CompositeByteBuf,
+ private val newFrame: ByteBuf)
+ : Consumer<FluxSink<WireFrame>> {
+
+ private val subscribed = AtomicBoolean(false)
+
+ override fun accept(sink: FluxSink<WireFrame>) {
+ when {
+
+ subscribed.getAndSet(true) ->
+ sink.error(IllegalStateException("Wire frame emitter supports only one subscriber"))
+
+ newFrame.readableBytes() == 0 -> {
+ logger.trace { "Discarding empty buffer" }
+ newFrame.release()
+ sink.complete()
+ }
+
+ else -> {
+ streamBuffer.addComponent(INCREASE_WRITER_INDEX, newFrame)
+ sink.onDispose {
+ logger.debug("Disposing read components")
+ streamBuffer.discardReadComponents()
+ }
+ sink.onRequest { requestedFrameCount ->
+ WireFrameSink(streamBuffer, sink, requestedFrameCount).handleSubscriber()
+ }
+ }
+ }
+ }
+
+ companion object {
+ fun createFlux(streamBuffer: CompositeByteBuf, newFrame: ByteBuf): Flux<WireFrame> =
+ Flux.create(StreamBufferEmitter(streamBuffer, newFrame))
+
+ private const val INCREASE_WRITER_INDEX = true
+ private val logger = Logger(StreamBufferEmitter::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt
new file mode 100644
index 00000000..b701aaf2
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoder.kt
@@ -0,0 +1,56 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class WireDecoder(alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+ private val streamBuffer = alloc.compositeBuffer()
+
+ fun decode(byteBuf: ByteBuf): Flux<WireFrame> = StreamBufferEmitter.createFlux(streamBuffer, byteBuf)
+ .doOnSubscribe { logIncomingMessage(byteBuf) }
+ .doOnNext(this::logDecodedWireMessage)
+
+ fun release() {
+ streamBuffer.release()
+ }
+
+
+ private fun logIncomingMessage(wire: ByteBuf) {
+ logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+ }
+
+ private fun logDecodedWireMessage(wire: WireFrame) {
+ logger.trace { "Wire payload size: ${wire.payloadSize} B." }
+ }
+
+ companion object {
+ val logger = Logger(VesHvCollector::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
new file mode 100644
index 00000000..bc9c8389
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameSink.kt
@@ -0,0 +1,92 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.FluxSink
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class WireFrameSink(
+ private val streamBuffer: ByteBuf,
+ private val sink: FluxSink<WireFrame>,
+ private val requestedFrameCount: Long) {
+
+ fun handleSubscriber() {
+ logger.trace { "Decoder buffer capacity before decoding frame: ${streamBuffer.capacity()}" }
+
+ try {
+ if (requestedFrameCount == Long.MAX_VALUE) {
+ logger.trace { "Push based strategy" }
+ pushAvailableFrames()
+ } else {
+ logger.trace { "Pull based strategy - req $requestedFrameCount" }
+ pushUpToNumberOfFrames()
+ }
+ } catch (ex: Exception) {
+ sink.error(ex)
+ }
+
+ logger.trace { "Decoder buffer capacity after decoding frame: ${streamBuffer.capacity()}" }
+
+ }
+
+ private fun pushAvailableFrames() {
+ var nextFrame = decodeFirstFrameFromBuffer()
+ while (nextFrame != null && !sink.isCancelled) {
+ sink.next(nextFrame)
+ nextFrame = decodeFirstFrameFromBuffer()
+ }
+ sink.complete()
+ }
+
+ private fun pushUpToNumberOfFrames() {
+ var nextFrame = decodeFirstFrameFromBuffer()
+ var remaining = requestedFrameCount
+ loop@ while (nextFrame != null && !sink.isCancelled) {
+ sink.next(nextFrame)
+ if (--remaining > 0) {
+ nextFrame = decodeFirstFrameFromBuffer()
+ } else {
+ break@loop
+ }
+ }
+ if (remaining > 0 && nextFrame == null) {
+ sink.complete()
+ }
+ }
+
+ private fun decodeFirstFrameFromBuffer(): WireFrame? =
+ try {
+ WireFrame.decodeFirst(streamBuffer)
+ } catch (ex: MissingWireFrameBytesException) {
+ logger.debug { "${ex.message} - waiting for more data" }
+ null
+ }
+
+ companion object {
+ private val logger = Logger(WireFrameSink::class)
+ }
+}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
index 8d9e4962..263ad441 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -20,8 +20,10 @@
package org.onap.dcae.collectors.veshv.impl
import com.google.protobuf.ByteString
+import com.google.protobuf.InvalidProtocolBufferException
import io.netty.buffer.Unpooled.wrappedBuffer
import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
@@ -60,8 +62,9 @@ internal object VesDecoderTest : Spek({
on("invalid ves hv message bytes") {
val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset()))
- it("should return empty result") {
- assertThat(cut.decode(rawMessageBytes)).isNull()
+ it("should throw error") {
+ assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
+ .isThrownBy { cut.decode(rawMessageBytes) }
}
}
}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
deleted file mode 100644
index 81706ce4..00000000
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
+++ /dev/null
@@ -1,104 +0,0 @@
-package org.onap.dcae.collectors.veshv.impl
-
-import io.netty.buffer.Unpooled
-import io.netty.buffer.UnpooledByteBufAllocator
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrame
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
- * @since May 2018
- */
-internal object WireDecoderTest : Spek({
- describe("decoding wire protocol") {
- val cut = WireDecoder()
-
- fun decode(frame: WireFrame) =
- cut.decode(
- frame.encode(UnpooledByteBufAllocator.DEFAULT))
-
- given("empty input") {
- val input = Unpooled.EMPTY_BUFFER
-
- it("should yield empty result") {
- assertThat(cut.decode(input)).isNull()
- }
- }
-
- given("input without 0xFF first byte") {
- val input = WireFrame(
- payload = Unpooled.EMPTY_BUFFER,
- mark = 0x10,
- majorVersion = 1,
- minorVersion = 2,
- payloadSize = 0)
-
- it("should yield empty result") {
- assertThat(decode(input)).isNull()
- }
- }
-
- given("input with unsupported major version") {
- val input = WireFrame(
- payload = Unpooled.EMPTY_BUFFER,
- mark = 0xFF,
- majorVersion = 100,
- minorVersion = 2,
- payloadSize = 0)
-
- it("should yield empty result") {
- assertThat(decode(input)).isNull()
- }
- }
-
- given("input with too small payload size") {
- val input = WireFrame(
- payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
- mark = 0xFF,
- majorVersion = 1,
- minorVersion = 0,
- payloadSize = 1)
-
- it("should yield empty result") {
- assertThat(decode(input)).isNull()
- }
- }
-
- given("input with too big payload size") {
- val input = WireFrame(
- payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
- mark = 0xFF,
- majorVersion = 1,
- minorVersion = 0,
- payloadSize = 8)
-
- it("should yield empty result") {
- assertThat(decode(input)).isNull()
- }
- }
-
- given("valid input") {
- val payload = byteArrayOf(6, 9, 8, 6)
- val input = WireFrame(
- payload = Unpooled.wrappedBuffer(payload),
- mark = 0xFF,
- majorVersion = 1,
- minorVersion = 0,
- payloadSize = payload.size)
-
-
- it("should yield Google Protocol Buffers payload") {
- val result = decode(input)!!
-
- val actualPayload = ByteArray(result.readableBytes())
- result.readBytes(actualPayload)
-
- assertThat(actualPayload).containsExactly(*payload)
- }
- }
- }
-})
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt
new file mode 100644
index 00000000..0a10aa1f
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireDecoderTest.kt
@@ -0,0 +1,233 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import reactor.test.test
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since May 2018
+ */
+internal object WireDecoderTest : Spek({
+ val alloc = UnpooledByteBufAllocator.DEFAULT
+ val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
+ val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
+
+ fun WireDecoder.decode(frame: WireFrame) = decode(frame.encode(alloc))
+
+ fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
+ for (bb in byteBuffers) {
+ assertThat(bb.refCnt())
+ .describedAs("should be released: $bb ref count")
+ .isEqualTo(0)
+ }
+ }
+
+ fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
+ for (bb in byteBuffers) {
+ assertThat(bb.refCnt())
+ .describedAs("should not be released: $bb ref count")
+ .isEqualTo(1)
+ }
+ }
+
+ describe("decoding wire protocol") {
+ given("empty input") {
+ val input = Unpooled.EMPTY_BUFFER
+
+ it("should yield empty result") {
+ WireDecoder().decode(input).test().verifyComplete()
+ }
+ }
+
+ given("input with no readable bytes") {
+ val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
+
+ it("should yield empty result") {
+ WireDecoder().decode(input).test().verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input)
+ }
+ }
+
+ given("invalid input (not starting with marker)") {
+ val input = Unpooled.wrappedBuffer(samplePayload)
+
+ it("should yield error") {
+ WireDecoder().decode(input).test()
+ .verifyError(InvalidWireFrameMarkerException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("valid input") {
+ val input = WireFrame(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded input frame") {
+ WireDecoder().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ }
+ }
+
+ given("valid input with part of next frame") {
+ val input = Unpooled.buffer()
+ .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+ .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc).slice(0, 3))
+
+ it("should yield decoded input frame") {
+ WireDecoder().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("valid input with garbage after it") {
+ val input = Unpooled.buffer()
+ .writeBytes(WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc))
+ .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded input frame and error") {
+ WireDecoder().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyError(InvalidWireFrameMarkerException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("two inputs containing two separate messages") {
+ val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val input2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+ it("should yield decoded input frames") {
+ val cut = WireDecoder()
+ cut.decode(input1).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+
+ given("1st input containing 1st frame and 2nd input containing garbage") {
+ val input1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val input2 = Unpooled.wrappedBuffer(anotherPayload)
+
+ it("should yield decoded input frames") {
+ val cut = WireDecoder()
+ cut.decode(input1)
+ .doOnNext {
+ // releasing retained payload
+ it.payload.release()
+ }
+ .test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .verifyError(InvalidWireFrameMarkerException::class.java)
+ }
+
+ it("should release memory for 1st input") {
+ verifyMemoryReleased(input1)
+ }
+
+ it("should leave memory unreleased for 2nd input") {
+ verifyMemoryNotReleased(input2)
+ }
+ }
+
+
+ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
+ val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+ val input1 = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2, 3)
+ val input2 = Unpooled.buffer().writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = WireDecoder()
+ cut.decode(input1).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+
+ given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
+ val frame1 = WireFrame(Unpooled.wrappedBuffer(samplePayload)).encode(alloc)
+ val frame2 = WireFrame(Unpooled.wrappedBuffer(anotherPayload)).encode(alloc)
+
+ val input1 = Unpooled.buffer()
+ .writeBytes(frame1, 5)
+ val input2 = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = WireDecoder()
+ cut.decode(input1).test()
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+ }
+})
diff --git a/hv-collector-core/src/test/resources/logback.xml b/hv-collector-core/src/test/resources/logback-test.xml
index 809f62d4..84abc9d3 100644
--- a/hv-collector-core/src/test/resources/logback.xml
+++ b/hv-collector-core/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
</rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 1826bcd0..c4e9874f 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -20,12 +20,15 @@
package org.onap.dcae.collectors.veshv.tests.component
import io.netty.buffer.ByteBuf
+import io.netty.buffer.UnpooledByteBufAllocator
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
import org.onap.dcae.collectors.veshv.model.RoutedMessage
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.tests.fakes.FakeConfigurationProvider
import org.onap.dcae.collectors.veshv.tests.fakes.FakeSink
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.Exceptions
import reactor.core.publisher.Flux
import java.time.Duration
@@ -36,6 +39,7 @@ import java.time.Duration
internal class Sut {
val configurationProvider = FakeConfigurationProvider()
val sink = FakeSink()
+ val alloc = UnpooledByteBufAllocator.DEFAULT
private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink))
val collectorProvider = collectorFactory.createVesHvCollectorProvider()
@@ -43,8 +47,19 @@ internal class Sut {
get() = collectorProvider()
fun handleConnection(vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(Flux.fromArray(packets)).block(Duration.ofSeconds(10))
-
+ collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
return sink.sentMessages
}
+
+ fun handleConnectionReturningError(vararg packets: ByteBuf): Pair<List<RoutedMessage>, Exception?> =
+ try {
+ collector.handleConnection(alloc, Flux.fromArray(packets)).block(Duration.ofSeconds(10))
+ Pair(sink.sentMessages, null)
+ } catch (ex: Exception) {
+ Pair(sink.sentMessages, ex)
+ }
+
+ companion object {
+ val logger = Logger(Sut::class)
+ }
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 26032ff9..fc4fb656 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -19,9 +19,12 @@
*/
package org.onap.dcae.collectors.veshv.tests.component
+import com.google.protobuf.InvalidProtocolBufferException
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.WireFrameDecodingException
import org.onap.dcae.collectors.veshv.tests.fakes.HVRANMEAS_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
@@ -40,29 +43,76 @@ object VesHvSpecification : Spek({
.describedAs("should send all events")
.hasSize(2)
}
+ }
+
+ describe("Memory management") {
- system("should release memory for each incoming message") { sut ->
+ system("should release memory for each handled and dropped message") { sut ->
sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val validMessage = vesMessage(Domain.HVRANMEAS)
val msgWithInvalidDomain = vesMessage(Domain.OTHER)
- val msgWithInvalidPayload = invalidVesMessage()
val msgWithInvalidFrame = invalidWireFrame()
- val validMessage = vesMessage(Domain.HVRANMEAS)
- val refCntBeforeSending = msgWithInvalidDomain.refCnt()
+ val expectedRefCnt = 0
+
+ val (handledEvents, exception) = sut.handleConnectionReturningError(
+ validMessage, msgWithInvalidDomain, msgWithInvalidFrame)
- sut.handleConnection(msgWithInvalidDomain, msgWithInvalidPayload, msgWithInvalidFrame, validMessage)
+ assertThat(handledEvents).hasSize(1)
+ assertThat(exception).isNull()
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
assertThat(msgWithInvalidDomain.refCnt())
.describedAs("message with invalid domain should be released")
- .isEqualTo(refCntBeforeSending)
- assertThat(msgWithInvalidPayload.refCnt())
- .describedAs("message with invalid payload should be released")
- .isEqualTo(refCntBeforeSending)
+ .isEqualTo(expectedRefCnt)
assertThat(msgWithInvalidFrame.refCnt())
.describedAs("message with invalid frame should be released")
- .isEqualTo(refCntBeforeSending)
+ .isEqualTo(expectedRefCnt)
+
+ }
+
+ system("should release memory for each message with invalid payload") { sut ->
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val msgWithInvalidPayload = invalidVesMessage()
+ val expectedRefCnt = 0
+
+ val (handledEvents, exception) = sut.handleConnectionReturningError(
+ validMessage, msgWithInvalidPayload)
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(exception?.cause).isInstanceOf(InvalidProtocolBufferException::class.java)
+
+ assertThat(validMessage.refCnt())
+ .describedAs("handled message should be released")
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithInvalidPayload.refCnt())
+ .describedAs("message with invalid payload should be released")
+ .isEqualTo(expectedRefCnt)
+
+ }
+
+ system("should release memory for each message with garbage frame") { sut ->
+ sut.configurationProvider.updateConfiguration(basicConfiguration)
+ val validMessage = vesMessage(Domain.HVRANMEAS)
+ val msgWithGarbageFrame = garbageFrame()
+ val expectedRefCnt = 0
+
+ val (handledEvents, exception) = sut.handleConnectionReturningError(
+ validMessage, msgWithGarbageFrame)
+
+ assertThat(handledEvents).hasSize(1)
+ assertThat(exception?.cause)
+ .isInstanceOf(InvalidWireFrameMarkerException::class.java)
+
assertThat(validMessage.refCnt())
.describedAs("handled message should be released")
- .isEqualTo(refCntBeforeSending)
+ .isEqualTo(expectedRefCnt)
+ assertThat(msgWithGarbageFrame.refCnt())
+ .describedAs("message with garbage frame should be released")
+ .isEqualTo(expectedRefCnt)
+
}
}
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
index b6342b11..998f3140 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/utils.kt
@@ -54,6 +54,10 @@ fun invalidVesMessage() = alocator.buffer().run {
}
+fun garbageFrame() = alocator.buffer().run {
+ writeBytes("the meaning of life is &@)(*_!".toByteArray())
+}
+
fun invalidWireFrame() = alocator.buffer().run {
writeByte(0xFF)
writeByte(1)
@@ -65,6 +69,7 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr
.setCommonEventHeader(
CommonEventHeader.getDefaultInstance().toBuilder()
.setVersion("1.0")
+ .setEventName("xyz")
.setEventId(id)
.setDomain(domain)
.setEventName("Sample event name")
@@ -76,6 +81,3 @@ fun vesEvent(domain: Domain = Domain.OTHER, id: String = UUID.randomUUID().toStr
.setSequence(1))
.setHvRanMeasFields(ByteString.EMPTY)
.build()
-
-
-
diff --git a/hv-collector-ct/src/test/resources/logback-test.xml b/hv-collector-ct/src/test/resources/logback-test.xml
index 809f62d4..84abc9d3 100644
--- a/hv-collector-ct/src/test/resources/logback-test.xml
+++ b/hv-collector-ct/src/test/resources/logback-test.xml
@@ -26,7 +26,7 @@
</rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
<root level="INFO">
<appender-ref ref="CONSOLE"/>
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
index 5bd63d8b..8c8b4718 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
@@ -21,6 +21,9 @@ package org.onap.dcae.collectors.veshv.domain
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.exceptions.EmptyWireFrameException
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
/**
* Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
@@ -53,20 +56,20 @@ import io.netty.buffer.ByteBufAllocator
* @since May 2018
*/
data class WireFrame(val payload: ByteBuf,
- val mark: Short,
val majorVersion: Short,
val minorVersion: Short,
val payloadSize: Int) {
+ constructor(payload: ByteBuf) : this(payload, 1, 0, payload.readableBytes())
+
fun isValid(): Boolean =
- mark == FF_BYTE
- && majorVersion == SUPPORTED_MAJOR_VERSION
+ majorVersion == SUPPORTED_MAJOR_VERSION
&& payload.readableBytes() == payloadSize
fun encode(allocator: ByteBufAllocator): ByteBuf {
val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes())
- bb.writeByte(mark.toInt())
+ bb.writeByte(MARKER_BYTE.toInt())
bb.writeByte(majorVersion.toInt())
bb.writeByte(minorVersion.toInt())
bb.writeInt(payloadSize)
@@ -76,20 +79,58 @@ data class WireFrame(val payload: ByteBuf,
}
companion object {
- fun decode(byteBuf: ByteBuf): WireFrame {
- val mark = byteBuf.readUnsignedByte()
+ fun decodeFirst(byteBuf: ByteBuf): WireFrame {
+ verifyNotEmpty(byteBuf)
+ byteBuf.markReaderIndex()
+
+ verifyMarker(byteBuf)
+ verifyMinimumSize(byteBuf)
+
val majorVersion = byteBuf.readUnsignedByte()
val minorVersion = byteBuf.readUnsignedByte()
- val payloadSize = byteBuf.readInt()
- val payload = byteBuf.retainedSlice()
+ val payloadSize = verifyPayloadSize(byteBuf)
+
+ val payload = byteBuf.retainedSlice(byteBuf.readerIndex(), payloadSize)
+ byteBuf.readerIndex(byteBuf.readerIndex() + payloadSize)
+
+ return WireFrame(payload, majorVersion, minorVersion, payloadSize)
+ }
+
+ private fun verifyPayloadSize(byteBuf: ByteBuf): Int =
+ byteBuf.readInt().let { payloadSize ->
+ if (byteBuf.readableBytes() < payloadSize) {
+ byteBuf.resetReaderIndex()
+ throw MissingWireFrameBytesException("readable bytes < payload size")
+ } else {
+ payloadSize
+ }
+ }
+
+ private fun verifyMinimumSize(byteBuf: ByteBuf) {
+ if (byteBuf.readableBytes() < HEADER_SIZE) {
+ byteBuf.resetReaderIndex()
+ throw MissingWireFrameBytesException("readable bytes < header size")
+ }
+ }
+
+ private fun verifyMarker(byteBuf: ByteBuf) {
+ val mark = byteBuf.readUnsignedByte()
+ if (mark != MARKER_BYTE) {
+ byteBuf.resetReaderIndex()
+ throw InvalidWireFrameMarkerException(mark)
+ }
+ }
- return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize)
+ private fun verifyNotEmpty(byteBuf: ByteBuf) {
+ if (byteBuf.readableBytes() < 1) {
+ throw EmptyWireFrameException()
+ }
}
- private const val HEADER_SIZE =
+ const val HEADER_SIZE =
3 * java.lang.Byte.BYTES +
- 1 * java.lang.Integer.BYTES
- private const val FF_BYTE: Short = 0xFF
- private const val SUPPORTED_MAJOR_VERSION: Short = 1
+ 1 * java.lang.Integer.BYTES
+ const val MARKER_BYTE: Short = 0xFF
+ const val SUPPORTED_MAJOR_VERSION: Short = 1
}
}
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt
new file mode 100644
index 00000000..6e1ce935
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/EmptyWireFrameException.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class EmptyWireFrameException : MissingWireFrameBytesException("wire frame was empty (readable bytes == 0)")
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt
index 6f6ac2a7..ff452a7a 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/InvalidWireFrameMarkerException.kt
@@ -17,28 +17,13 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl
+package org.onap.dcae.collectors.veshv.domain.exceptions
-import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.domain.WireFrame
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
+ * @since June 2018
*/
-internal class WireDecoder {
- fun decode(byteBuf: ByteBuf): ByteBuf? =
- try {
- WireFrame.decode(byteBuf)
- .takeIf { it.isValid() }
- .let { it?.payload }
- } catch (ex: IndexOutOfBoundsException) {
- logger.debug { "Wire protocol frame could not be decoded - input is too small" }
- null
- }
-
- companion object {
- private val logger = Logger(WireDecoder::class)
- }
-}
+class InvalidWireFrameMarkerException(actualMarker: Short) : WireFrameDecodingException(
+ "Invalid start of frame. Expected 0x%02X, but was 0x%02X".format(WireFrame.MARKER_BYTE, actualMarker))
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt
new file mode 100644
index 00000000..7e4b3cef
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/MissingWireFrameBytesException.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+open class MissingWireFrameBytesException(msg: String) : WireFrameDecodingException(msg)
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt
new file mode 100644
index 00000000..11013834
--- /dev/null
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/exceptions/WireFrameDecodingException.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain.exceptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+open class WireFrameDecodingException(msg: String) : Exception(msg)
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
index 5a923c4e..00113267 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameTest.kt
@@ -1,29 +1,113 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
package org.onap.dcae.collectors.veshv.domain
-import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.assertThatExceptionOfType
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.exceptions.InvalidWireFrameMarkerException
+import org.onap.dcae.collectors.veshv.domain.exceptions.MissingWireFrameBytesException
+import java.nio.charset.Charset
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
object WireFrameTest : Spek({
- describe("Wire Frame codec") {
- describe("encode-decode methods' compatibility") {
- val payloadContent = "test"
- val payload = Unpooled.wrappedBuffer(payloadContent.toByteArray(Charsets.US_ASCII))
- val frame = WireFrame(payload = payload,
- majorVersion = 1,
+ val payloadAsString = "coffeebabe"
+
+ fun createSampleFrame() =
+ WireFrame(Unpooled.wrappedBuffer(payloadAsString.toByteArray(Charset.defaultCharset())))
+
+ fun encodeSampleFrame() =
+ createSampleFrame().let {
+ Unpooled.buffer()
+ .writeBytes(it.encode(UnpooledByteBufAllocator.DEFAULT))
+
+ }
+
+ describe("Wire Frame invariants") {
+
+ given("input with unsupported major version") {
+ val input = WireFrame(
+ payload = Unpooled.EMPTY_BUFFER,
+ majorVersion = 100,
minorVersion = 2,
- mark = 0xFF,
- payloadSize = payload.readableBytes())
+ payloadSize = 0)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("input with too small payload size") {
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = 1)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("input with too big payload size") {
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2, 3)),
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = 8)
+
+ it("should fail validation") {
+ assertThat(input.isValid()).isFalse()
+ }
+ }
+
+ given("valid input") {
+ val payload = byteArrayOf(6, 9, 8, 6)
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(payload),
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = payload.size)
+
+ it("should pass validation") {
+ assertThat(input.isValid()).isTrue()
+ }
+ }
- val encoded = frame.encode(ByteBufAllocator.DEFAULT)
- val decoded = WireFrame.decode(encoded)
+
+ }
+
+ describe("Wire Frame codec") {
+
+ describe("encode-decode methods' compatibility") {
+ val frame = createSampleFrame()
+ val encoded = encodeSampleFrame()
+ val decoded = WireFrame.decodeFirst(encoded)
it("should decode major version") {
assertThat(decoded.majorVersion).isEqualTo(frame.majorVersion)
@@ -33,17 +117,13 @@ object WireFrameTest : Spek({
assertThat(decoded.minorVersion).isEqualTo(frame.minorVersion)
}
- it("should decode mark") {
- assertThat(decoded.mark).isEqualTo(frame.mark)
- }
-
it("should decode payload size") {
assertThat(decoded.payloadSize).isEqualTo(frame.payloadSize)
}
it("should decode payload") {
- assertThat(decoded.payload.toString(Charsets.US_ASCII))
- .isEqualTo(payloadContent)
+ assertThat(decoded.payload.toString(Charset.defaultCharset()))
+ .isEqualTo(payloadAsString)
}
it("should retain decoded payload") {
@@ -51,5 +131,55 @@ object WireFrameTest : Spek({
assertThat(decoded.payload.refCnt()).isEqualTo(1)
}
}
+
+ describe("TCP framing") {
+ // see "Dealing with a Stream-based Transport" on http://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-11
+
+ it("should decode message leaving rest unread") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ .writeByte(0xAA)
+ val decoded = WireFrame.decodeFirst(buff)
+
+ assertThat(decoded.isValid()).describedAs("should be valid").isTrue()
+ assertThat(buff.readableBytes()).isEqualTo(1)
+ }
+
+ it("should throw exception when not even header fits") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xFF)
+
+ assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
+ .isThrownBy { WireFrame.decodeFirst(buff) }
+ }
+
+ it("should throw exception when first byte is not 0xFF but length looks ok") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xAA)
+ .writeBytes("some garbage".toByteArray())
+
+ assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
+ .isThrownBy { WireFrame.decodeFirst(buff) }
+ }
+
+ it("should throw exception when first byte is not 0xFF and length is to short") {
+ val buff = Unpooled.buffer()
+ .writeByte(0xAA)
+
+ assertThatExceptionOfType(InvalidWireFrameMarkerException::class.java)
+ .isThrownBy { WireFrame.decodeFirst(buff) }
+ }
+
+ it("should throw exception when payload doesn't fit") {
+ val buff = Unpooled.buffer()
+ .writeBytes(encodeSampleFrame())
+ buff.writerIndex(buff.writerIndex() - 2)
+
+ assertThatExceptionOfType(MissingWireFrameBytesException::class.java)
+ .isThrownBy { WireFrame.decodeFirst(buff) }
+ }
+
+ }
}
+
}) \ No newline at end of file
diff --git a/hv-collector-main/Dockerfile b/hv-collector-main/Dockerfile
index ceb45ead..1367ff1c 100644
--- a/hv-collector-main/Dockerfile
+++ b/hv-collector-main/Dockerfile
@@ -12,4 +12,4 @@ ENTRYPOINT ["java", "-cp", "*:", "org.onap.dcae.collectors.veshv.main.MainKt"]
CMD ["--listen-port", "6061"]
COPY target/libs/external/* ./
COPY target/libs/internal/* ./
-COPY target/hv-collector-main-*.jar ./ \ No newline at end of file
+COPY target/hv-collector-main-*.jar ./
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 89b31b59..4438cf38 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -39,7 +39,7 @@ fun main(args: Array<String>) {
val collectorProvider = CollectorFactory(
resolveConfigurationProvider(serverConfiguration),
- AdapterFactory.kafkaSink()
+ AdapterFactory.loggingSink()
).createVesHvCollectorProvider()
ServerFactory.createNettyTcpServer(serverConfiguration, collectorProvider).start().block()
} catch (ex: WrongArgumentException) {
diff --git a/hv-collector-main/src/main/resources/logback.xml b/hv-collector-main/src/main/resources/logback.xml
index 809f62d4..48da3b18 100644
--- a/hv-collector-main/src/main/resources/logback.xml
+++ b/hv-collector-main/src/main/resources/logback.xml
@@ -26,7 +26,8 @@
</rollingPolicy>
</appender>
- <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+ <logger name="org.onap.dcae.collectors.veshv" level="INFO"/>
+ <!--<logger name="reactor.ipc.netty" level="DEBUG"/>-->
<root level="INFO">
<appender-ref ref="CONSOLE"/>
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
index b96a8b3a..eb52a866 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt
@@ -26,6 +26,17 @@ class Logger(val logger: org.slf4j.Logger) {
constructor(clazz: KClass<out Any>) : this(LoggerFactory.getLogger(clazz.java))
//
+ // TRACE
+ //
+
+ fun trace(messageProvider: () -> String) {
+ if (logger.isTraceEnabled) {
+ logger.trace(messageProvider())
+ }
+ }
+
+
+ //
// DEBUG
//