aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-28 15:46:50 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-29 14:41:42 +0100
commitdde383a2aa75f94c26d7949665b79cc95486a223 (patch)
tree75f3e8f564067afd0e67dbe6254183e45ca26944 /sources/hv-collector-core
parent77f896523f2065b1da1be21545155a29edea5122 (diff)
Custom detekt rule for logger usage check
Check if logger invocations don't use unoptimal invocations, eg. concatenation `debug("a=" + a)` instead of lambda use `debug {"a=" + a}` Unfortunately to avoid defining dependencies in many places and having circural dependencies it was necessarry to reorganize the maven module structure. The goal was to have `sources` module with production code and `build` module with build-time tooling (detekt rules among them). Issue-ID: DCAEGEN2-1002 Change-Id: I36e677b98972aaae6905d722597cbce5e863d201 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/pom.xml137
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt50
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt38
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt80
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt35
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt38
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt30
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt39
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt90
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt40
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt122
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt68
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt62
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt82
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt45
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt40
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt37
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt114
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt101
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt29
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt26
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt30
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt22
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt37
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt32
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt83
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt128
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt112
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt74
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt157
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt86
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt234
-rw-r--r--sources/hv-collector-core/src/test/resources/logback-test.xml35
33 files changed, 2333 insertions, 0 deletions
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
new file mode 100644
index 00000000..7f7922e1
--- /dev/null
+++ b/sources/hv-collector-core/pom.xml
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ ============LICENSE_START=======================================================
+ ~ dcaegen2-collectors-veshv
+ ~ ================================================================================
+ ~ Copyright (C) 2018 NOKIA
+ ~ ================================================================================
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ ~ ============LICENSE_END=========================================================
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <licenses>
+ <license>
+ <name>The Apache Software License, Version 2.0</name>
+ <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+ </license>
+ </licenses>
+
+ <parent>
+ <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
+ <artifactId>hv-collector-sources</artifactId>
+ <version>1.1.0-SNAPSHOT</version>
+ <relativePath>..</relativePath>
+ </parent>
+
+ <artifactId>hv-collector-core</artifactId>
+ <description>VES HighVolume Collector :: Core</description>
+
+ <properties>
+ <skipAnalysis>false</skipAnalysis>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>kotlin-maven-plugin</artifactId>
+ <groupId>org.jetbrains.kotlin</groupId>
+ </plugin>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <groupId>org.apache.maven.plugins</groupId>
+ </plugin>
+ <plugin>
+ <groupId>org.jacoco</groupId>
+ <artifactId>jacoco-maven-plugin</artifactId>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-ssl</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-utils</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-health-check</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-domain</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>${project.parent.groupId}</groupId>
+ <artifactId>hv-collector-test-utils</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.jetbrains.kotlin</groupId>
+ <artifactId>kotlin-reflect</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-effects</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.addons</groupId>
+ <artifactId>reactor-extra</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.netty</groupId>
+ <artifactId>reactor-netty</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor.kafka</groupId>
+ <artifactId>reactor-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>javax.json</groupId>
+ <artifactId>javax.json-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish</groupId>
+ <artifactId>javax.json</artifactId>
+ <scope>runtime</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
new file mode 100644
index 00000000..dd0111bc
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -0,0 +1,50 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.boundary
+
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import reactor.core.publisher.Flux
+
+interface Sink {
+ fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
+}
+
+interface Metrics {
+ fun notifyBytesReceived(size: Int)
+ fun notifyMessageReceived(size: Int)
+ fun notifyMessageSent(topic: String)
+}
+
+@FunctionalInterface
+interface SinkProvider {
+ operator fun invoke(config: CollectorConfiguration): Sink
+
+ companion object {
+ fun just(sink: Sink): SinkProvider =
+ object : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink = sink
+ }
+ }
+}
+
+interface ConfigurationProvider {
+ operator fun invoke(): Flux<CollectorConfiguration>
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
new file mode 100644
index 00000000..3c85a9b1
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.boundary
+
+import arrow.core.Option
+import arrow.effects.IO
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+interface Collector {
+ fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
+}
+
+typealias CollectorProvider = () -> Option<Collector>
+
+interface Server {
+ fun start(): IO<ServerHandle>
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
new file mode 100644
index 00000000..5c96e1c5
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -0,0 +1,80 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.factory
+
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.impl.Router
+import org.onap.dcae.collectors.veshv.impl.VesDecoder
+import org.onap.dcae.collectors.veshv.impl.VesHvCollector
+import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.utils.arrow.getOption
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import java.util.concurrent.atomic.AtomicReference
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+class CollectorFactory(val configuration: ConfigurationProvider,
+ private val sinkProvider: SinkProvider,
+ private val metrics: Metrics,
+ private val maximumPayloadSizeBytes: Int,
+ private val healthState: HealthState = HealthState.INSTANCE) {
+
+ fun createVesHvCollectorProvider(): CollectorProvider {
+ val collector: AtomicReference<Collector> = AtomicReference()
+ configuration()
+ .map(this::createVesHvCollector)
+ .doOnNext {
+ logger.info("Using updated configuration for new connections")
+ healthState.changeState(HealthDescription.HEALTHY)
+ }
+ .doOnError {
+ logger.error("Failed to acquire configuration from consul")
+ healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
+ }
+ .subscribe(collector::set)
+ return collector::getOption
+ }
+
+ private fun createVesHvCollector(config: CollectorConfiguration): Collector {
+ return VesHvCollector(
+ wireChunkDecoderSupplier = { alloc ->
+ WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
+ },
+ protobufDecoder = VesDecoder(),
+ router = Router(config.routing),
+ sink = sinkProvider(config),
+ metrics = metrics)
+ }
+
+ companion object {
+ private val logger = Logger(CollectorFactory::class)
+ }
+}
+
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
new file mode 100644
index 00000000..dce933ab
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
@@ -0,0 +1,35 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.factory
+
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object ServerFactory {
+ fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server =
+ NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
new file mode 100644
index 00000000..fb949079
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
@@ -0,0 +1,38 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
+import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+
+internal object MessageValidator {
+
+ fun isValid(message: VesMessage): Boolean {
+ return allMandatoryFieldsArePresent(message.header)
+ }
+
+ private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
+ headerRequiredFieldDescriptors
+ .all { fieldDescriptor -> header.hasField(fieldDescriptor) }
+ .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
+
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
new file mode 100644
index 00000000..cee658b6
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.Option
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.Routing
+import org.onap.dcae.collectors.veshv.model.VesMessage
+
+class Router(private val routing: Routing) {
+ fun findDestination(message: VesMessage): Option<RoutedMessage> =
+ routing.routeFor(message.header).map { it(message) }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
new file mode 100644
index 00000000..1d43588f
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
@@ -0,0 +1,39 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.Try
+import arrow.core.Option
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass.VesEvent
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class VesDecoder {
+
+ fun decode(bytes: ByteData): Option<VesMessage> =
+ Try {
+ val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
+ VesMessage(decodedHeader, bytes)
+ }.toOption()
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
new file mode 100644
index 00000000..2f12e0cd
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -0,0 +1,90 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.Option
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.boundary.Collector
+import org.onap.dcae.collectors.veshv.boundary.Metrics
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class VesHvCollector(
+ private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
+ private val protobufDecoder: VesDecoder,
+ private val router: Router,
+ private val sink: Sink,
+ private val metrics: Metrics) : Collector {
+
+ override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
+ wireChunkDecoderSupplier(alloc).let { wireDecoder ->
+ dataStream
+ .transform { decodeWireFrame(it, wireDecoder) }
+ .filter(WireFrameMessage::isValid)
+ .transform(::decodePayload)
+ .filter(VesMessage::isValid)
+ .transform(::routeMessage)
+ .onErrorResume { logger.handleReactiveStreamError(it) }
+ .doFinally { releaseBuffersMemory(wireDecoder) }
+ .then()
+ }
+
+ private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
+ .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
+ .concatMap(decoder::decode)
+ .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
+
+ private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
+ .map(WireFrameMessage::payload)
+ .map(protobufDecoder::decode)
+ .flatMap { omitWhenNone(it) }
+
+ private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
+ .flatMap(this::findRoute)
+ .compose(sink::send)
+ .doOnNext { metrics.notifyMessageSent(it.topic) }
+
+
+ private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
+
+ private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
+ {
+ logger.info("ommiting the message" + 5)
+ Mono.empty() },
+ { Mono.just(it) })
+
+ private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
+
+ companion object {
+ private val logger = Logger(VesHvCollector::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
new file mode 100644
index 00000000..8c16736d
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import reactor.netty.http.client.HttpClient
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object AdapterFactory {
+ fun kafkaSink(): SinkProvider = KafkaSinkProvider()
+ fun loggingSink(): SinkProvider = LoggingSinkProvider()
+
+ fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
+ ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
+
+ private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
new file mode 100644
index 00000000..ec7c60c0
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
@@ -0,0 +1,122 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import reactor.retry.Jitter
+import reactor.retry.Retry
+import java.io.StringReader
+import java.time.Duration
+import java.util.concurrent.atomic.AtomicReference
+import javax.json.Json
+import javax.json.JsonObject
+
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal class ConsulConfigurationProvider(private val http: HttpAdapter,
+ private val url: String,
+ private val firstRequestDelay: Duration,
+ private val requestInterval: Duration,
+ private val healthState: HealthState,
+ retrySpec: Retry<Any>
+
+) : ConfigurationProvider {
+
+ private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
+ private val retry = retrySpec
+ .doOnRetry {
+ logger.warn("Could not get fresh configuration", it.exception())
+ healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ }
+
+ constructor(http: HttpAdapter,
+ params: ConfigurationProviderParams) : this(
+ http,
+ params.configurationUrl,
+ params.firstRequestDelay,
+ params.requestInterval,
+ HealthState.INSTANCE,
+ Retry.any<Any>()
+ .retryMax(MAX_RETRIES)
+ .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
+ .jitter(Jitter.random())
+ )
+
+ override fun invoke(): Flux<CollectorConfiguration> =
+ Flux.interval(firstRequestDelay, requestInterval)
+ .concatMap { askForConfig() }
+ .flatMap(::filterDifferentValues)
+ .map(::parseJsonResponse)
+ .map(::createCollectorConfiguration)
+ .retryWhen(retry)
+
+ private fun askForConfig(): Mono<String> = http.get(url)
+
+ private fun filterDifferentValues(configurationString: String) =
+ hashOf(configurationString).let {
+ if (it == lastConfigurationHash.get()) {
+ Mono.empty()
+ } else {
+ lastConfigurationHash.set(it)
+ Mono.just(configurationString)
+ }
+ }
+
+ private fun hashOf(str: String) = str.hashCode()
+
+ private fun parseJsonResponse(responseString: String): JsonObject =
+ Json.createReader(StringReader(responseString)).readObject()
+
+ private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
+ logger.info { "Obtained new configuration from consul:\n${configuration}" }
+ val routing = configuration.getJsonArray("collector.routing")
+
+ return CollectorConfiguration(
+ kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
+ routing = org.onap.dcae.collectors.veshv.model.routing {
+ for (route in routing) {
+ val routeObj = route.asJsonObject()
+ defineRoute {
+ fromDomain(routeObj.getString("fromDomain"))
+ toTopic(routeObj.getString("toTopic"))
+ withFixedPartitioning()
+ }
+ }
+ }.build()
+ )
+ }
+
+ companion object {
+ private const val MAX_RETRIES = 5L
+ private const val BACKOFF_INTERVAL_FACTOR = 30L
+ private val logger = Logger(ConsulConfigurationProvider::class)
+ }
+}
+
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
new file mode 100644
index 00000000..bdce6f73
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
@@ -0,0 +1,68 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import io.netty.handler.codec.http.HttpStatusClass
+import org.slf4j.LoggerFactory
+import reactor.core.publisher.Mono
+import reactor.netty.http.client.HttpClient
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+open class HttpAdapter(private val httpClient: HttpClient) {
+
+ private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
+
+ open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
+ .get()
+ .uri(url + createQueryString(queryParams))
+ .responseSingle { response, content ->
+ if (response.status().codeClass() == HttpStatusClass.SUCCESS)
+ content.asString()
+ else {
+ val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
+ Mono.error(IllegalStateException(errorMessage))
+ }
+ }
+ .doOnError {
+ logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
+ logger.debug("Nested exception:", it)
+ }
+
+ private fun createQueryString(params: Map<String, Any>): String {
+ if (params.isEmpty())
+ return ""
+
+ val builder = StringBuilder("?")
+ params.forEach { (key, value) ->
+ builder
+ .append(key)
+ .append("=")
+ .append(value)
+ .append("&")
+
+ }
+
+ return builder.removeSuffix("&").toString()
+ }
+
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
new file mode 100644
index 00000000..5f4bf354
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
@@ -0,0 +1,62 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Flux
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class LoggingSinkProvider : SinkProvider {
+
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return object : Sink {
+ private val totalMessages = AtomicLong()
+ private val totalBytes = AtomicLong()
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
+ messages
+ .doOnNext(this::logMessage)
+
+ private fun logMessage(msg: RoutedMessage) {
+ val msgs = totalMessages.addAndGet(1)
+ val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
+ val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
+ if (msgs % INFO_LOGGING_FREQ == 0L)
+ logger.info(logMessageSupplier)
+ else
+ logger.trace(logMessageSupplier)
+ }
+
+ }
+ }
+
+ companion object {
+ const val INFO_LOGGING_FREQ = 100_000
+ private val logger = Logger(LoggingSinkProvider::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
new file mode 100644
index 00000000..a0c22418
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
@@ -0,0 +1,82 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.core.publisher.Flux
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderRecord
+import reactor.kafka.sender.SenderResult
+import java.util.concurrent.atomic.AtomicLong
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
+ private val sentMessages = AtomicLong(0)
+
+ override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
+ val records = messages.map(this::vesToKafkaRecord)
+ val result = sender.send(records)
+ .doOnNext(::logException)
+ .filter(::isSuccessful)
+ .map { it.correlationMetadata() }
+
+ return if (logger.traceEnabled) {
+ result.doOnNext(::logSentMessage)
+ } else {
+ result
+ }
+ }
+
+ private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
+ return SenderRecord.create(
+ msg.topic,
+ msg.partition,
+ System.currentTimeMillis(),
+ msg.message.header,
+ msg.message,
+ msg)
+ }
+
+ private fun logException(senderResult: SenderResult<out Any>) {
+ if (senderResult.exception() != null) {
+ logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
+ }
+ }
+
+ private fun logSentMessage(sentMsg: RoutedMessage) {
+ logger.trace {
+ val msgNum = sentMessages.incrementAndGet()
+ "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
+ }
+ }
+
+ private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
+
+ companion object {
+ val logger = Logger(KafkaSink::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
new file mode 100644
index 00000000..18191952
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
@@ -0,0 +1,45 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.onap.dcae.collectors.veshv.boundary.Sink
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+import reactor.kafka.sender.KafkaSender
+import reactor.kafka.sender.SenderOptions
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+internal class KafkaSinkProvider : SinkProvider {
+ override fun invoke(config: CollectorConfiguration): Sink {
+ return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
+ }
+
+ private fun constructSenderOptions(config: CollectorConfiguration) =
+ SenderOptions.create<CommonEventHeader, VesMessage>()
+ .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
+ .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
+ .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
new file mode 100644
index 00000000..4e9932cc
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
@@ -0,0 +1,40 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import com.google.protobuf.MessageLite
+import org.apache.kafka.common.serialization.Serializer
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class ProtobufSerializer : Serializer<MessageLite> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ // no configuration
+ }
+
+ override fun serialize(topic: String?, data: MessageLite?): ByteArray? =
+ data?.toByteArray()
+
+ override fun close() {
+ // cleanup not needed
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
new file mode 100644
index 00000000..7a6ac7c8
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters.kafka
+
+import org.apache.kafka.common.serialization.Serializer
+import org.onap.dcae.collectors.veshv.model.VesMessage
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class VesMessageSerializer : Serializer<VesMessage> {
+ override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
+ }
+
+ override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
+
+ override fun close() {
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
new file mode 100644
index 00000000..e535300a
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -0,0 +1,114 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.socket
+
+import arrow.core.getOrElse
+import arrow.effects.IO
+import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
+import org.onap.dcae.collectors.veshv.boundary.Server
+import org.onap.dcae.collectors.veshv.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
+import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
+import org.onap.dcae.collectors.veshv.utils.ServerHandle
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import reactor.core.publisher.Mono
+import reactor.netty.ByteBufFlux
+import reactor.netty.Connection
+import reactor.netty.NettyInbound
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpServer
+import java.time.Duration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
+ private val sslContextFactory: ServerSslContextFactory,
+ private val collectorProvider: CollectorProvider) : Server {
+
+ override fun start(): IO<ServerHandle> = IO {
+ val tcpServer = TcpServer.create()
+ .addressSupplier { serverConfig.serverListenAddress }
+ .configureSsl()
+ .handle(this::handleConnection)
+
+ NettyServerHandle(tcpServer.bindNow())
+ }
+
+ private fun TcpServer.configureSsl() =
+ sslContextFactory
+ .createSslContext(serverConfig.securityConfiguration)
+ .map { sslContext ->
+ this.secure { b -> b.sslContext(sslContext) }
+ }.getOrElse { this }
+
+ private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
+ collectorProvider().fold(
+ {
+ nettyInbound.withConnection { conn ->
+ logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
+ }
+ Mono.empty()
+ },
+ {
+ nettyInbound.withConnection { conn ->
+ logger.info { "Handling connection from ${conn.address()}" }
+ conn.configureIdleTimeout(serverConfig.idleTimeout)
+ .logConnectionClosed()
+ }
+ it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
+ }
+ )
+
+ private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
+ .receive()
+ .retain()
+
+ private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
+ onReadIdle(timeout.toMillis()) {
+ logger.info {
+ "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
+ }
+ disconnectClient()
+ }
+ return this
+ }
+
+ private fun Connection.disconnectClient() {
+ channel().close().addListener {
+ if (it.isSuccess)
+ logger.debug { "Channel (${address()}) closed successfully." }
+ else
+ logger.warn("Channel close failed", it.cause())
+ }
+ }
+
+ private fun Connection.logConnectionClosed(): Connection {
+ onTerminate().subscribe {
+ logger.info("Connection from ${address()} has been closed")
+ }
+ return this
+ }
+
+ companion object {
+ private val logger = Logger(NettyTcpServer::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
new file mode 100644
index 00000000..4a2ef6b2
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
@@ -0,0 +1,101 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import arrow.effects.IO
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.ByteBufAllocator
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
+import reactor.core.publisher.Flux
+import reactor.core.publisher.SynchronousSink
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+internal class WireChunkDecoder(
+ private val decoder: WireFrameDecoder,
+ alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
+ private val streamBuffer = alloc.compositeBuffer()
+
+ fun release() {
+ streamBuffer.release()
+ }
+
+ fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
+ logIncomingMessage(byteBuf)
+ if (byteBuf.readableBytes() == 0) {
+ byteBuf.release()
+ Flux.empty()
+ } else {
+ streamBuffer.addComponent(true, byteBuf)
+ generateFrames()
+ .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
+ .doFinally { streamBuffer.discardReadComponents() }
+ }
+ }
+
+ private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
+ decoder.decodeFirst(streamBuffer)
+ .fold(onError(next), onSuccess(next))
+ .unsafeRunSync()
+ }
+
+ private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
+ when (err) {
+ is InvalidWireFrame -> IO {
+ next.error(WireFrameException(err))
+ }
+ is MissingWireFrameBytes -> IO {
+ logEndOfData()
+ next.complete()
+ }
+ }
+ }
+
+ private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
+ IO {
+ logDecodedWireMessage(frame)
+ next.next(frame)
+ }
+ }
+
+ private fun logIncomingMessage(wire: ByteBuf) {
+ logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
+ }
+
+ private fun logDecodedWireMessage(wire: WireFrameMessage) {
+ logger.trace { "Wire payload size: ${wire.payloadSize} B" }
+ }
+
+ private fun logEndOfData() {
+ logger.trace { "End of data in current TCP buffer" }
+ }
+
+ companion object {
+ val logger = Logger(WireChunkDecoder::class)
+ }
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
new file mode 100644
index 00000000..83a7cd85
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
@@ -0,0 +1,29 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since June 2018
+ */
+class WireFrameException(error: WireFrameDecodingError)
+ : Exception("${error::class.simpleName}: ${error.message}")
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
new file mode 100644
index 00000000..ec546c7d
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
@@ -0,0 +1,26 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
new file mode 100644
index 00000000..9de34498
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
@@ -0,0 +1,30 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import java.time.Duration
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since July 2018
+ */
+data class ConfigurationProviderParams(val configurationUrl: String,
+ val firstRequestDelay: Duration,
+ val requestInterval: Duration)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt
new file mode 100644
index 00000000..782877e3
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt
@@ -0,0 +1,22 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
new file mode 100644
index 00000000..85117684
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -0,0 +1,37 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
+import java.time.Duration
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class ServerConfiguration(
+ val serverListenAddress: InetSocketAddress,
+ val configurationProviderParams: ConfigurationProviderParams,
+ val securityConfiguration: SecurityConfiguration,
+ val idleTimeout: Duration,
+ val healthCheckApiListenAddress: InetSocketAddress,
+ val maximumPayloadSizeBytes: Int,
+ val dummyMode: Boolean = false)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
new file mode 100644
index 00000000..f5bfcce1
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
@@ -0,0 +1,32 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.impl.MessageValidator
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) {
+ fun isValid(): Boolean = MessageValidator.isValid(this)
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
new file mode 100644
index 00000000..bab95c57
--- /dev/null
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.model
+
+import arrow.core.Option
+import org.onap.ves.VesEventOuterClass.CommonEventHeader
+
+data class Routing(val routes: List<Route>) {
+
+ fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
+ Option.fromNullable(routes.find { it.applies(commonHeader) })
+}
+
+data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
+
+ fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
+
+ operator fun invoke(message: VesMessage): RoutedMessage =
+ RoutedMessage(targetTopic, partitioning(message.header), message)
+}
+
+
+/*
+Configuration DSL
+ */
+
+fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
+ val conf = RoutingBuilder()
+ conf.init()
+ return conf
+}
+
+class RoutingBuilder {
+ private val routes: MutableList<RouteBuilder> = mutableListOf()
+
+ fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
+ val rule = RouteBuilder()
+ rule.init()
+ routes.add(rule)
+ return rule
+ }
+
+ fun build() = Routing(routes.map { it.build() }.toList())
+}
+
+class RouteBuilder {
+
+ private lateinit var domain: String
+ private lateinit var targetTopic: String
+ private lateinit var partitioning: (CommonEventHeader) -> Int
+
+ fun fromDomain(domain: String) {
+ this.domain = domain
+ }
+
+ fun toTopic(targetTopic: String) {
+ this.targetTopic = targetTopic
+ }
+
+ fun withFixedPartitioning(num: Int = 0) {
+ partitioning = { num }
+ }
+
+ fun build() = Route(domain, targetTopic, partitioning)
+
+}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
new file mode 100644
index 00000000..3090042d
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
@@ -0,0 +1,128 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
+
+internal object MessageValidatorTest : Spek({
+
+ given("Message validator") {
+ val cut = MessageValidator
+
+ on("ves hv message including header with fully initialized fields") {
+ val commonHeader = commonHeader()
+
+ it("should accept message with fully initialized message header") {
+ val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
+ }
+
+ VesEventDomain.values()
+ .forEach { domain ->
+ it("should accept message with $domain domain") {
+ val header = commonHeader(domain)
+ val vesMessage = VesMessage(header, vesEventBytes(header))
+ assertThat(cut.isValid(vesMessage))
+ .isTrue()
+ }
+ }
+ }
+
+ on("ves hv message bytes") {
+ val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
+ it("should not accept message with default header") {
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+ val priorityTestCases = mapOf(
+ Priority.PRIORITY_NOT_PROVIDED to false,
+ Priority.HIGH to true
+ )
+
+ priorityTestCases.forEach { value, expectedResult ->
+ on("ves hv message including header with priority $value") {
+ val commonEventHeader = commonHeader(priority = value)
+ val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
+
+ it("should resolve validation result") {
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
+ .isEqualTo(expectedResult)
+ }
+ }
+ }
+
+ on("ves hv message including header with not initialized fields") {
+ val commonHeader = newBuilder()
+ .setVersion("1.9")
+ .setEventName("Sample event name")
+ .setEventId("Sample event Id")
+ .setSourceName("Sample Source")
+ .build()
+ val rawMessageBytes = vesEventBytes(commonHeader)
+
+ it("should not accept not fully initialized message header") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+ on("ves hv message including header.vesEventListenerVersion with non-string major part") {
+ val commonHeader = commonHeader(vesEventListenerVersion = "sample-version")
+ val rawMessageBytes = vesEventBytes(commonHeader)
+
+
+ it("should not accept message header") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+ on("ves hv message including header.vesEventListenerVersion with major part != 7") {
+ val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3")
+ val rawMessageBytes = vesEventBytes(commonHeader)
+
+ it("should not accept message header") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+
+ on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") {
+ val commonHeader = commonHeader(vesEventListenerVersion = "7.test")
+ val rawMessageBytes = vesEventBytes(commonHeader)
+
+ it("should not accept message header") {
+ val vesMessage = VesMessage(commonHeader, rawMessageBytes)
+ assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
+ }
+ }
+ }
+})
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
new file mode 100644
index 00000000..e8a31231
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -0,0 +1,112 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.None
+import arrow.core.Some
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.model.RoutedMessage
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.model.routing
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object RouterTest : Spek({
+ given("sample configuration") {
+ val config = routing {
+
+ defineRoute {
+ fromDomain(PERF3GPP.domainName)
+ toTopic("ves_rtpm")
+ withFixedPartitioning(2)
+ }
+
+ defineRoute {
+ fromDomain(SYSLOG.domainName)
+ toTopic("ves_trace")
+ withFixedPartitioning()
+ }
+ }.build()
+ val cut = Router(config)
+
+ on("message with existing route (rtpm)") {
+ val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
+ val result = cut.findDestination(message)
+
+ it("should have route available") {
+ assertThat(result).isNotNull()
+ }
+
+ it("should be routed to proper partition") {
+ assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
+ }
+
+ it("should be routed to proper topic") {
+ assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
+ }
+
+ it("should be routed with a given message") {
+ assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+ }
+ }
+
+ on("message with existing route (trace)") {
+ val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
+ val result = cut.findDestination(message)
+
+ it("should have route available") {
+ assertThat(result).isNotNull()
+ }
+
+ it("should be routed to proper partition") {
+ assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
+ }
+
+ it("should be routed to proper topic") {
+ assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
+ }
+
+ it("should be routed with a given message") {
+ assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
+ }
+ }
+
+ on("message with unknown route") {
+ val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
+ val result = cut.findDestination(message)
+
+ it("should not have route available") {
+ assertThat(result).isEqualTo(None)
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
new file mode 100644
index 00000000..8950a557
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
@@ -0,0 +1,74 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl
+
+import arrow.core.Option
+import com.google.protobuf.ByteString
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.domain.ByteData
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.model.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
+import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
+import java.nio.charset.Charset
+import kotlin.test.assertTrue
+import kotlin.test.fail
+
+
+internal object VesDecoderTest : Spek({
+
+ given("ves message decoder") {
+ val cut = VesDecoder()
+
+ on("ves hv message bytes") {
+ val commonHeader = commonHeader(HEARTBEAT)
+ val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
+
+ it("should decode only header and pass it on along with raw message") {
+ val expectedMessage = VesMessage(
+ commonHeader,
+ rawMessageBytes
+ )
+
+ assertTrue {
+ cut.decode(rawMessageBytes).exists {
+ it == expectedMessage
+ }
+ }
+ }
+ }
+
+ on("invalid ves hv message bytes") {
+ val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
+
+ it("should throw error") {
+ assertFailedWithError(cut.decode(rawMessageBytes))
+ }
+ }
+ }
+})
+
+private fun <A> assertFailedWithError(option: Option<A>) =
+ option.exists {
+ fail("Error expected")
+ }
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
new file mode 100644
index 00000000..c6364f74
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
@@ -0,0 +1,157 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import com.nhaarman.mockitokotlin2.eq
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.mockito.Mockito
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
+
+import reactor.core.publisher.Mono
+import reactor.retry.Retry
+import reactor.test.StepVerifier
+import java.time.Duration
+import java.util.*
+import kotlin.test.assertEquals
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object ConsulConfigurationProviderTest : Spek({
+
+ describe("Consul configuration provider") {
+
+ val httpAdapterMock: HttpAdapter = mock()
+ val healthStateProvider = HealthState.INSTANCE
+
+ given("valid resource url") {
+ val validUrl = "http://valid-url/"
+ val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
+
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
+ .thenReturn(Mono.just(constructConsulResponse()))
+
+ it("should use received configuration") {
+
+ StepVerifier.create(consulConfigProvider().take(1))
+ .consumeNextWith {
+
+ assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
+
+ val route1 = it.routing.routes[0]
+ assertThat(FAULT.domainName)
+ .describedAs("routed domain 1")
+ .isEqualTo(route1.domain)
+ assertThat("test-topic-1")
+ .describedAs("target topic 1")
+ .isEqualTo(route1.targetTopic)
+
+ val route2 = it.routing.routes[1]
+ assertThat(HEARTBEAT.domainName)
+ .describedAs("routed domain 2")
+ .isEqualTo(route2.domain)
+ assertThat("test-topic-2")
+ .describedAs("target topic 2")
+ .isEqualTo(route2.targetTopic)
+
+ }.verifyComplete()
+ }
+ }
+
+ }
+ given("invalid resource url") {
+ val invalidUrl = "http://invalid-url/"
+
+ val iterationCount = 3L
+ val consulConfigProvider = constructConsulConfigProvider(
+ invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
+ )
+
+ on("call to consul") {
+ whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
+ .thenReturn(Mono.error(RuntimeException("Test exception")))
+
+ it("should interrupt the flux") {
+
+ StepVerifier.create(consulConfigProvider())
+ .verifyErrorMessage("Test exception")
+ }
+
+ it("should update the health state") {
+ StepVerifier.create(healthStateProvider().take(iterationCount))
+ .expectNextCount(iterationCount - 1)
+ .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
+ .verifyComplete()
+ }
+ }
+ }
+ }
+
+})
+
+private fun constructConsulConfigProvider(url: String,
+ httpAdapter: HttpAdapter,
+ healthState: HealthState,
+ iterationCount: Long = 1
+): ConsulConfigurationProvider {
+
+ val firstRequestDelay = Duration.ofMillis(1)
+ val requestInterval = Duration.ofMillis(1)
+ val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
+
+ return ConsulConfigurationProvider(
+ httpAdapter,
+ url,
+ firstRequestDelay,
+ requestInterval,
+ healthState,
+ retry
+ )
+}
+
+
+const val kafkaAddress = "message-router-kafka"
+
+fun constructConsulResponse(): String =
+ """{
+ "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
+ "collector.routing": [
+ {
+ "fromDomain": "fault",
+ "toTopic": "test-topic-1"
+ },
+ {
+ "fromDomain": "heartbeat",
+ "toTopic": "test-topic-2"
+ }
+ ]
+ }"""
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
new file mode 100644
index 00000000..91457faf
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
@@ -0,0 +1,86 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.adapters
+
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import reactor.core.publisher.Mono
+import reactor.netty.http.client.HttpClient
+import reactor.netty.http.server.HttpServer
+import reactor.test.StepVerifier
+import reactor.test.test
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object HttpAdapterTest : Spek({
+ describe("HttpAdapter") {
+
+ val httpServer = HttpServer.create()
+ .host("127.0.0.1")
+ .route { routes ->
+ routes.get("/url") { req, resp ->
+ resp.sendString(Mono.just(req.uri()))
+ }
+ }
+ .bindNow()
+ val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
+ val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl))
+
+ afterGroup {
+ httpServer.disposeNow()
+ }
+
+ given("url without query params") {
+ val url = "/url"
+
+ it("should not append query string") {
+ httpAdapter.get(url).test()
+ .expectNext(url)
+ .verifyComplete()
+ }
+ }
+
+ given("url with query params") {
+ val queryParams = mapOf(Pair("p", "the-value"))
+ val url = "/url"
+
+ it("should add them as query string to the url") {
+ httpAdapter.get(url, queryParams).test()
+ .expectNext("/url?p=the-value")
+ .verifyComplete()
+ }
+ }
+
+ given("invalid url") {
+ val invalidUrl = "/wtf"
+
+ it("should interrupt the flux") {
+ StepVerifier
+ .create(httpAdapter.get(invalidUrl))
+ .verifyError()
+ }
+ }
+ }
+
+}) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
new file mode 100644
index 00000000..f06a0dc7
--- /dev/null
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -0,0 +1,234 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.impl.wire
+
+import io.netty.buffer.ByteBuf
+import io.netty.buffer.Unpooled
+import io.netty.buffer.UnpooledByteBufAllocator
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
+import reactor.test.test
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
+ * @since May 2018
+ */
+internal object WireChunkDecoderTest : Spek({
+ val alloc = UnpooledByteBufAllocator.DEFAULT
+ val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
+ val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
+
+ val encoder = WireFrameEncoder(alloc)
+
+ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
+
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
+
+ fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
+ for (bb in byteBuffers) {
+ assertThat(bb.refCnt())
+ .describedAs("should be released: $bb ref count")
+ .isEqualTo(0)
+ }
+ }
+
+ fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
+ for (bb in byteBuffers) {
+ assertThat(bb.refCnt())
+ .describedAs("should not be released: $bb ref count")
+ .isEqualTo(1)
+ }
+ }
+
+ describe("decoding wire protocol") {
+ given("empty input") {
+ val input = Unpooled.EMPTY_BUFFER
+
+ it("should yield empty result") {
+ createInstance().decode(input).test().verifyComplete()
+ }
+ }
+
+ given("input with no readable bytes") {
+ val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
+
+ it("should yield empty result") {
+ createInstance().decode(input).test().verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input)
+ }
+ }
+
+ given("invalid input (not starting with marker)") {
+ val input = Unpooled.wrappedBuffer(samplePayload)
+
+ it("should yield error") {
+ createInstance().decode(input).test()
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("valid input") {
+ val input = WireFrameMessage(samplePayload)
+
+ it("should yield decoded input frame") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ }
+ }
+
+ given("valid input with part of next frame") {
+ val input = Unpooled.buffer()
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
+
+ it("should yield decoded input frame") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("valid input with garbage after it") {
+ val input = Unpooled.buffer()
+ .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
+ .writeBytes(Unpooled.wrappedBuffer(samplePayload))
+
+ it("should yield decoded input frame and error") {
+ createInstance().decode(input).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should leave memory unreleased") {
+ verifyMemoryNotReleased(input)
+ }
+ }
+
+ given("two inputs containing two separate messages") {
+ val input1 = encoder.encode(WireFrameMessage(samplePayload))
+ val input2 = encoder.encode(WireFrameMessage(anotherPayload))
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input1).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+
+ given("1st input containing 1st frame and 2nd input containing garbage") {
+ val input1 = encoder.encode(WireFrameMessage(samplePayload))
+ val input2 = Unpooled.wrappedBuffer(anotherPayload)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input1)
+ .test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .verifyError(WireFrameException::class.java)
+ }
+
+ it("should release memory for 1st input") {
+ verifyMemoryReleased(input1)
+ }
+
+ it("should leave memory unreleased for 2nd input") {
+ verifyMemoryNotReleased(input2)
+ }
+ }
+
+
+ given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
+ val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
+
+ val input1 = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2, 3)
+ val input2 = Unpooled.buffer().writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input1).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+
+ given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
+ val frame1 = encoder.encode(WireFrameMessage(samplePayload))
+ val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
+
+ val input1 = Unpooled.buffer()
+ .writeBytes(frame1, 5)
+ val input2 = Unpooled.buffer()
+ .writeBytes(frame1)
+ .writeBytes(frame2)
+
+ it("should yield decoded input frames") {
+ val cut = createInstance()
+ cut.decode(input1).test()
+ .verifyComplete()
+ cut.decode(input2).test()
+ .expectNextMatches { it.payloadSize == samplePayload.size }
+ .expectNextMatches { it.payloadSize == anotherPayload.size }
+ .verifyComplete()
+ }
+
+ it("should release memory") {
+ verifyMemoryReleased(input1, input2)
+ }
+ }
+ }
+}) \ No newline at end of file
diff --git a/sources/hv-collector-core/src/test/resources/logback-test.xml b/sources/hv-collector-core/src/test/resources/logback-test.xml
new file mode 100644
index 00000000..9a4eacfe
--- /dev/null
+++ b/sources/hv-collector-core/src/test/resources/logback-test.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+ <property name="LOG_FILE"
+ value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
+ <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
+
+ <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>
+ %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
+ </pattern>
+ </encoder>
+ </appender>
+
+ <appender name="ROLLING-FILE"
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder>
+ <pattern>${FILE_LOG_PATTERN}</pattern>
+ </encoder>
+ <file>${LOG_FILE}</file>
+ <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
+ <maxFileSize>50MB</maxFileSize>
+ <maxHistory>30</maxHistory>
+ <totalSizeCap>10GB</totalSizeCap>
+ </rollingPolicy>
+ </appender>
+
+ <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
+
+ <root level="INFO">
+ <appender-ref ref="CONSOLE"/>
+ <appender-ref ref="ROLLING-FILE"/>
+ </root>
+</configuration>