summaryrefslogtreecommitdiffstats
path: root/hv-collector-core
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-core')
-rw-r--r--hv-collector-core/pom.xml142
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt53
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt39
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt26
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt22
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt26
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt93
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt82
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt59
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt33
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt48
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt66
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt9
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt50
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt83
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt44
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt65
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt63
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt68
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt108
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt92
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt69
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt104
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt60
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt60
-rw-r--r--hv-collector-core/src/test/resources/logback.xml35
28 files changed, 1657 insertions, 0 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml
new file mode 100644
index 00000000..ed501a44
--- /dev/null
+++ b/hv-collector-core/pom.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.veshv</groupId>
+ <artifactId>ves-hv-collector</artifactId>
+ <version>1.0.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-core</artifactId>
+ <description>VES HighVolume Collector :: Core</description>
+
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>protobuf</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-reflect</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.ipc</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>javax.json</groupId>
+ <artifactId>javax.json-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+
+
+ <dependency>
+ <groupId>com.nhaarman</groupId>
+ <artifactId>mockito-kotlin</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jetbrains.spek</groupId>
+ <artifactId>spek-junit-platform-engine</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
new file mode 100644
index 00000000..d4de1b5b
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -0,0 +1,53 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.boundary
+
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+
+interface Sink {
+ fun send(messages: Flux<RoutedMessage>): Flux<VesMessage>
+}
+
+@FunctionalInterface
+interface SinkProvider {
+ operator fun invoke(config: CollectorConfiguration): Sink
+
+ companion object {
+ fun just(sink: Sink): SinkProvider =
+ object : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink = sink
+ }
+ }
+}
+
+interface ConfigurationProvider {
+ operator fun invoke(): Flux<CollectorConfiguration>
+
+ companion object {
+ fun from(function: () -> Flux<CollectorConfiguration>): ConfigurationProvider =
+ object : ConfigurationProvider {
+ override fun invoke(): Flux<CollectorConfiguration> = function()
+ }
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
new file mode 100644
index 00000000..809ba32f
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.boundary
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.ServerConfiguration
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+interface Collector {
+ fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void>
+}
+
+typealias CollectorProvider = () -> Collector
+
+interface Server {
+ fun start(): Mono<Void>
+}
+
+interface ServerFactory {
+ fun createServer(serverConfig: ServerConfiguration, collector: CollectorProvider): Server
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt
new file mode 100644
index 00000000..f5e525e7
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/CollectorConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
new file mode 100644
index 00000000..2cf2b082
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/RoutedMessage.kt
@@ -0,0 +1,22 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt
new file mode 100644
index 00000000..ea78706e
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/ServerConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class ServerConfiguration(val port: Int, val configurationUrl: String)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt
new file mode 100644
index 00000000..79b48804
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/VesMessage.kt
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBuf
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteBuf)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
new file mode 100644
index 00000000..306b7762
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrame.kt
@@ -0,0 +1,93 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+
+/**
+ * Wire frame structure is presented bellow. All fields are in network byte order (big-endian).
+ *
+ * ```
+ * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┐
+ * │octet│ 0 │ 1 │ 2 │
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤
+ * │ bit │ 0│ │ │ │ │ │ │ │ 8│ │ │ │ │ │ │ │16│ │ │ │ │ │ │ │ ...
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┼──┴──┴──┴──┴──┴──┴──┴──┤
+ * │field│ 0xFF │ major version │ minor version │
+ * └─────┴───────────────────────┴───────────────────────┴───────────────────────┘
+ * ┌─────┬───────────────────────┬───────────────────────┬───────────────────────┬───────────────────────┐
+ * │octet│ 3 │ 4 │ 5 │ 6 │
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┼──┬──┬──┬──┬──┬──┬──┬──┤
+ * ... │ bit │24│ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ │ ...
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──┤
+ * │field│ payload size │
+ * └─────┴───────────────────────────────────────────────────────────────────────────────────────────────┘
+ * ┌─────┬───────────────────────
+ * │octet│ 7 ...
+ * ├─────┼──┬──┬──┬──┬──┬──┬──┬──
+ * ... │ bit │56│ │ │ │ │ │ │...
+ * ├─────┼──┴──┴──┴──┴──┴──┴──┴──
+ * │field│ protobuf payload
+ * └─────┴───────────────────────
+ * ```
+ *
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class WireFrame(val payload: ByteBuf,
+ val mark: Short,
+ val majorVersion: Short,
+ val minorVersion: Short,
+ val payloadSize: Int) {
+ fun isValid(): Boolean {
+ return mark == FF_BYTE
+ && majorVersion == SUPPORTED_MAJOR_VERSION
+ && payload.readableBytes() == payloadSize
+ }
+
+ fun encode(allocator: ByteBufAllocator): ByteBuf {
+ val bb = allocator.buffer(HEADER_SIZE + payload.readableBytes())
+
+ bb.writeByte(mark.toInt())
+ bb.writeByte(majorVersion.toInt())
+ bb.writeByte(minorVersion.toInt())
+ bb.writeInt(payloadSize)
+ bb.writeBytes(payload)
+
+ return bb
+ }
+
+ companion object {
+ fun decode(byteBuf: ByteBuf): WireFrame {
+ val mark = byteBuf.readUnsignedByte()
+ val majorVersion = byteBuf.readUnsignedByte()
+ val minorVersion = byteBuf.readUnsignedByte()
+ val payloadSize = byteBuf.readInt()
+ val payload = byteBuf.slice()
+
+ return WireFrame(payload, mark, majorVersion, minorVersion, payloadSize)
+ }
+
+ private const val HEADER_SIZE = 3 + java.lang.Integer.BYTES
+ private const val FF_BYTE: Short = 0xFF
+ private const val SUPPORTED_MAJOR_VERSION: Short = 1
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt
new file mode 100644
index 00000000..3f826f7c
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/routing.kt
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.domain
+
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
+data class Routing(val routes: List<Route>) {
+
+ fun routeFor(commonHeader: CommonEventHeader): Route? = routes.find { it.applies(commonHeader) }
+}
+
+data class Route(val domain: Domain, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
+
+ fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
+
+ operator fun invoke(message: VesMessage): RoutedMessage =
+ RoutedMessage(targetTopic, partitioning(message.header), message)
+}
+
+
+/*
+Configuration DSL
+ */
+
+fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
+ val conf = RoutingBuilder()
+ conf.init()
+ return conf
+}
+
+class RoutingBuilder {
+ private val routes: MutableList<RouteBuilder> = mutableListOf()
+
+ fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
+ val rule = RouteBuilder()
+ rule.init()
+ routes.add(rule)
+ return rule
+ }
+
+ fun build() = Routing(routes.map { it.build() }.toList())
+}
+
+class RouteBuilder {
+
+ private lateinit var domain: Domain
+ private lateinit var targetTopic: String
+ private lateinit var partitioning: (CommonEventHeader) -> Int
+
+ fun fromDomain(domain: Domain) {
+ this.domain = domain
+ }
+
+ fun toTopic(targetTopic: String) {
+ this.targetTopic = targetTopic
+ }
+
+ fun withFixedPartitioning(num: Int = 1) {
+ partitioning = { _ -> num }
+ }
+
+ fun build() = Route(domain, targetTopic, partitioning)
+
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
new file mode 100644
index 00000000..cb018cca
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -0,0 +1,59 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.factory
+
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.impl.MessageValidator
+import org.onap.dcae.collectors.veshv.impl.Router
+import org.onap.dcae.collectors.veshv.impl.VesDecoder
+import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.WireDecoder
+import reactor.core.publisher.Flux
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class CollectorFactory(val configuration: ConfigurationProvider, val sinkProvider: SinkProvider) {
+ fun createVesHvCollectorProvider(): CollectorProvider {
+ val collector: AtomicReference<Collector> = AtomicReference()
+ createVesHvCollector().subscribe(collector::set)
+ return collector::get
+ }
+
+ private fun createVesHvCollector(): Flux<Collector> =
+ configuration().map(this::createVesHvCollector)
+
+ private fun createVesHvCollector(config: CollectorConfiguration): Collector {
+ return VesHvCollector(
+ WireDecoder(),
+ VesDecoder(),
+ MessageValidator(),
+ Router(config.routing),
+ sinkProvider(config))
+ }
+
+}
+
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
new file mode 100644
index 00000000..5e60fa56
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.factory
+
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.domain.ServerConfiguration
+import org.onap.dcae.collectors.veshv.impl.NettyTcpServer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object ServerFactory {
+ val createNettyTcpServer: (ServerConfiguration, CollectorProvider) -> Server = ::NettyTcpServer
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
new file mode 100644
index 00000000..55e65cf7
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -0,0 +1,48 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+
+internal class MessageValidator {
+
+ val requiredFieldDescriptors = listOf(
+ "version",
+ "eventName",
+ "domain",
+ "eventId",
+ "sourceName",
+ "reportingEntityName",
+ "priority",
+ "startEpochMicrosec",
+ "lastEpochMicrosec",
+ "sequence")
+ .map { fieldName -> CommonEventHeader.getDescriptor().findFieldByName(fieldName)}
+
+ fun isValid(message: VesMessage): Boolean {
+ val header = message.header
+ return allMandatoryFieldsArePresent(header) && header.domain == CommonEventHeader.Domain.HVRANMEAS
+ }
+
+ private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
+ requiredFieldDescriptors
+ .all { fieldDescriptor -> header.hasField(fieldDescriptor) }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt
new file mode 100644
index 00000000..ca77df2a
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt
@@ -0,0 +1,66 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.domain.ServerConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Mono
+import reactor.ipc.netty.NettyInbound
+import reactor.ipc.netty.NettyOutbound
+import reactor.ipc.netty.tcp.TcpServer
+import java.util.function.BiFunction
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class NettyTcpServer(val serverConfig: ServerConfiguration,
+ val collectorProvider: CollectorProvider) : Server {
+
+ override fun start(): Mono<Void> {
+ logger.info { "Listening on port ${serverConfig.port}" }
+ return Mono.defer {
+ val nettyContext = TcpServer.create(serverConfig.port)
+ .start(BiFunction<NettyInbound, NettyOutbound, Publisher<Void>> { t, u ->
+ handleConnection(t, u)
+ })
+ Mono.never<Void>().doFinally { _ -> nettyContext.shutdown() }
+ }
+ }
+
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> {
+ logger.debug("Got connection")
+ val pipe = collectorProvider().handleConnection(nettyInbound.receive())
+
+ val hello = nettyOutbound
+ .options { it.flushOnEach() }
+ .sendString(Mono.just("ONAP_VES_HV/0.1\n"))
+ .then()
+
+ return hello.then(pipe)
+ }
+
+ companion object {
+ private val logger = Logger(NettyTcpServer::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
new file mode 100644
index 00000000..8c005d42
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -0,0 +1,9 @@
+package org.onap.dcae.collectors.veshv.impl
+
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.Routing
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+
+class Router(private val routing: Routing) {
+ fun findDestination(message: VesMessage): RoutedMessage? = routing.routeFor(message.header)?.invoke(message)
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
new file mode 100644
index 00000000..6ace06e4
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import com.google.protobuf.InvalidProtocolBufferException
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class VesDecoder {
+
+ fun decode(bb: ByteBuf): VesMessage? =
+ try {
+ val decodedHeader = VesEvent.parseFrom(bb.nioBuffer()).commonEventHeader
+ VesMessage(decodedHeader, bb)
+ } catch (ex: InvalidProtocolBufferException) {
+ logger.warn { "Dropping incoming message. Invalid protocol buffer: ${ex.message}" }
+ logger.debug("Cause", ex)
+ null
+ }
+
+
+ companion object {
+ private val logger = Logger(VesDecoder::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
new file mode 100644
index 00000000..af9d0b0a
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class VesHvCollector(
+ val wireDecoder: WireDecoder,
+ val protobufDecoder: VesDecoder,
+ val validator: MessageValidator,
+ val router: Router,
+ val sink: Sink) : Collector {
+ override fun handleConnection(dataStream: Flux<ByteBuf>): Mono<Void> =
+ dataStream
+ .flatMap(this::decodeWire)
+ .flatMap(this::decodeProtobuf)
+ .filter(this::validate)
+ .flatMap(this::findRoute)
+ .compose(sink::send)
+ .doOnNext(this::releaseMemory)
+ .then()
+
+ private fun decodeWire(wire: ByteBuf) = releaseWhenNull(wire, wireDecoder::decode)
+
+ private fun decodeProtobuf(protobuf: ByteBuf) = releaseWhenNull(protobuf, protobufDecoder::decode)
+
+ private fun validate(msg: VesMessage): Boolean {
+ val valid = validator.isValid(msg)
+ if (!valid) {
+ msg.rawMessage.release()
+ }
+ return valid
+ }
+
+ private fun findRoute(msg: VesMessage): Mono<RoutedMessage> {
+ val routedMessage = router.findDestination(msg)
+ return if (routedMessage == null)
+ Mono.empty()
+ else
+ Mono.just(routedMessage)
+ }
+
+ private fun releaseMemory(msg: VesMessage) {
+ msg.rawMessage.release()
+ }
+
+ private fun <T>releaseWhenNull(input: ByteBuf, mapper: (ByteBuf) -> T?): Mono<T> {
+ val result = mapper(input)
+ return if (result == null) {
+ input.release()
+ Mono.empty()
+ } else {
+ Mono.just(result)
+ }
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt
new file mode 100644
index 00000000..66ced2d7
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoder.kt
@@ -0,0 +1,44 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import io.netty.buffer.ByteBuf
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class WireDecoder {
+ fun decode(byteBuf: ByteBuf): ByteBuf? =
+ try {
+ WireFrame.decode(byteBuf)
+ .takeIf { it.isValid() }
+ .let { it?.payload }
+ } catch (ex: IndexOutOfBoundsException) {
+ logger.debug { "Wire protocol frame could not be decoded - input is too small" }
+ null
+ }
+
+ companion object {
+ private val logger = Logger(WireDecoder::class)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
new file mode 100644
index 00000000..cdfcfd30
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -0,0 +1,65 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG
+import org.apache.kafka.common.serialization.ByteBufferSerializer
+import org.apache.kafka.common.serialization.StringSerializer
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.ipc.netty.http.client.HttpClient
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+import java.nio.ByteBuffer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object AdapterFactory {
+ fun kafkaSink(): SinkProvider = KafkaSinkProvider()
+
+ fun staticConfigurationProvider(config: CollectorConfiguration) =
+ object : ConfigurationProvider {
+ override fun invoke() = Flux.just(config)
+ }
+
+ private class KafkaSinkProvider : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink {
+ val sender = KafkaSender.create(
+ SenderOptions.create<CommonEventHeader, ByteBuffer>()
+ .producerProperty(BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
+ .producerProperty(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
+ .producerProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteBufferSerializer::class.java))
+ return KafkaSink(sender)
+ }
+ }
+
+ fun consulConfigurationProvider(url: String): ConfigurationProvider =
+ ConsulConfigurationProvider(url, httpAdapter())
+ fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
+}
+
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
new file mode 100644
index 00000000..ef6c2f76
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -0,0 +1,63 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.domain.CollectorConfiguration
+import org.onap.ves.VesEventV5
+import org.slf4j.LoggerFactory
+import reactor.core.publisher.Flux
+import java.io.StringReader
+import java.time.Duration
+import java.util.*
+import java.util.concurrent.atomic.AtomicReference
+import javax.json.Json
+import javax.json.JsonObject
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConsulConfigurationProvider(private val url: String, private val http: HttpAdapter)
+ : ConfigurationProvider {
+
+
+ private val logger = LoggerFactory.getLogger(ConsulConfigurationProvider::class.java)
+ private var lastConfigurationHash: AtomicReference<Int> = AtomicReference()
+
+ override fun invoke(): Flux<CollectorConfiguration> =
+ Flux.interval(Duration.ZERO, REFRESH_INTERVAL)
+ .flatMap { http.getResponse(url) }
+ .filter { body -> body.hashCode() != lastConfigurationHash.get() }
+ .doOnNext { body -> lastConfigurationHash.set(body.hashCode()) }
+ .map { str -> getConfigurationJson(str) }
+ .map { json -> createCollectorConfiguration(json) }
+
+ private fun getConfigurationJson(str: String): JsonObject {
+ val response = Json.createReader(StringReader(str)).readArray().getJsonObject(0)
+ val decodedValue = String(
+ Base64.getDecoder().decode(response.getString("Value")))
+ logger.info("Obtained new configuration from consul:\n$decodedValue")
+ return Json.createReader(StringReader(decodedValue)).readObject()
+ }
+
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
+
+ val routing = configuration.getJsonObject("routing")
+
+ return CollectorConfiguration(
+ kafkaBootstrapServers = configuration.getString("kafkaBootstrapServers"),
+ routing = org.onap.dcae.collectors.veshv.domain.routing {
+ defineRoute {
+ fromDomain(VesEventV5.VesEvent.CommonEventHeader.Domain.forNumber(routing.getInt("fromDomain")))
+ toTopic(routing.getString("toTopic"))
+ withFixedPartitioning()
+ }
+ }.build()
+ )
+ }
+
+ companion object {
+ private const val REFRESH_INTERVAL_MINUTES: Long = 5
+ private val REFRESH_INTERVAL = Duration.ofMinutes(REFRESH_INTERVAL_MINUTES)
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
new file mode 100644
index 00000000..c09af015
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -0,0 +1,29 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.slf4j.LoggerFactory
+import reactor.core.publisher.Mono
+import reactor.core.publisher.toMono
+import reactor.ipc.netty.http.client.HttpClient
+import reactor.ipc.netty.http.client.HttpClientResponse
+import java.nio.charset.Charset
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+open class HttpAdapter(private val httpClient: HttpClient) {
+
+ private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
+
+ open fun getResponse(url: String): Mono<String> =
+ httpClient.get(url)
+ .onErrorResume { e -> unableToGetResource(e, url) }
+ .flatMap { res -> res.receiveContent().toMono() }
+ .map { content -> content.content().toString(Charset.defaultCharset()) }
+
+
+ private fun unableToGetResource(e: Throwable, url: String): Mono<HttpClientResponse> {
+ logger.info("Failed to get resource on path: $url\n${e.localizedMessage}")
+ return Mono.empty()
+ }
+}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt
new file mode 100644
index 00000000..8b558a85
--- /dev/null
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/KafkaSink.kt
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderRecord
+import reactor.kafka.sender.SenderResult
+import java.nio.ByteBuffer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, ByteBuffer>) : Sink {
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<VesMessage> {
+ val records = messages.map(this::vesToKafkaRecord)
+ return sender.send(records)
+ .doOnNext(::logException)
+ .filter(::isSuccessful)
+ .map { it.correlationMetadata() }
+ }
+
+ private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, ByteBuffer, VesMessage> {
+ return SenderRecord.create(
+ msg.topic,
+ msg.partition,
+ System.currentTimeMillis(),
+ msg.message.header,
+ msg.message.rawMessage.nioBuffer(),
+ msg.message)
+ }
+
+ private fun logException(senderResult: SenderResult<out Any>) {
+ if (senderResult.exception() != null) {
+ logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
+ }
+ }
+
+ private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+
+ companion object {
+ val logger = Logger(KafkaSink::class)
+ }
+}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
new file mode 100644
index 00000000..8f6b76fb
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -0,0 +1,108 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import com.google.protobuf.ByteString
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.Unpooled.wrappedBuffer
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent
+import org.assertj.core.api.Assertions.assertThat
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.*
+
+internal object MessageValidatorTest : Spek({
+
+ fun vesMessageBytes(commonHeader: CommonEventHeader): ByteBuf {
+ val msg = VesEvent.newBuilder()
+ .setCommonEventHeader(commonHeader)
+ .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data"))
+ .build()
+ return wrappedBuffer(msg.toByteArray())
+ }
+
+ given("Message validator") {
+ val cut = MessageValidator()
+
+ on("ves hv message including header with fully initialized fields") {
+ val commonHeader = newBuilder()
+ .setVersion("1.9")
+ .setEventName("Sample event name")
+ .setDomain(Domain.HVRANMEAS)
+ .setEventId("Sample event Id")
+ .setSourceName("Sample Source")
+ .setReportingEntityName(ByteString.copyFromUtf8("Sample byte String"))
+ .setPriority(Priority.MEDIUM)
+ .setStartEpochMicrosec(120034455)
+ .setLastEpochMicrosec(120034459)
+ .setSequence(2)
+ .build()
+
+ it("should accept message with fully initialized message header") {
+ val vesMessage = VesMessage(commonHeader, vesMessageBytes(commonHeader))
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
+ }
+
+ it("should reject message with domain other than HVRANMEAS") {
+ Domain.values()
+ .filter { it != Domain.HVRANMEAS && it != Domain.UNRECOGNIZED }
+ .forEach { domain ->
+ val header = newBuilder(commonHeader).setDomain(domain).build()
+ val vesMessage = VesMessage(header, vesMessageBytes(header))
+ assertThat(cut.isValid(vesMessage))
+ .describedAs("message with $domain domain")
+ .isFalse()
+ }
+ }
+ }
+
+ on("ves hv message bytes") {
+ val vesMessage = VesMessage(getDefaultInstance(), Unpooled.EMPTY_BUFFER)
+ it("should not accept message with default header") {
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+
+ on("ves hv message including header with not initialized fields") {
+ val commonHeader = newBuilder()
+ .setVersion("1.9")
+ .setEventName("Sample event name")
+ .setEventId("Sample event Id")
+ .setSourceName("Sample Source")
+ .build()
+ val msg = VesEvent.newBuilder()
+ .setCommonEventHeader(commonHeader)
+ .setHvRanMeasFields(ByteString.copyFromUtf8("high volume data !!!"))
+ .build()
+ val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+
+ it("should not accept not fully initialized message header ") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
new file mode 100644
index 00000000..ac91aaeb
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -0,0 +1,92 @@
+package org.onap.dcae.collectors.veshv.impl
+
+import io.netty.buffer.Unpooled
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.domain.routing
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object RouterTest : Spek({
+ given("sample configuration") {
+ val config = routing {
+
+ defineRoute {
+ fromDomain(Domain.HVRANMEAS)
+ toTopic("ves_rtpm")
+ withFixedPartitioning(2)
+ }
+
+ defineRoute {
+ fromDomain(Domain.SYSLOG)
+ toTopic("ves_trace")
+ withFixedPartitioning()
+ }
+ }.build()
+ val cut = Router(config)
+
+ on("message with existing route (rtpm)") {
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.HVRANMEAS), Unpooled.EMPTY_BUFFER)
+ val result = cut.findDestination(message)
+
+ it("should have route available") {
+ assertThat(result).isNotNull()
+ }
+
+ it("should be routed to proper partition") {
+ assertThat(result?.partition).isEqualTo(2)
+ }
+
+ it("should be routed to proper topic") {
+ assertThat(result?.topic).isEqualTo("ves_rtpm")
+ }
+
+ it("should be routed with a given message") {
+ assertThat(result?.message).isSameAs(message)
+ }
+ }
+
+ on("message with existing route (trace)") {
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.SYSLOG), Unpooled.EMPTY_BUFFER)
+ val result = cut.findDestination(message)
+
+ it("should have route available") {
+ assertThat(result).isNotNull()
+ }
+
+ it("should be routed to proper partition") {
+ assertThat(result?.partition).isEqualTo(1)
+ }
+
+ it("should be routed to proper topic") {
+ assertThat(result?.topic).isEqualTo("ves_trace")
+ }
+
+ it("should be routed with a given message") {
+ assertThat(result?.message).isSameAs(message)
+ }
+ }
+
+ on("message with unknown route") {
+ val message = VesMessage(vesCommonHeaderWithDomain(Domain.HEARTBEAT), Unpooled.EMPTY_BUFFER)
+ val result = cut.findDestination(message)
+
+ it("should not have route available") {
+ assertThat(result).isNull()
+ }
+ }
+ }
+})
+
+private fun vesCommonHeaderWithDomain(domain: Domain) =
+ CommonEventHeader.getDefaultInstance().toBuilder()
+ .setDomain(domain)
+ .build() \ No newline at end of file
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
new file mode 100644
index 00000000..4d9c9239
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -0,0 +1,69 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA Intellectual Property. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import com.google.protobuf.ByteString
+import io.netty.buffer.Unpooled.wrappedBuffer
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.ves.VesEventV5.VesEvent
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader
+import reactor.test.StepVerifier
+import java.nio.charset.Charset
+
+
+internal object VesDecoderTest : Spek({
+
+ given("ves message decoder") {
+ val cut = VesDecoder()
+
+ on("ves hv message bytes") {
+ val commonHeader = CommonEventHeader.getDefaultInstance()
+ val msg = VesEvent.newBuilder()
+ .setCommonEventHeader(commonHeader)
+ .setHvRanMeasFields(ByteString.copyFromUtf8("highvolume measurements"))
+ .build()
+ val rawMessageBytes = wrappedBuffer(msg.toByteArray())
+
+
+ it("should decode only header and pass it on along with raw message") {
+ val expectedMessage = VesMessage(
+ commonHeader,
+ rawMessageBytes
+ )
+
+ assertThat(cut.decode(rawMessageBytes)).isEqualTo(expectedMessage)
+
+ }
+ }
+
+ on("invalid ves hv message bytes") {
+ val rawMessageBytes = wrappedBuffer("ala ma kota".toByteArray(Charset.defaultCharset()))
+
+ it("should return empty result") {
+ assertThat(cut.decode(rawMessageBytes)).isNull()
+ }
+ }
+ }
+})
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
new file mode 100644
index 00000000..3563bf6d
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/WireDecoderTest.kt
@@ -0,0 +1,104 @@
+package org.onap.dcae.collectors.veshv.impl
+
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrame
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since May 2018
+ */
+internal object WireDecoderTest : Spek({
+ describe("decoding wire protocol") {
+ val cut = WireDecoder()
+
+ fun decode(frame: WireFrame) =
+ cut.decode(
+ frame.encode(UnpooledByteBufAllocator.DEFAULT))
+
+ given("empty input") {
+ val input = Unpooled.EMPTY_BUFFER
+
+ it("should yield empty result") {
+ assertThat(cut.decode(input)).isNull()
+ }
+ }
+
+ given("input without 0xFF first byte") {
+ val input = WireFrame(
+ payload = Unpooled.EMPTY_BUFFER,
+ mark = 0x10,
+ majorVersion = 1,
+ minorVersion = 2,
+ payloadSize = 0)
+
+ it("should yield empty result") {
+ assertThat(decode(input)).isNull()
+ }
+ }
+
+ given("input with unsupported major version") {
+ val input = WireFrame(
+ payload = Unpooled.EMPTY_BUFFER,
+ mark = 0xFF,
+ majorVersion = 100,
+ minorVersion = 2,
+ payloadSize = 0)
+
+ it("should yield empty result") {
+ assertThat(decode(input)).isNull()
+ }
+ }
+
+ given("input with too small payload size") {
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2 ,3)),
+ mark = 0xFF,
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = 1)
+
+ it("should yield empty result") {
+ assertThat(decode(input)).isNull()
+ }
+ }
+
+ given("input with too big payload size") {
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(byteArrayOf(1, 2 ,3)),
+ mark = 0xFF,
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = 8)
+
+ it("should yield empty result") {
+ assertThat(decode(input)).isNull()
+ }
+ }
+
+ given("valid input") {
+ val payload = byteArrayOf(6, 9, 8, 6)
+ val input = WireFrame(
+ payload = Unpooled.wrappedBuffer(payload),
+ mark = 0xFF,
+ majorVersion = 1,
+ minorVersion = 0,
+ payloadSize = payload.size)
+
+
+ it("should yield Google Protocol Buffers payload") {
+ val result = decode(input)!!
+
+ val actualPayload = ByteArray(result.readableBytes())
+ result.readBytes(actualPayload)
+
+ assertThat(actualPayload).containsExactly(*payload)
+ }
+ }
+ }
+})
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
new file mode 100644
index 00000000..bf65c859
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -0,0 +1,60 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.ves.VesEventV5.VesEvent.CommonEventHeader.Domain
+import reactor.core.publisher.Mono
+import java.util.*
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object ConsulConfigurationProviderTest : Spek({
+
+ given("valid resource url") {
+ val testUrl = "http://valid-url/"
+ val httpAdapterMock: HttpAdapter = mock()
+ val consulConfigProvider = ConsulConfigurationProvider(testUrl, httpAdapterMock)
+
+ whenever(httpAdapterMock.getResponse(testUrl)).thenReturn(Mono.just(constructConsulResponse()))
+
+
+ it("should create valid collector configuration") {
+ val response = consulConfigProvider().blockFirst()
+ assertEquals("val1", response.kafkaBootstrapServers)
+ val route = response.routing.routes[0]
+ assertEquals(Domain.MEASUREMENTS_FOR_VF_SCALING, route.domain)
+ assertEquals("val3", route.targetTopic)
+ }
+ }
+})
+
+fun constructConsulResponse(): String {
+
+ val config = """{
+ "kafkaBootstrapServers": "val1",
+ "routing": {
+ "fromDomain": 2,
+ "toTopic": "val3"
+ }
+ }"""
+
+ val encodedValue = String(Base64.getEncoder().encode(config.toByteArray()))
+
+ return """[
+ {
+ "CreateIndex": 100,
+ "ModifyIndex": 200,
+ "LockIndex": 200,
+ "Key": "zip",
+ "Flags": 0,
+ "Value": "$encodedValue",
+ "Session": "adf4238a-882b-9ddc-4a9d-5b6758e4159e"
+ }
+ ]"""
+}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
new file mode 100644
index 00000000..10e8b6bf
--- /dev/null
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -0,0 +1,60 @@
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.nhaarman.mockito_kotlin.mock
+import com.nhaarman.mockito_kotlin.whenever
+import io.netty.buffer.Unpooled
+import io.netty.handler.codec.http.HttpContent
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.ipc.netty.http.client.HttpClient
+import reactor.ipc.netty.http.client.HttpClientResponse
+import java.nio.charset.Charset
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object HttpAdapterTest : Spek({
+
+ given("valid resource url") {
+
+ val httpClientMock: HttpClient = mock()
+ val httpAdapter = HttpAdapter(httpClientMock)
+ val testUrl = "http://valid-url/"
+ val responseContent = """{"key1": "value1", "key2": "value2"}"""
+ val httpResponse = createHttpResponseMock(responseContent)
+ whenever(httpClientMock.get(testUrl)).thenReturn(Mono.just(httpResponse))
+
+ it("should return response string") {
+ assertEquals(responseContent, httpAdapter.getResponse(testUrl).block())
+ }
+ }
+
+ given("invalid resource url") {
+
+ val httpClientMock: HttpClient = mock()
+ val httpAdapter = HttpAdapter(httpClientMock)
+ val testUrl = "http://invalid-url/"
+ whenever(httpClientMock.get(testUrl)).thenReturn(Mono.error(Exception("Test exception")))
+
+
+ it("should return null response") {
+ assertEquals(null, httpAdapter.getResponse(testUrl).block())
+ }
+ }
+})
+
+fun createHttpResponseMock(content: String): HttpClientResponse {
+ val responseMock: HttpClientResponse = mock()
+ val contentMock: HttpContent = mock()
+ val contentByteBuff = Unpooled.copiedBuffer(content, Charset.defaultCharset())
+
+ whenever(responseMock.receiveContent()).thenReturn(Flux.just(contentMock))
+ whenever(contentMock.content()).thenReturn(contentByteBuff)
+
+ return responseMock
+}
diff --git a/hv-collector-core/src/test/resources/logback.xml b/hv-collector-core/src/test/resources/logback.xml
new file mode 100644
index 00000000..809f62d4
--- /dev/null
+++ b/hv-collector-core/src/test/resources/logback.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="DEBUG"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration> \ No newline at end of file