summaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-04-26 09:17:09 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-07-26 09:18:00 +0200
commite98fdcc3087d06b76066ae2d2c7d0bde41d7776b (patch)
tree3f6b79be2422022233b7e2f6c51064a63cba5fe1 /hv-collector-core
parentdcbb6333fede6c0cf43ac8690119911b01864d8d (diff)
HV VES Collector seed code
Contains squashed commits up to 11fe6b63 (2018-05-30). The whole contains a basic project structure. We are trying to put rest of the commits one by one so we do not loose the history. Bellow there are messages of the single commits in this squashed bulk: Basic project setup Create base maven project with Gitlab CI configuration. Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Merging guildeline Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Add remote branch delete command Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Sample runtime in Kotlin - PoC Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Setup project internal architecture Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Message routing Determine target topic and partition by VES Common Header. Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Parse GPB message header fkrzywka <filip.krzywka@nokia.com> Set listen port based on command line args Use Apache Commons CLI to parse cmd line args. Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Drop invalid GPB messages Instead of propagating error and closing stream just drop the message and proceed. Final handling logic may include closing the connection or sending some message depending on the specification. Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Add Apache license file Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Convert to maven multi-module project fkrzywka <filip.krzywka@nokia.com> Component tests with current GPB schema * Using v5 draft protobuf definition * Code reorganized to so component boundaries are more visible Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Thin logging facade over slf4j Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Introduce code analysis tools Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Implemented reading configuration from consul Ves Common Header validation added (required parameters existance check) Micro benchmark for direct vs on-heap NIO buffers Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Decode wire protocol and fix (most?) memory leaks Proposed wire protocol is just a suggestion and will (should) change in the future. Netty's ByteBuf is a reference-counted wrapper over a memory chunk. It is crucial to free unused buffers by means of release() method. The general rule regarding memory management was suggested. Let's put all memory-cleanup logic in main VesHvCollector class so other classes could focus on their job. Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Minor cleanup Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Add license info in files Piotr Jaszczyk <piotr.jaszczyk@nokia.com> Change-Id: Ic484aa107eba48ad48f8ab222799e1795dffa865 Issue-ID: DCAEGEN2-601 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/pom.xml142
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt53
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt39
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt26
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt22
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt26
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt93
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt82
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt59
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt33
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt48
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt66
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt9
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt50
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt83
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt44
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt65
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt63
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt68
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt108
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt92
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt69
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt104
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt60
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt60
-rw-r--r--hv-collector-core/src/test/resources/logback.xml35
28 files changed, 1657 insertions, 0 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
new file mode 100644
index 00000000..ed501a44
--- /dev/null
+++ b/hv-collector-core/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
+ <artifactId>ves-hv-collector</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-core</artifactId>
+ <description>VES HighVolume Collector :: Core</description>
+
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>protobuf</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-reflect</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.ipc</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>javax.json</groupId>
+ <artifactId>javax.json-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.nhaarman</groupId>
+ <artifactId>mockito-kotlin</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
new file mode 100644
index 00000000..d4de1b5b
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.boundary
+
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+
+interface Sink {
+ fun send(messages: Flux<RoutedMessage>): Flux<VesMessage>
+}
+
+@FunctionalInterface
+interface SinkProvider {
+ operator fun invoke(config: CollectorConfiguration): Sink
+
+ companion object {
+ fun just(sink: Sink): SinkProvider =
+ object : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink = sink
+ }
+ }
+}
+
+interface ConfigurationProvider {
+ operator fun invoke(): Flux<CollectorConfiguration>
+
+ companion object {
+ fun from(function: () -> Flux<CollectorConfiguration>): ConfigurationProvider =
+ object : ConfigurationProvider {
+ override fun invoke(): Flux<CollectorConfiguration> = function()
+ }
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
new file mode 100644
index 00000000..809ba32f
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.boundary
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.ServerConfiguration
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+interface Collector {
+ fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
+}
+
+typealias CollectorProvider = () -> Collector
+
+interface Server {
+ fun start(): Mono<Void>
+}
+
+interface ServerFactory {
+ fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt
new file mode 100644
index 00000000..f5e525e7
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
new file mode 100644
index 00000000..2cf2b082
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
@@ -0,0 +1,22 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt
new file mode 100644
index 00000000..ea78706e
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class ServerConfiguration(val port: Int, val configurationUrl: String)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt
new file mode 100644
index 00000000..79b48804
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBuf
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteBuf)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
new file mode 100644
index 00000000..306b7762
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
@@ -0,0 +1,93 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+
+/**
+ * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ *
+ * ```
+ * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┐
+ * │octet│ 0 │ 1 │ 2 │
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤
+ * │ bit │ 0│ │ │ │ │ │ │ │ 8│ │ │ │ │ │ │ │16│ │ │ │ │ │ │ │ ...
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┤
+ * │field│ 0xFF │ major version │ minor version │
+ * └─────┴───────────────────────┴───────────────────────┴───────────────────────┘
+ * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┬───────────────────────┐
+ * │octet│ 3 │ 4 │ 5 │ 6 │
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤
+ * ... │ bit │24│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ...
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┤
+ * │field│ payload size │
+ * └─────┴───────────────────────────────────────────────────────────────────────────────────────────────┘
+ * ┌─────┬───────────────────────
+ * │octet│ 7 ...
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──
+ * ... │ bit │56│ │ │ │ │ │ │...
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──
+ * │field│ protobuf payload
+ * └─────┴───────────────────────
+ * ```
+ *
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class WireFrame(val payload: ByteBuf,
+ val mark: Short,
+ val majorVersion: Short,
+ val minorVersion: Short,
+ val payloadSize: Int) {
+ fun isValid(): Boolean {
+ return mark == FF_BYTE
+ && majorVersion == SUPPORTED_MAJOR_VERSION
+ && payload.readableBytes() == payloadSize
+ }
+
+ fun encode(allocator: ByteBufAllocator): ByteBuf {
+ val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes())
+
+ bb.writeByte(mark.toInt())
+ bb.writeByte(majorVersion.toInt())
+ bb.writeByte(minorVersion.toInt())
+ bb.writeInt(payloadSize)
+ bb.writeBytes(payload)
+
+ return bb
+ }
+
+ companion object {
+ fun decode(byteBuf: ByteBuf): WireFrame {
+ val mark = byteBuf.readUnsignedByte()
+ val majorVersion = byteBuf.readUnsignedByte()
+ val minorVersion = byteBuf.readUnsignedByte()
+ val payloadSize = byteBuf.readInt()
+ val payload = byteBuf.slice()
+
+ return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize)
+ }
+
+ private const val HEADER_SIZE = 3 + java.lang.Integer.BYTES
+ private const val FF_BYTE: Short = 0xFF
+ private const val SUPPORTED_MAJOR_VERSION: Short = 1
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt
new file mode 100644
index 00000000..3f826f7c
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
+data class Routing(val routes: List<Route>) {
+
+ fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) }
+}
+
+data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
+
+ fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
+
+ operator fun invoke(message: VesMessage): RoutedMessage =
+ RoutedMessage(targetTopic, partitioning(message.header), message)
+}
+
+
+/*
+Configuration DSL
+ */
+
+fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
+ val conf = RoutingBuilder()
+ conf.init()
+ return conf
+}
+
+class RoutingBuilder {
+ private val routes: MutableList<RouteBuilder> = mutableListOf()
+
+ fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
+ val rule = RouteBuilder()
+ rule.init()
+ routes.add(rule)
+ return rule
+ }
+
+ fun build() = Routing(routes.map { it.build() }.toList())
+}
+
+class RouteBuilder {
+
+ private lateinit var domain: Domain
+ private lateinit var targetTopic: String
+ private lateinit var partitioning: (CommonEventHeader) -> Int
+
+ fun fromDomain(domain: Domain) {
+ this.domain = domain
+ }
+
+ fun toTopic(targetTopic: String) {
+ this.targetTopic = targetTopic
+ }
+
+ fun withFixedPartitioning(num: Int = 1) {
+ partitioning = { _ -> num }
+ }
+
+ fun build() = Route(domain, targetTopic, partitioning)
+
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
new file mode 100644
index 00000000..cb018cca
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.factory
+
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.impl.MessageValidator
+import org.onap.dcae.collectors.veshv.impl.Router
+import org.onap.dcae.collectors.veshv.impl.VesDecoder
+import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.WireDecoder
+import reactor.core.publisher.Flux
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvider: SinkProvider) {
+ fun createVesHvCollectorProvider(): CollectorProvider {
+ val collector: AtomicReference<Collector> = AtomicReference()
+ createVesHvCollector().subscribe(collector::set)
+ return collector::get
+ }
+
+ private fun createVesHvCollector(): Flux<Collector> =
+ configuration().map(this::createVesHvCollector)
+
+ private fun createVesHvCollector(config: CollectorConfiguration): Collector {
+ return VesHvCollector(
+ WireDecoder(),
+ VesDecoder(),
+ MessageValidator(),
+ Router(config.routing),
+ sinkProvider(config))
+ }
+
+}
+
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
new file mode 100644
index 00000000..5e60fa56
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.factory
+
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.domain.ServerConfiguration
+import org.onap.dcae.collectors.veshv.impl.NettyTcpServer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object ServerFactory {
+ val createNettyTcpServer: (ServerConfiguration, CollectorProvider) -> Server = ::NettyTcpServer
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
new file mode 100644
index 00000000..55e65cf7
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+
+internal class MessageValidator {
+
+ val requiredFieldDescriptors = listOf(
+ "version",
+ "eventName",
+ "domain",
+ "eventId",
+ "sourceName",
+ "reportingEntityName",
+ "priority",
+ "startEpochMicrosec",
+ "lastEpochMicrosec",
+ "sequence")
+ .map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)}
+
+ fun isValid(message: VesMessage): Boolean {
+ val header = message.header
+ return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
+ }
+
+ private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
+ requiredFieldDescriptors
+ .all { fieldDescriptor -> header.hasField(fieldDescriptor) }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt
new file mode 100644
index 00000000..ca77df2a
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.domain.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Mono
+import reactor.ipc.netty.NettyInbound
+import reactor.ipc.netty.NettyOutbound
+import reactor.ipc.netty.tcp.TcpServer
+import java.util.function.BiFunction
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class NettyTcpServer(val serverConfig: ServerConfiguration,
+ val collectorProvider: CollectorProvider) : Server {
+
+ override fun start(): Mono<Void> {
+ logger.info { "Listening on port ${serverConfig.port}" }
+ return Mono.defer {
+ val nettyContext = TcpServer.create(serverConfig.port)
+ .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u ->
+ handleConnection(t, u)
+ })
+ Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() }
+ }
+ }
+
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+ logger.debug("Got connection")
+ val pipe = collectorProvider().handleConnection(nettyInbound.receive())
+
+ val hello = nettyOutbound
+ .options { it.flushOnEach() }
+ .sendString(Mono.just("ONAP_VES_HV/0.1\n"))
+ .then()
+
+ return hello.then(pipe)
+ }
+
+ companion object {
+ private val logger = Logger(NettyTcpServer::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
new file mode 100644
index 00000000..8c005d42
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -0,0 +1,9 @@
+package org.onap.dcae.collectors.veshv.impl
+
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.Routing
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+
+class Router(private val routing: Routing) {
+ fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message)
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
new file mode 100644
index 00000000..6ace06e4
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import com.google.protobuf.InvalidProtocolBufferException
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class VesDecoder {
+
+ fun decode(bb: ByteBuf): VesMessage? =
+ try {
+ val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
+ VesMessage(decodedHeader, bb)
+ } catch (ex: InvalidProtocolBufferException) {
+ logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" }
+ logger.debug("Cause", ex)
+ null
+ }
+
+
+ companion object {
+ private val logger = Logger(VesDecoder::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
new file mode 100644
index 00000000..af9d0b0a
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class VesHvCollector(
+ val wireDecoder: WireDecoder,
+ val protobufDecoder: VesDecoder,
+ val validator: MessageValidator,
+ val router: Router,
+ val sink: Sink) : Collector {
+ override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
+ dataStream
+ .flatMap(this::decodeWire)
+ .flatMap(this::decodeProtobuf)
+ .filter(this::validate)
+ .flatMap(this::findRoute)
+ .compose(sink::send)
+ .doOnNext(this::releaseMemory)
+ .then()
+
+ private fun decodeWire(wire: ByteBuf) = releaseWhenNull(wire, wireDecoder::decode)
+
+ private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
+
+ private fun validate(msg: VesMessage): Boolean {
+ val valid = validator.isValid(msg)
+ if (!valid) {
+ msg.rawMessage.release()
+ }
+ return valid
+ }
+
+ private fun findRoute(msg: VesMessage): Mono<RoutedMessage> {
+ val routedMessage = router.findDestination(msg)
+ return if (routedMessage == null)
+ Mono.empty()
+ else
+ Mono.just(routedMessage)
+ }
+
+ private fun releaseMemory(msg: VesMessage) {
+ msg.rawMessage.release()
+ }
+
+ private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
+ val result = mapper(input)
+ return if (result == null) {
+ input.release()
+ Mono.empty()
+ } else {
+ Mono.just(result)
+ }
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt
new file mode 100644
index 00000000..66ced2d7
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class WireDecoder {
+ fun decode(byteBuf: ByteBuf): ByteBuf? =
+ try {
+ WireFrame.decode(byteBuf)
+ .takeIf { it.isValid() }
+ .let { it?.payload }
+ } catch (ex: IndexOutOfBoundsException) {
+ logger.debug { "Wire protocol frame could not be decoded - input is too small" }
+ null
+ }
+
+ companion object {
+ private val logger = Logger(WireDecoder::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
new file mode 100644
index 00000000..cdfcfd30
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.common.serialization.ByteBufferSerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.ipc.netty.http.client.HttpClient
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+import java.nio.ByteBuffer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object AdapterFactory {
+ fun kafkaSink(): SinkProvider = KafkaSinkProvider()
+
+ fun staticConfigurationProvider(config: CollectorConfiguration) =
+ object : ConfigurationProvider {
+ override fun invoke() = Flux.just(config)
+ }
+
+ private class KafkaSinkProvider : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink {
+ val sender = KafkaSender.create(
+ SenderOptions.create<CommonEventHeader, ByteBuffer>()
+ .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
+ .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
+ .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java))
+ return KafkaSink(sender)
+ }
+ }
+
+ fun consulConfigurationProvider(url: String): ConfigurationProvider =
+ ConsulConfigurationProvider(url, httpAdapter())
+ fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
+}
+
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
new file mode 100644
index 00000000..ef6c2f76
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -0,0 +1,63 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.ves.VesEventV5
+import org.slf4j.LoggerFactory
+import reactor.core.publisher.Flux
+import java.io.StringReader
+import java.time.Duration
+import java.util.*
+import java.util.concurrent.atomic.AtomicReference
+import javax.json.Json
+import javax.json.JsonObject
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
+ : ConfigurationProvider {
+
+
+ private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
+ private var lastConfigurationHash: AtomicReference<Int> = AtomicReference()
+
+ override fun invoke(): Flux<CollectorConfiguration> =
+ Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
+ .flatMap { http.getResponse(url) }
+ .filter { body -> body.hashCode() != lastConfigurationHash.get() }
+ .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) }
+ .map { str -> getConfigurationJson(str) }
+ .map { json -> createCollectorConfiguration(json) }
+
+ private fun getConfigurationJson(str: String): JsonObject {
+ val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
+ val decodedValue = String(
+ Base64.getDecoder().decode(response.getString("Value")))
+ logger.info("Obtained new configuration from consul:\n$decodedValue")
+ return Json.createReader(StringReader(decodedValue)).readObject()
+ }
+
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
+
+ val routing = configuration.getJsonObject("routing")
+
+ return CollectorConfiguration(
+ kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
+ routing = org.onap.dcae.collectors.veshv.domain.routing {
+ defineRoute {
+ fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
+ toTopic(routing.getString("toTopic"))
+ withFixedPartitioning()
+ }
+ }.build()
+ )
+ }
+
+ companion object {
+ private const val REFRESH_INTERVAL_MINUTES: Long = 5
+ private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
new file mode 100644
index 00000000..c09af015
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -0,0 +1,29 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.slf4j.LoggerFactory
+import reactor.core.publisher.Mono
+import reactor.core.publisher.toMono
+import reactor.ipc.netty.http.client.HttpClient
+import reactor.ipc.netty.http.client.HttpClientResponse
+import java.nio.charset.Charset
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+open class HttpAdapter(private val httpClient: HttpClient) {
+
+ private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
+
+ open fun getResponse(url: String): Mono<String> =
+ httpClient.get(url)
+ .onErrorResume { e -> unableToGetResource(e, url) }
+ .flatMap { res -> res.receiveContent().toMono() }
+ .map { content -> content.content().toString(Charset.defaultCharset()) }
+
+
+ private fun unableToGetResource(e: Throwable, url: String): Mono<HttpClientResponse> {
+ logger.info("Failed to get resource on path: $url\n${e.localizedMessage}")
+ return Mono.empty()
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt
new file mode 100644
index 00000000..8b558a85
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderRecord
+import reactor.kafka.sender.SenderResult
+import java.nio.ByteBuffer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, ByteBuffer>) : Sink {
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
+ val records = messages.map(this::vesToKafkaRecord)
+ return sender.send(records)
+ .doOnNext(::logException)
+ .filter(::isSuccessful)
+ .map { it.correlationMetadata() }
+ }
+
+ private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, ByteBuffer, VesMessage> {
+ return SenderRecord.create(
+ msg.topic,
+ msg.partition,
+ System.currentTimeMillis(),
+ msg.message.header,
+ msg.message.rawMessage.nioBuffer(),
+ msg.message)
+ }
+
+ private fun logException(senderResult: SenderResult<out Any>) {
+ if (senderResult.exception() != null) {
+ logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
+ }
+ }
+
+ private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+
+ companion object {
+ val logger = Logger(KafkaSink::class)
+ }
+}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
new file mode 100644
index 00000000..8f6b76fb
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -0,0 +1,108 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import com.google.protobuf.ByteString
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.Unpooled.wrappedBuffer
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent
+import org.assertj.core.api.Assertions.assertThat
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.*
+
+internal object MessageValidatorTest : Spek({
+
+ fun vesMessageBytes(commonHeader: CommonEventHeader): ByteBuf {
+ val msg = VesEvent.newBuilder()
+ .setCommonEventHeader(commonHeader)
+ .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
+ .build()
+ return wrappedBuffer(msg.toByteArray())
+ }
+
+ given("Message validator") {
+ val cut = MessageValidator()
+
+ on("ves hv message including header with fully initialized fields") {
+ val commonHeader = newBuilder()
+ .setVersion("1.9")
+ .setEventName("Sample event name")
+ .setDomain(Domain.HVRANMEAS)
+ .setEventId("Sample event Id")
+ .setSourceName("Sample Source")
+ .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
+ .setPriority(Priority.MEDIUM)
+ .setStartEpochMicrosec(120034455)
+ .setLastEpochMicrosec(120034459)
+ .setSequence(2)
+ .build()
+
+ it("should accept message with fully initialized message header") {
+ val vesMessage = VesMessage(commonHeader, vesMessageBytes(commonHeader))
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
+ }
+
+ it("should reject message with domain other than HVRANMEAS") {
+ Domain.values()
+ .filter { it != Domain.HVRANMEAS && it != Domain.UNRECOGNIZED }
+ .forEach { domain ->
+ val header = newBuilder(commonHeader).setDomain(domain).build()
+ val vesMessage = VesMessage(header, vesMessageBytes(header))
+ assertThat(cut.isValid(vesMessage))
+ .describedAs("message with $domain domain")
+ .isFalse()
+ }
+ }
+ }
+
+ on("ves hv message bytes") {
+ val vesMessage = VesMessage(getDefaultInstance(), Unpooled.EMPTY_BUFFER)
+ it("should not accept message with default header") {
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+
+ on("ves hv message including header with not initialized fields") {
+ val commonHeader = newBuilder()
+ .setVersion("1.9")
+ .setEventName("Sample event name")
+ .setEventId("Sample event Id")
+ .setSourceName("Sample Source")
+ .build()
+ val msg = VesEvent.newBuilder()
+ .setCommonEventHeader(commonHeader)
+ .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!"))
+ .build()
+ val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+
+ it("should not accept not fully initialized message header ") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
new file mode 100644
index 00000000..ac91aaeb
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -0,0 +1,92 @@
+package org.onap.dcae.collectors.veshv.impl
+
+import io.netty.buffer.Unpooled
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.domain.routing
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object RouterTest : Spek({
+ given("sample configuration") {
+ val config = routing {
+
+ defineRoute {
+ fromDomain(Domain.HVRANMEAS)
+ toTopic("ves_rtpm")
+ withFixedPartitioning(2)
+ }
+
+ defineRoute {
+ fromDomain(Domain.SYSLOG)
+ toTopic("ves_trace")
+ withFixedPartitioning()
+ }
+ }.build()
+ val cut = Router(config)
+
+ on("message with existing route (rtpm)") {
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), Unpooled.EMPTY_BUFFER)
+ val result = cut.findDestination(message)
+
+ it("should have route available") {
+ assertThat(result).isNotNull()
+ }
+
+ it("should be routed to proper partition") {
+ assertThat(result?.partition).isEqualTo(2)
+ }
+
+ it("should be routed to proper topic") {
+ assertThat(result?.topic).isEqualTo("ves_rtpm")
+ }
+
+ it("should be routed with a given message") {
+ assertThat(result?.message).isSameAs(message)
+ }
+ }
+
+ on("message with existing route (trace)") {
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), Unpooled.EMPTY_BUFFER)
+ val result = cut.findDestination(message)
+
+ it("should have route available") {
+ assertThat(result).isNotNull()
+ }
+
+ it("should be routed to proper partition") {
+ assertThat(result?.partition).isEqualTo(1)
+ }
+
+ it("should be routed to proper topic") {
+ assertThat(result?.topic).isEqualTo("ves_trace")
+ }
+
+ it("should be routed with a given message") {
+ assertThat(result?.message).isSameAs(message)
+ }
+ }
+
+ on("message with unknown route") {
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), Unpooled.EMPTY_BUFFER)
+ val result = cut.findDestination(message)
+
+ it("should not have route available") {
+ assertThat(result).isNull()
+ }
+ }
+ }
+})
+
+private fun vesCommonHeaderWithDomain(domain: Domain) =
+ CommonEventHeader.getDefaultInstance().toBuilder()
+ .setDomain(domain)
+ .build() \ No newline at end of file
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
new file mode 100644
index 00000000..4d9c9239
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import com.google.protobuf.ByteString
+import io.netty.buffer.Unpooled.wrappedBuffer
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.test.StepVerifier
+import java.nio.charset.Charset
+
+
+internal object VesDecoderTest : Spek({
+
+ given("ves message decoder") {
+ val cut = VesDecoder()
+
+ on("ves hv message bytes") {
+ val commonHeader = CommonEventHeader.getDefaultInstance()
+ val msg = VesEvent.newBuilder()
+ .setCommonEventHeader(commonHeader)
+ .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements"))
+ .build()
+ val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+
+
+ it("should decode only header and pass it on along with raw message") {
+ val expectedMessage = VesMessage(
+ commonHeader,
+ rawMessageBytes
+ )
+
+ assertThat(cut.decode(rawMessageBytes)).isEqualTo(expectedMessage)
+
+ }
+ }
+
+ on("invalid ves hv message bytes") {
+ val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset()))
+
+ it("should return empty result") {
+ assertThat(cut.decode(rawMessageBytes)).isNull()
+ }
+ }
+ }
+})
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
new file mode 100644
index 00000000..3563bf6d
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
@@ -0,0 +1,104 @@
+package org.onap.dcae.collectors.veshv.impl
+
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since May 2018
+ */
+internal object WireDecoderTest : Spek({
+ describe("decoding wire protocol") {
+ val cut = WireDecoder()
+
+ fun decode(frame: WireFrame) =
+ cut.decode(
+ frame.encode(UnpooledByteBufAllocator.DEFAULT))
+
+ given("empty input") {
+ val input = Unpooled.EMPTY_BUFFER
+
+ it("should yield empty result") {
+ assertThat(cut.decode(input)).isNull()
+ }
+ }
+
+ given("input without 0xFF first byte") {
+ val input = WireFrame(
+ payload = Unpooled.EMPTY_BUFFER,
+ mark = 0x10,
+ majorVersion = 1,
+ minorVersion = 2,
+ payloadSize = 0)
+
+ it("should yield empty result") {
+ assertThat(decode(input)).isNull()
+ }
+ }
+
+ given("input with unsupported major version") {
+ val input = WireFrame(
+ payload = Unpooled.EMPTY_BUFFER,
+ mark = 0xFF,
+ majorVersion = 100,
+ minorVersion = 2,
+ payloadSize = 0)
+
+ it("should yield empty result") {
+ assertThat(decode(input)).isNull()
+ }
+ }
+
+ given("input with too small payload size") {
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2 ,3)),
+ mark = 0xFF,
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = 1)
+
+ it("should yield empty result") {
+ assertThat(decode(input)).isNull()
+ }
+ }
+
+ given("input with too big payload size") {
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2 ,3)),
+ mark = 0xFF,
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = 8)
+
+ it("should yield empty result") {
+ assertThat(decode(input)).isNull()
+ }
+ }
+
+ given("valid input") {
+ val payload = byteArrayOf(6, 9, 8, 6)
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(payload),
+ mark = 0xFF,
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = payload.size)
+
+
+ it("should yield Google Protocol Buffers payload") {
+ val result = decode(input)!!
+
+ val actualPayload = ByteArray(result.readableBytes())
+ result.readBytes(actualPayload)
+
+ assertThat(actualPayload).containsExactly(*payload)
+ }
+ }
+ }
+})
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
new file mode 100644
index 00000000..bf65c859
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -0,0 +1,60 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import reactor.core.publisher.Mono
+import java.util.*
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object ConsulConfigurationProviderTest : Spek({
+
+ given("valid resource url") {
+ val testUrl = "http://valid-url/"
+ val httpAdapterMock: HttpAdapter = mock()
+ val consulConfigProvider = ConsulConfigurationProvider(testUrl, httpAdapterMock)
+
+ whenever(httpAdapterMock.getResponse(testUrl)).thenReturn(Mono.just(constructConsulResponse()))
+
+
+ it("should create valid collector configuration") {
+ val response = consulConfigProvider().blockFirst()
+ assertEquals("val1", response.kafkaBootstrapServers)
+ val route = response.routing.routes[0]
+ assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route.domain)
+ assertEquals("val3", route.targetTopic)
+ }
+ }
+})
+
+fun constructConsulResponse(): String {
+
+ val config = """{
+ "kafkaBootstrapServers": "val1",
+ "routing": {
+ "fromDomain": 2,
+ "toTopic": "val3"
+ }
+ }"""
+
+ val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
+
+ return """[
+ {
+ "CreateIndex": 100,
+ "ModifyIndex": 200,
+ "LockIndex": 200,
+ "Key": "zip",
+ "Flags": 0,
+ "Value": "$encodedValue",
+ "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
+ }
+ ]"""
+}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
new file mode 100644
index 00000000..10e8b6bf
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -0,0 +1,60 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import io.netty.buffer.Unpooled
+import io.netty.handler.codec.http.HttpContent
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.ipc.netty.http.client.HttpClient
+import reactor.ipc.netty.http.client.HttpClientResponse
+import java.nio.charset.Charset
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object HttpAdapterTest : Spek({
+
+ given("valid resource url") {
+
+ val httpClientMock: HttpClient = mock()
+ val httpAdapter = HttpAdapter(httpClientMock)
+ val testUrl = "http://valid-url/"
+ val responseContent = """{"key1": "value1", "key2": "value2"}"""
+ val httpResponse = createHttpResponseMock(responseContent)
+ whenever(httpClientMock.get(testUrl)).thenReturn(Mono.just(httpResponse))
+
+ it("should return response string") {
+ assertEquals(responseContent, httpAdapter.getResponse(testUrl).block())
+ }
+ }
+
+ given("invalid resource url") {
+
+ val httpClientMock: HttpClient = mock()
+ val httpAdapter = HttpAdapter(httpClientMock)
+ val testUrl = "http://invalid-url/"
+ whenever(httpClientMock.get(testUrl)).thenReturn(Mono.error(Exception("Test exception")))
+
+
+ it("should return null response") {
+ assertEquals(null, httpAdapter.getResponse(testUrl).block())
+ }
+ }
+})
+
+fun createHttpResponseMock(content: String): HttpClientResponse {
+ val responseMock: HttpClientResponse = mock()
+ val contentMock: HttpContent = mock()
+ val contentByteBuff = Unpooled.copiedBuffer(content, Charset.defaultCharset())
+
+ whenever(responseMock.receiveContent()).thenReturn(Flux.just(contentMock))
+ whenever(contentMock.content()).thenReturn(contentByteBuff)
+
+ return responseMock
+}
diff --git a/hv-collector-core/src/test/resources/logback.xml b/hv-collector-core/src/test/resources/logback.xml
new file mode 100644
index 00000000..809f62d4
--- /dev/null
+++ b/hv-collector-core/src/test/resources/logback.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file