diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-11-28 15:46:50 +0100 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-11-29 14:41:42 +0100 |
commit | dde383a2aa75f94c26d7949665b79cc95486a223 (patch) | |
tree | 75f3e8f564067afd0e67dbe6254183e45ca26944 /hv-collector-core | |
parent | 77f896523f2065b1da1be21545155a29edea5122 (diff) |
Custom detekt rule for logger usage check
Check if logger invocations don't use unoptimal invocations, eg.
concatenation `debug("a=" + a)` instead of lambda use `debug {"a=" + a}`
Unfortunately to avoid defining dependencies in many places and having
circural dependencies it was necessarry to reorganize the maven module
structure. The goal was to have `sources` module with production code and
`build` module with build-time tooling (detekt rules among them).
Issue-ID: DCAEGEN2-1002
Change-Id: I36e677b98972aaae6905d722597cbce5e863d201
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-core')
33 files changed, 0 insertions, 2331 deletions
diff --git a/hv-collector-core/pom.xml b/hv-collector-core/pom.xml deleted file mode 100644 index a8135292..00000000 --- a/hv-collector-core/pom.xml +++ /dev/null @@ -1,137 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ ============LICENSE_START======================================================= - ~ dcaegen2-collectors-veshv - ~ ================================================================================ - ~ Copyright (C) 2018 NOKIA - ~ ================================================================================ - ~ Licensed under the Apache License, Version 2.0 (the "License"); - ~ you may not use this file except in compliance with the License. - ~ You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - ~ ============LICENSE_END========================================================= - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <licenses> - <license> - <name>The Apache Software License, Version 2.0</name> - <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> - </license> - </licenses> - - <parent> - <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> - <artifactId>ves-hv-collector</artifactId> - <version>1.1.0-SNAPSHOT</version> - <relativePath>..</relativePath> - </parent> - - <artifactId>hv-collector-core</artifactId> - <description>VES HighVolume Collector :: Core</description> - - <properties> - <skipAnalysis>false</skipAnalysis> - </properties> - - <build> - <plugins> - <plugin> - <artifactId>kotlin-maven-plugin</artifactId> - <groupId>org.jetbrains.kotlin</groupId> - </plugin> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <groupId>org.apache.maven.plugins</groupId> - </plugin> - <plugin> - <groupId>org.jacoco</groupId> - <artifactId>jacoco-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - - <dependencies> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-ssl</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-utils</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-health-check</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - <scope>compile</scope> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-test-utils</artifactId> - <version>${project.parent.version}</version> - <scope>test</scope> - </dependency> - - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-reflect</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-effects</artifactId> - </dependency> - <dependency> - <groupId>io.arrow-kt</groupId> - <artifactId>arrow-core</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-core</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.addons</groupId> - <artifactId>reactor-extra</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.netty</groupId> - <artifactId>reactor-netty</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor.kafka</groupId> - <artifactId>reactor-kafka</artifactId> - </dependency> - <dependency> - <groupId>javax.json</groupId> - <artifactId>javax.json-api</artifactId> - </dependency> - <dependency> - <groupId>org.glassfish</groupId> - <artifactId>javax.json</artifactId> - <scope>runtime</scope> - </dependency> - <dependency> - <groupId>ch.qos.logback</groupId> - <artifactId>logback-classic</artifactId> - <scope>test</scope> - </dependency> - </dependencies> - -</project> diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt deleted file mode 100644 index dd0111bc..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ /dev/null @@ -1,50 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.boundary - -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import reactor.core.publisher.Flux - -interface Sink { - fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> -} - -interface Metrics { - fun notifyBytesReceived(size: Int) - fun notifyMessageReceived(size: Int) - fun notifyMessageSent(topic: String) -} - -@FunctionalInterface -interface SinkProvider { - operator fun invoke(config: CollectorConfiguration): Sink - - companion object { - fun just(sink: Sink): SinkProvider = - object : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink = sink - } - } -} - -interface ConfigurationProvider { - operator fun invoke(): Flux<CollectorConfiguration> -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt deleted file mode 100644 index 3c85a9b1..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.boundary - -import arrow.core.Option -import arrow.effects.IO -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -interface Collector { - fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> -} - -typealias CollectorProvider = () -> Option<Collector> - -interface Server { - fun start(): IO<ServerHandle> -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt deleted file mode 100644 index 5c96e1c5..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.factory - -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.impl.Router -import org.onap.dcae.collectors.veshv.impl.VesDecoder -import org.onap.dcae.collectors.veshv.impl.VesHvCollector -import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.utils.arrow.getOption -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.util.concurrent.atomic.AtomicReference - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -class CollectorFactory(val configuration: ConfigurationProvider, - private val sinkProvider: SinkProvider, - private val metrics: Metrics, - private val maximumPayloadSizeBytes: Int, - private val healthState: HealthState = HealthState.INSTANCE) { - - fun createVesHvCollectorProvider(): CollectorProvider { - val collector: AtomicReference<Collector> = AtomicReference() - configuration() - .map(this::createVesHvCollector) - .doOnNext { - logger.info("Using updated configuration for new connections") - healthState.changeState(HealthDescription.HEALTHY) - } - .doOnError { - logger.error("Failed to acquire configuration from consul") - healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND) - } - .subscribe(collector::set) - return collector::getOption - } - - private fun createVesHvCollector(config: CollectorConfiguration): Collector { - return VesHvCollector( - wireChunkDecoderSupplier = { alloc -> - WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc) - }, - protobufDecoder = VesDecoder(), - router = Router(config.routing), - sink = sinkProvider(config), - metrics = metrics) - } - - companion object { - private val logger = Logger(CollectorFactory::class) - } -} - diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt deleted file mode 100644 index dce933ab..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ /dev/null @@ -1,35 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.factory - -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer -import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object ServerFactory { - fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server = - NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider) -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt deleted file mode 100644 index fb949079..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt +++ /dev/null @@ -1,38 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors -import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventOuterClass.CommonEventHeader - -internal object MessageValidator { - - fun isValid(message: VesMessage): Boolean { - return allMandatoryFieldsArePresent(message.header) - } - - private fun allMandatoryFieldsArePresent(header: CommonEventHeader) = - headerRequiredFieldDescriptors - .all { fieldDescriptor -> header.hasField(fieldDescriptor) } - .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion)) - -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt deleted file mode 100644 index cee658b6..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import arrow.core.Option -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.Routing -import org.onap.dcae.collectors.veshv.model.VesMessage - -class Router(private val routing: Routing) { - fun findDestination(message: VesMessage): Option<RoutedMessage> = - routing.routeFor(message.header).map { it(message) } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt deleted file mode 100644 index 1d43588f..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt +++ /dev/null @@ -1,39 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import arrow.core.Try -import arrow.core.Option -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventOuterClass.VesEvent - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class VesDecoder { - - fun decode(bytes: ByteData): Option<VesMessage> = - Try { - val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader - VesMessage(decodedHeader, bytes) - }.toOption() -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt deleted file mode 100644 index b700f135..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ /dev/null @@ -1,88 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import arrow.core.Option -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.boundary.Collector -import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class VesHvCollector( - private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder, - private val protobufDecoder: VesDecoder, - private val router: Router, - private val sink: Sink, - private val metrics: Metrics) : Collector { - - override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> = - wireChunkDecoderSupplier(alloc).let { wireDecoder -> - dataStream - .transform { decodeWireFrame(it, wireDecoder) } - .filter(WireFrameMessage::isValid) - .transform(::decodePayload) - .filter(VesMessage::isValid) - .transform(::routeMessage) - .onErrorResume { logger.handleReactiveStreamError(it) } - .doFinally { releaseBuffersMemory(wireDecoder) } - .then() - } - - private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux - .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) } - .concatMap(decoder::decode) - .doOnNext { metrics.notifyMessageReceived(it.payloadSize) } - - private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux - .map(WireFrameMessage::payload) - .map(protobufDecoder::decode) - .flatMap { omitWhenNone(it) } - - private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux - .flatMap(this::findRoute) - .compose(sink::send) - .doOnNext { metrics.notifyMessageSent(it.topic) } - - - private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg)) - - private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold( - { Mono.empty() }, - { Mono.just(it) }) - - private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release() - - companion object { - private val logger = Logger(VesHvCollector::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt deleted file mode 100644 index 8c16736d..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider -import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import reactor.netty.http.client.HttpClient - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object AdapterFactory { - fun kafkaSink(): SinkProvider = KafkaSinkProvider() - fun loggingSink(): SinkProvider = LoggingSinkProvider() - - fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider = - ConsulConfigurationProvider(httpAdapter(), configurationProviderParams) - - private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create()) -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt deleted file mode 100644 index ec7c60c0..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt +++ /dev/null @@ -1,122 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.retry.Jitter -import reactor.retry.Retry -import java.io.StringReader -import java.time.Duration -import java.util.concurrent.atomic.AtomicReference -import javax.json.Json -import javax.json.JsonObject - - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal class ConsulConfigurationProvider(private val http: HttpAdapter, - private val url: String, - private val firstRequestDelay: Duration, - private val requestInterval: Duration, - private val healthState: HealthState, - retrySpec: Retry<Any> - -) : ConfigurationProvider { - - private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0) - private val retry = retrySpec - .doOnRetry { - logger.warn("Could not get fresh configuration", it.exception()) - healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) - } - - constructor(http: HttpAdapter, - params: ConfigurationProviderParams) : this( - http, - params.configurationUrl, - params.firstRequestDelay, - params.requestInterval, - HealthState.INSTANCE, - Retry.any<Any>() - .retryMax(MAX_RETRIES) - .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR)) - .jitter(Jitter.random()) - ) - - override fun invoke(): Flux<CollectorConfiguration> = - Flux.interval(firstRequestDelay, requestInterval) - .concatMap { askForConfig() } - .flatMap(::filterDifferentValues) - .map(::parseJsonResponse) - .map(::createCollectorConfiguration) - .retryWhen(retry) - - private fun askForConfig(): Mono<String> = http.get(url) - - private fun filterDifferentValues(configurationString: String) = - hashOf(configurationString).let { - if (it == lastConfigurationHash.get()) { - Mono.empty() - } else { - lastConfigurationHash.set(it) - Mono.just(configurationString) - } - } - - private fun hashOf(str: String) = str.hashCode() - - private fun parseJsonResponse(responseString: String): JsonObject = - Json.createReader(StringReader(responseString)).readObject() - - private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration { - logger.info { "Obtained new configuration from consul:\n${configuration}" } - val routing = configuration.getJsonArray("collector.routing") - - return CollectorConfiguration( - kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"), - routing = org.onap.dcae.collectors.veshv.model.routing { - for (route in routing) { - val routeObj = route.asJsonObject() - defineRoute { - fromDomain(routeObj.getString("fromDomain")) - toTopic(routeObj.getString("toTopic")) - withFixedPartitioning() - } - } - }.build() - ) - } - - companion object { - private const val MAX_RETRIES = 5L - private const val BACKOFF_INTERVAL_FACTOR = 30L - private val logger = Logger(ConsulConfigurationProvider::class) - } -} - diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt deleted file mode 100644 index bdce6f73..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ /dev/null @@ -1,68 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import io.netty.handler.codec.http.HttpStatusClass -import org.slf4j.LoggerFactory -import reactor.core.publisher.Mono -import reactor.netty.http.client.HttpClient - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -open class HttpAdapter(private val httpClient: HttpClient) { - - private val logger = LoggerFactory.getLogger(HttpAdapter::class.java) - - open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient - .get() - .uri(url + createQueryString(queryParams)) - .responseSingle { response, content -> - if (response.status().codeClass() == HttpStatusClass.SUCCESS) - content.asString() - else { - val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}" - Mono.error(IllegalStateException(errorMessage)) - } - } - .doOnError { - logger.error("Failed to get resource on path: $url (${it.localizedMessage})") - logger.debug("Nested exception:", it) - } - - private fun createQueryString(params: Map<String, Any>): String { - if (params.isEmpty()) - return "" - - val builder = StringBuilder("?") - params.forEach { (key, value) -> - builder - .append(key) - .append("=") - .append(value) - .append("&") - - } - - return builder.removeSuffix("&").toString() - } - -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt deleted file mode 100644 index 5f4bf354..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt +++ /dev/null @@ -1,62 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Flux -import java.util.concurrent.atomic.AtomicLong - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -internal class LoggingSinkProvider : SinkProvider { - - override fun invoke(config: CollectorConfiguration): Sink { - return object : Sink { - private val totalMessages = AtomicLong() - private val totalBytes = AtomicLong() - - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> = - messages - .doOnNext(this::logMessage) - - private fun logMessage(msg: RoutedMessage) { - val msgs = totalMessages.addAndGet(1) - val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong()) - val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" } - if (msgs % INFO_LOGGING_FREQ == 0L) - logger.info(logMessageSupplier) - else - logger.trace(logMessageSupplier) - } - - } - } - - companion object { - const val INFO_LOGGING_FREQ = 100_000 - private val logger = Logger(LoggingSinkProvider::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt deleted file mode 100644 index a0c22418..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt +++ /dev/null @@ -1,82 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import reactor.core.publisher.Flux -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderRecord -import reactor.kafka.sender.SenderResult -import java.util.concurrent.atomic.AtomicLong - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink { - private val sentMessages = AtomicLong(0) - - override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> { - val records = messages.map(this::vesToKafkaRecord) - val result = sender.send(records) - .doOnNext(::logException) - .filter(::isSuccessful) - .map { it.correlationMetadata() } - - return if (logger.traceEnabled) { - result.doOnNext(::logSentMessage) - } else { - result - } - } - - private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> { - return SenderRecord.create( - msg.topic, - msg.partition, - System.currentTimeMillis(), - msg.message.header, - msg.message, - msg) - } - - private fun logException(senderResult: SenderResult<out Any>) { - if (senderResult.exception() != null) { - logger.warn(senderResult.exception()) { "Failed to send message to Kafka" } - } - } - - private fun logSentMessage(sentMsg: RoutedMessage) { - logger.trace { - val msgNum = sentMessages.incrementAndGet() - "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}" - } - } - - private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null - - companion object { - val logger = Logger(KafkaSink::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt deleted file mode 100644 index 18191952..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.apache.kafka.clients.producer.ProducerConfig -import org.onap.dcae.collectors.veshv.boundary.Sink -import org.onap.dcae.collectors.veshv.boundary.SinkProvider -import org.onap.dcae.collectors.veshv.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.ves.VesEventOuterClass.CommonEventHeader -import reactor.kafka.sender.KafkaSender -import reactor.kafka.sender.SenderOptions - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -internal class KafkaSinkProvider : SinkProvider { - override fun invoke(config: CollectorConfiguration): Sink { - return KafkaSink(KafkaSender.create(constructSenderOptions(config))) - } - - private fun constructSenderOptions(config: CollectorConfiguration) = - SenderOptions.create<CommonEventHeader, VesMessage>() - .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers) - .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java) - .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java) -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt deleted file mode 100644 index 4e9932cc..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import com.google.protobuf.MessageLite -import org.apache.kafka.common.serialization.Serializer - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class ProtobufSerializer : Serializer<MessageLite> { - override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { - // no configuration - } - - override fun serialize(topic: String?, data: MessageLite?): ByteArray? = - data?.toByteArray() - - override fun close() { - // cleanup not needed - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt deleted file mode 100644 index 7a6ac7c8..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters.kafka - -import org.apache.kafka.common.serialization.Serializer -import org.onap.dcae.collectors.veshv.model.VesMessage - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class VesMessageSerializer : Serializer<VesMessage> { - override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) { - } - - override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray() - - override fun close() { - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt deleted file mode 100644 index e535300a..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ /dev/null @@ -1,114 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.socket - -import arrow.core.getOrElse -import arrow.effects.IO -import org.onap.dcae.collectors.veshv.boundary.CollectorProvider -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory -import org.onap.dcae.collectors.veshv.utils.NettyServerHandle -import org.onap.dcae.collectors.veshv.utils.ServerHandle -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import reactor.core.publisher.Mono -import reactor.netty.ByteBufFlux -import reactor.netty.Connection -import reactor.netty.NettyInbound -import reactor.netty.NettyOutbound -import reactor.netty.tcp.TcpServer -import java.time.Duration - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class NettyTcpServer(private val serverConfig: ServerConfiguration, - private val sslContextFactory: ServerSslContextFactory, - private val collectorProvider: CollectorProvider) : Server { - - override fun start(): IO<ServerHandle> = IO { - val tcpServer = TcpServer.create() - .addressSupplier { serverConfig.serverListenAddress } - .configureSsl() - .handle(this::handleConnection) - - NettyServerHandle(tcpServer.bindNow()) - } - - private fun TcpServer.configureSsl() = - sslContextFactory - .createSslContext(serverConfig.securityConfiguration) - .map { sslContext -> - this.secure { b -> b.sslContext(sslContext) } - }.getOrElse { this } - - private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> = - collectorProvider().fold( - { - nettyInbound.withConnection { conn -> - logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." } - } - Mono.empty() - }, - { - nettyInbound.withConnection { conn -> - logger.info { "Handling connection from ${conn.address()}" } - conn.configureIdleTimeout(serverConfig.idleTimeout) - .logConnectionClosed() - } - it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound)) - } - ) - - private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound - .receive() - .retain() - - private fun Connection.configureIdleTimeout(timeout: Duration): Connection { - onReadIdle(timeout.toMillis()) { - logger.info { - "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..." - } - disconnectClient() - } - return this - } - - private fun Connection.disconnectClient() { - channel().close().addListener { - if (it.isSuccess) - logger.debug { "Channel (${address()}) closed successfully." } - else - logger.warn("Channel close failed", it.cause()) - } - } - - private fun Connection.logConnectionClosed(): Connection { - onTerminate().subscribe { - logger.info("Connection from ${address()} has been closed") - } - return this - } - - companion object { - private val logger = Logger(NettyTcpServer::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt deleted file mode 100644 index 4a2ef6b2..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ /dev/null @@ -1,101 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import arrow.effects.IO -import io.netty.buffer.ByteBuf -import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError -import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError -import reactor.core.publisher.Flux -import reactor.core.publisher.SynchronousSink - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -internal class WireChunkDecoder( - private val decoder: WireFrameDecoder, - alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) { - private val streamBuffer = alloc.compositeBuffer() - - fun release() { - streamBuffer.release() - } - - fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer { - logIncomingMessage(byteBuf) - if (byteBuf.readableBytes() == 0) { - byteBuf.release() - Flux.empty() - } else { - streamBuffer.addComponent(true, byteBuf) - generateFrames() - .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) } - .doFinally { streamBuffer.discardReadComponents() } - } - } - - private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next -> - decoder.decodeFirst(streamBuffer) - .fold(onError(next), onSuccess(next)) - .unsafeRunSync() - } - - private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err -> - when (err) { - is InvalidWireFrame -> IO { - next.error(WireFrameException(err)) - } - is MissingWireFrameBytes -> IO { - logEndOfData() - next.complete() - } - } - } - - private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame -> - IO { - logDecodedWireMessage(frame) - next.next(frame) - } - } - - private fun logIncomingMessage(wire: ByteBuf) { - logger.trace { "Got message with total size of ${wire.readableBytes()} B" } - } - - private fun logDecodedWireMessage(wire: WireFrameMessage) { - logger.trace { "Wire payload size: ${wire.payloadSize} B" } - } - - private fun logEndOfData() { - logger.trace { "End of data in current TCP buffer" } - } - - companion object { - val logger = Logger(WireChunkDecoder::class) - } -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt deleted file mode 100644 index 83a7cd85..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt +++ /dev/null @@ -1,29 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since June 2018 - */ -class WireFrameException(error: WireFrameDecodingError) - : Exception("${error::class.simpleName}: ${error.message}") diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt deleted file mode 100644 index ec546c7d..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt +++ /dev/null @@ -1,26 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt deleted file mode 100644 index 9de34498..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt +++ /dev/null @@ -1,30 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import java.time.Duration - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since July 2018 - */ -data class ConfigurationProviderParams(val configurationUrl: String, - val firstRequestDelay: Duration, - val requestInterval: Duration) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt deleted file mode 100644 index 782877e3..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt +++ /dev/null @@ -1,22 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt deleted file mode 100644 index 85117684..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ /dev/null @@ -1,37 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import java.net.InetSocketAddress -import java.time.Duration - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -data class ServerConfiguration( - val serverListenAddress: InetSocketAddress, - val configurationProviderParams: ConfigurationProviderParams, - val securityConfiguration: SecurityConfiguration, - val idleTimeout: Duration, - val healthCheckApiListenAddress: InetSocketAddress, - val maximumPayloadSizeBytes: Int, - val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt deleted file mode 100644 index f5bfcce1..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt +++ /dev/null @@ -1,32 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.impl.MessageValidator -import org.onap.ves.VesEventOuterClass.CommonEventHeader - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) { - fun isValid(): Boolean = MessageValidator.isValid(this) -} diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt deleted file mode 100644 index bab95c57..00000000 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt +++ /dev/null @@ -1,83 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.model - -import arrow.core.Option -import org.onap.ves.VesEventOuterClass.CommonEventHeader - -data class Routing(val routes: List<Route>) { - - fun routeFor(commonHeader: CommonEventHeader): Option<Route> = - Option.fromNullable(routes.find { it.applies(commonHeader) }) -} - -data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) { - - fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain - - operator fun invoke(message: VesMessage): RoutedMessage = - RoutedMessage(targetTopic, partitioning(message.header), message) -} - - -/* -Configuration DSL - */ - -fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder { - val conf = RoutingBuilder() - conf.init() - return conf -} - -class RoutingBuilder { - private val routes: MutableList<RouteBuilder> = mutableListOf() - - fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder { - val rule = RouteBuilder() - rule.init() - routes.add(rule) - return rule - } - - fun build() = Routing(routes.map { it.build() }.toList()) -} - -class RouteBuilder { - - private lateinit var domain: String - private lateinit var targetTopic: String - private lateinit var partitioning: (CommonEventHeader) -> Int - - fun fromDomain(domain: String) { - this.domain = domain - } - - fun toTopic(targetTopic: String) { - this.targetTopic = targetTopic - } - - fun withFixedPartitioning(num: Int = 0) { - partitioning = { num } - } - - fun build() = Route(domain, targetTopic, partitioning) - -} diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt deleted file mode 100644 index 3090042d..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt +++ /dev/null @@ -1,128 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.VesEventDomain -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes -import org.onap.ves.VesEventOuterClass.CommonEventHeader.* - -internal object MessageValidatorTest : Spek({ - - given("Message validator") { - val cut = MessageValidator - - on("ves hv message including header with fully initialized fields") { - val commonHeader = commonHeader() - - it("should accept message with fully initialized message header") { - val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader)) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue() - } - - VesEventDomain.values() - .forEach { domain -> - it("should accept message with $domain domain") { - val header = commonHeader(domain) - val vesMessage = VesMessage(header, vesEventBytes(header)) - assertThat(cut.isValid(vesMessage)) - .isTrue() - } - } - } - - on("ves hv message bytes") { - val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY) - it("should not accept message with default header") { - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() - } - } - - val priorityTestCases = mapOf( - Priority.PRIORITY_NOT_PROVIDED to false, - Priority.HIGH to true - ) - - priorityTestCases.forEach { value, expectedResult -> - on("ves hv message including header with priority $value") { - val commonEventHeader = commonHeader(priority = value) - val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader)) - - it("should resolve validation result") { - assertThat(cut.isValid(vesMessage)).describedAs("message validation results") - .isEqualTo(expectedResult) - } - } - } - - on("ves hv message including header with not initialized fields") { - val commonHeader = newBuilder() - .setVersion("1.9") - .setEventName("Sample event name") - .setEventId("Sample event Id") - .setSourceName("Sample Source") - .build() - val rawMessageBytes = vesEventBytes(commonHeader) - - it("should not accept not fully initialized message header") { - val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() - } - } - - on("ves hv message including header.vesEventListenerVersion with non-string major part") { - val commonHeader = commonHeader(vesEventListenerVersion = "sample-version") - val rawMessageBytes = vesEventBytes(commonHeader) - - - it("should not accept message header") { - val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() - } - } - - on("ves hv message including header.vesEventListenerVersion with major part != 7") { - val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3") - val rawMessageBytes = vesEventBytes(commonHeader) - - it("should not accept message header") { - val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() - } - } - - on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") { - val commonHeader = commonHeader(vesEventListenerVersion = "7.test") - val rawMessageBytes = vesEventBytes(commonHeader) - - it("should not accept message header") { - val vesMessage = VesMessage(commonHeader, rawMessageBytes) - assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse() - } - } - } -}) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt deleted file mode 100644 index e8a31231..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ /dev/null @@ -1,112 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import arrow.core.None -import arrow.core.Some -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG -import org.onap.dcae.collectors.veshv.model.RoutedMessage -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.model.routing -import org.onap.dcae.collectors.veshv.tests.utils.commonHeader - - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> - * @since May 2018 - */ -object RouterTest : Spek({ - given("sample configuration") { - val config = routing { - - defineRoute { - fromDomain(PERF3GPP.domainName) - toTopic("ves_rtpm") - withFixedPartitioning(2) - } - - defineRoute { - fromDomain(SYSLOG.domainName) - toTopic("ves_trace") - withFixedPartitioning() - } - }.build() - val cut = Router(config) - - on("message with existing route (rtpm)") { - val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY) - val result = cut.findDestination(message) - - it("should have route available") { - assertThat(result).isNotNull() - } - - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2)) - } - - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm")) - } - - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) - } - } - - on("message with existing route (trace)") { - val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY) - val result = cut.findDestination(message) - - it("should have route available") { - assertThat(result).isNotNull() - } - - it("should be routed to proper partition") { - assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0)) - } - - it("should be routed to proper topic") { - assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace")) - } - - it("should be routed with a given message") { - assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message)) - } - } - - on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY) - val result = cut.findDestination(message) - - it("should not have route available") { - assertThat(result).isEqualTo(None) - } - } - } -})
\ No newline at end of file diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt deleted file mode 100644 index 8950a557..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt +++ /dev/null @@ -1,74 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl - -import arrow.core.Option -import com.google.protobuf.ByteString -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.domain.ByteData -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.model.VesMessage -import org.onap.dcae.collectors.veshv.tests.utils.commonHeader -import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes -import java.nio.charset.Charset -import kotlin.test.assertTrue -import kotlin.test.fail - - -internal object VesDecoderTest : Spek({ - - given("ves message decoder") { - val cut = VesDecoder() - - on("ves hv message bytes") { - val commonHeader = commonHeader(HEARTBEAT) - val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements")) - - it("should decode only header and pass it on along with raw message") { - val expectedMessage = VesMessage( - commonHeader, - rawMessageBytes - ) - - assertTrue { - cut.decode(rawMessageBytes).exists { - it == expectedMessage - } - } - } - } - - on("invalid ves hv message bytes") { - val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset())) - - it("should throw error") { - assertFailedWithError(cut.decode(rawMessageBytes)) - } - } - } -}) - -private fun <A> assertFailedWithError(option: Option<A>) = - option.exists { - fail("Error expected") - } diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt deleted file mode 100644 index c6364f74..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt +++ /dev/null @@ -1,157 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import com.nhaarman.mockitokotlin2.eq -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.mockito.Mockito -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState - -import reactor.core.publisher.Mono -import reactor.retry.Retry -import reactor.test.StepVerifier -import java.time.Duration -import java.util.* -import kotlin.test.assertEquals - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal object ConsulConfigurationProviderTest : Spek({ - - describe("Consul configuration provider") { - - val httpAdapterMock: HttpAdapter = mock() - val healthStateProvider = HealthState.INSTANCE - - given("valid resource url") { - val validUrl = "http://valid-url/" - val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider) - - on("call to consul") { - whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap())) - .thenReturn(Mono.just(constructConsulResponse())) - - it("should use received configuration") { - - StepVerifier.create(consulConfigProvider().take(1)) - .consumeNextWith { - - assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers) - - val route1 = it.routing.routes[0] - assertThat(FAULT.domainName) - .describedAs("routed domain 1") - .isEqualTo(route1.domain) - assertThat("test-topic-1") - .describedAs("target topic 1") - .isEqualTo(route1.targetTopic) - - val route2 = it.routing.routes[1] - assertThat(HEARTBEAT.domainName) - .describedAs("routed domain 2") - .isEqualTo(route2.domain) - assertThat("test-topic-2") - .describedAs("target topic 2") - .isEqualTo(route2.targetTopic) - - }.verifyComplete() - } - } - - } - given("invalid resource url") { - val invalidUrl = "http://invalid-url/" - - val iterationCount = 3L - val consulConfigProvider = constructConsulConfigProvider( - invalidUrl, httpAdapterMock, healthStateProvider, iterationCount - ) - - on("call to consul") { - whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap())) - .thenReturn(Mono.error(RuntimeException("Test exception"))) - - it("should interrupt the flux") { - - StepVerifier.create(consulConfigProvider()) - .verifyErrorMessage("Test exception") - } - - it("should update the health state") { - StepVerifier.create(healthStateProvider().take(iterationCount)) - .expectNextCount(iterationCount - 1) - .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION) - .verifyComplete() - } - } - } - } - -}) - -private fun constructConsulConfigProvider(url: String, - httpAdapter: HttpAdapter, - healthState: HealthState, - iterationCount: Long = 1 -): ConsulConfigurationProvider { - - val firstRequestDelay = Duration.ofMillis(1) - val requestInterval = Duration.ofMillis(1) - val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1)) - - return ConsulConfigurationProvider( - httpAdapter, - url, - firstRequestDelay, - requestInterval, - healthState, - retry - ) -} - - -const val kafkaAddress = "message-router-kafka" - -fun constructConsulResponse(): String = - """{ - "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093", - "collector.routing": [ - { - "fromDomain": "fault", - "toTopic": "test-topic-1" - }, - { - "fromDomain": "heartbeat", - "toTopic": "test-topic-2" - } - ] - }""" diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt deleted file mode 100644 index 91457faf..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt +++ /dev/null @@ -1,86 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.adapters - -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import reactor.core.publisher.Mono -import reactor.netty.http.client.HttpClient -import reactor.netty.http.server.HttpServer -import reactor.test.StepVerifier -import reactor.test.test - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal object HttpAdapterTest : Spek({ - describe("HttpAdapter") { - - val httpServer = HttpServer.create() - .host("127.0.0.1") - .route { routes -> - routes.get("/url") { req, resp -> - resp.sendString(Mono.just(req.uri())) - } - } - .bindNow() - val baseUrl = "http://${httpServer.host()}:${httpServer.port()}" - val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl)) - - afterGroup { - httpServer.disposeNow() - } - - given("url without query params") { - val url = "/url" - - it("should not append query string") { - httpAdapter.get(url).test() - .expectNext(url) - .verifyComplete() - } - } - - given("url with query params") { - val queryParams = mapOf(Pair("p", "the-value")) - val url = "/url" - - it("should add them as query string to the url") { - httpAdapter.get(url, queryParams).test() - .expectNext("/url?p=the-value") - .verifyComplete() - } - } - - given("invalid url") { - val invalidUrl = "/wtf" - - it("should interrupt the flux") { - StepVerifier - .create(httpAdapter.get(invalidUrl)) - .verifyError() - } - } - } - -})
\ No newline at end of file diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt deleted file mode 100644 index f06a0dc7..00000000 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ /dev/null @@ -1,234 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.impl.wire - -import io.netty.buffer.ByteBuf -import io.netty.buffer.Unpooled -import io.netty.buffer.UnpooledByteBufAllocator -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given -import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import reactor.test.test - -/** - * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com> - * @since May 2018 - */ -internal object WireChunkDecoderTest : Spek({ - val alloc = UnpooledByteBufAllocator.DEFAULT - val samplePayload = "konstantynopolitanczykowianeczka".toByteArray() - val anotherPayload = "ala ma kota a kot ma ale".toByteArray() - - val encoder = WireFrameEncoder(alloc) - - fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) - - fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc) - - fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { - for (bb in byteBuffers) { - assertThat(bb.refCnt()) - .describedAs("should be released: $bb ref count") - .isEqualTo(0) - } - } - - fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) { - for (bb in byteBuffers) { - assertThat(bb.refCnt()) - .describedAs("should not be released: $bb ref count") - .isEqualTo(1) - } - } - - describe("decoding wire protocol") { - given("empty input") { - val input = Unpooled.EMPTY_BUFFER - - it("should yield empty result") { - createInstance().decode(input).test().verifyComplete() - } - } - - given("input with no readable bytes") { - val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1) - - it("should yield empty result") { - createInstance().decode(input).test().verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input) - } - } - - given("invalid input (not starting with marker)") { - val input = Unpooled.wrappedBuffer(samplePayload) - - it("should yield error") { - createInstance().decode(input).test() - .verifyError(WireFrameException::class.java) - } - - it("should leave memory unreleased") { - verifyMemoryNotReleased(input) - } - } - - given("valid input") { - val input = WireFrameMessage(samplePayload) - - it("should yield decoded input frame") { - createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - } - } - - given("valid input with part of next frame") { - val input = Unpooled.buffer() - .writeBytes(encoder.encode(WireFrameMessage(samplePayload))) - .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3)) - - it("should yield decoded input frame") { - createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - } - - it("should leave memory unreleased") { - verifyMemoryNotReleased(input) - } - } - - given("valid input with garbage after it") { - val input = Unpooled.buffer() - .writeBytes(encoder.encode(WireFrameMessage(samplePayload))) - .writeBytes(Unpooled.wrappedBuffer(samplePayload)) - - it("should yield decoded input frame and error") { - createInstance().decode(input).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyError(WireFrameException::class.java) - } - - it("should leave memory unreleased") { - verifyMemoryNotReleased(input) - } - } - - given("two inputs containing two separate messages") { - val input1 = encoder.encode(WireFrameMessage(samplePayload)) - val input2 = encoder.encode(WireFrameMessage(anotherPayload)) - - it("should yield decoded input frames") { - val cut = createInstance() - cut.decode(input1).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - cut.decode(input2).test() - .expectNextMatches { it.payloadSize == anotherPayload.size } - .verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input1, input2) - } - } - - given("1st input containing 1st frame and 2nd input containing garbage") { - val input1 = encoder.encode(WireFrameMessage(samplePayload)) - val input2 = Unpooled.wrappedBuffer(anotherPayload) - - it("should yield decoded input frames") { - val cut = createInstance() - cut.decode(input1) - .test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - cut.decode(input2).test() - .verifyError(WireFrameException::class.java) - } - - it("should release memory for 1st input") { - verifyMemoryReleased(input1) - } - - it("should leave memory unreleased for 2nd input") { - verifyMemoryNotReleased(input2) - } - } - - - given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") { - val frame1 = encoder.encode(WireFrameMessage(samplePayload)) - val frame2 = encoder.encode(WireFrameMessage(anotherPayload)) - - val input1 = Unpooled.buffer() - .writeBytes(frame1) - .writeBytes(frame2, 3) - val input2 = Unpooled.buffer().writeBytes(frame2) - - it("should yield decoded input frames") { - val cut = createInstance() - cut.decode(input1).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .verifyComplete() - cut.decode(input2).test() - .expectNextMatches { it.payloadSize == anotherPayload.size } - .verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input1, input2) - } - } - - given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") { - val frame1 = encoder.encode(WireFrameMessage(samplePayload)) - val frame2 = encoder.encode(WireFrameMessage(anotherPayload)) - - val input1 = Unpooled.buffer() - .writeBytes(frame1, 5) - val input2 = Unpooled.buffer() - .writeBytes(frame1) - .writeBytes(frame2) - - it("should yield decoded input frames") { - val cut = createInstance() - cut.decode(input1).test() - .verifyComplete() - cut.decode(input2).test() - .expectNextMatches { it.payloadSize == samplePayload.size } - .expectNextMatches { it.payloadSize == anotherPayload.size } - .verifyComplete() - } - - it("should release memory") { - verifyMemoryReleased(input1, input2) - } - } - } -})
\ No newline at end of file diff --git a/hv-collector-core/src/test/resources/logback-test.xml b/hv-collector-core/src/test/resources/logback-test.xml deleted file mode 100644 index 9a4eacfe..00000000 --- a/hv-collector-core/src/test/resources/logback-test.xml +++ /dev/null @@ -1,35 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<configuration> - <property name="LOG_FILE" - value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> - <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> - - <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> - <encoder> - <pattern> - %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n - </pattern> - </encoder> - </appender> - - <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> - <encoder> - <pattern>${FILE_LOG_PATTERN}</pattern> - </encoder> - <file>${LOG_FILE}</file> - <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> - <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> - <maxFileSize>50MB</maxFileSize> - <maxHistory>30</maxHistory> - <totalSizeCap>10GB</totalSizeCap> - </rollingPolicy> - </appender> - - <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> - - <root level="INFO"> - <appender-ref ref="CONSOLE"/> - <appender-ref ref="ROLLING-FILE"/> - </root> -</configuration> |