aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-28 15:46:50 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-29 14:41:42 +0100
commitdde383a2aa75f94c26d7949665b79cc95486a223 (patch)
tree75f3e8f564067afd0e67dbe6254183e45ca26944 /hv-collector-core/src/main/kotlin
parent77f896523f2065b1da1be21545155a29edea5122 (diff)
Custom detekt rule for logger usage check
Check if logger invocations don't use unoptimal invocations, eg. concatenation `debug("a=" + a)` instead of lambda use `debug {"a=" + a}` Unfortunately to avoid defining dependencies in many places and having circural dependencies it was necessarry to reorganize the maven module structure. The goal was to have `sources` module with production code and `build` module with build-time tooling (detekt rules among them). Issue-ID: DCAEGEN2-1002 Change-Id: I36e677b98972aaae6905d722597cbce5e863d201 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-core/src/main/kotlin')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt50
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt38
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt80
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt35
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt38
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt30
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt39
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt88
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt40
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt122
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt68
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt62
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt82
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt45
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt40
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt37
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt114
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt101
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt26
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt30
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt22
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt37
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt32
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt83
25 files changed, 0 insertions, 1368 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
deleted file mode 100644
index dd0111bc..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.boundary
-
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import reactor.core.publisher.Flux
-
-interface Sink {
- fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
-}
-
-interface Metrics {
- fun notifyBytesReceived(size: Int)
- fun notifyMessageReceived(size: Int)
- fun notifyMessageSent(topic: String)
-}
-
-@FunctionalInterface
-interface SinkProvider {
- operator fun invoke(config: CollectorConfiguration): Sink
-
- companion object {
- fun just(sink: Sink): SinkProvider =
- object : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink = sink
- }
- }
-}
-
-interface ConfigurationProvider {
- operator fun invoke(): Flux<CollectorConfiguration>
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
deleted file mode 100644
index 3c85a9b1..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.boundary
-
-import arrow.core.Option
-import arrow.effects.IO
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-
-interface Collector {
- fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
-}
-
-typealias CollectorProvider = () -> Option<Collector>
-
-interface Server {
- fun start(): IO<ServerHandle>
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
deleted file mode 100644
index 5c96e1c5..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.factory
-
-import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.impl.Router
-import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
-import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.util.concurrent.atomic.AtomicReference
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class CollectorFactory(val configuration: ConfigurationProvider,
- private val sinkProvider: SinkProvider,
- private val metrics: Metrics,
- private val maximumPayloadSizeBytes: Int,
- private val healthState: HealthState = HealthState.INSTANCE) {
-
- fun createVesHvCollectorProvider(): CollectorProvider {
- val collector: AtomicReference<Collector> = AtomicReference()
- configuration()
- .map(this::createVesHvCollector)
- .doOnNext {
- logger.info("Using updated configuration for new connections")
- healthState.changeState(HealthDescription.HEALTHY)
- }
- .doOnError {
- logger.error("Failed to acquire configuration from consul")
- healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
- }
- .subscribe(collector::set)
- return collector::getOption
- }
-
- private fun createVesHvCollector(config: CollectorConfiguration): Collector {
- return VesHvCollector(
- wireChunkDecoderSupplier = { alloc ->
- WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
- },
- protobufDecoder = VesDecoder(),
- router = Router(config.routing),
- sink = sinkProvider(config),
- metrics = metrics)
- }
-
- companion object {
- private val logger = Logger(CollectorFactory::class)
- }
-}
-
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
deleted file mode 100644
index dce933ab..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.factory
-
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object ServerFactory {
- fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server =
- NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider)
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
deleted file mode 100644
index fb949079..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
-import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-
-internal object MessageValidator {
-
- fun isValid(message: VesMessage): Boolean {
- return allMandatoryFieldsArePresent(message.header)
- }
-
- private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
- headerRequiredFieldDescriptors
- .all { fieldDescriptor -> header.hasField(fieldDescriptor) }
- .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
-
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
deleted file mode 100644
index cee658b6..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import arrow.core.Option
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.Routing
-import org.onap.dcae.collectors.veshv.model.VesMessage
-
-class Router(private val routing: Routing) {
- fun findDestination(message: VesMessage): Option<RoutedMessage> =
- routing.routeFor(message.header).map { it(message) }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
deleted file mode 100644
index 1d43588f..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import arrow.core.Try
-import arrow.core.Option
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventOuterClass.VesEvent
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class VesDecoder {
-
- fun decode(bytes: ByteData): Option<VesMessage> =
- Try {
- val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
- VesMessage(decodedHeader, bytes)
- }.toOption()
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
deleted file mode 100644
index b700f135..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import arrow.core.Option
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class VesHvCollector(
- private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
- private val protobufDecoder: VesDecoder,
- private val router: Router,
- private val sink: Sink,
- private val metrics: Metrics) : Collector {
-
- override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
- wireChunkDecoderSupplier(alloc).let { wireDecoder ->
- dataStream
- .transform { decodeWireFrame(it, wireDecoder) }
- .filter(WireFrameMessage::isValid)
- .transform(::decodePayload)
- .filter(VesMessage::isValid)
- .transform(::routeMessage)
- .onErrorResume { logger.handleReactiveStreamError(it) }
- .doFinally { releaseBuffersMemory(wireDecoder) }
- .then()
- }
-
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
- .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
- .concatMap(decoder::decode)
- .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
-
- private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
- .map(WireFrameMessage::payload)
- .map(protobufDecoder::decode)
- .flatMap { omitWhenNone(it) }
-
- private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
- .flatMap(this::findRoute)
- .compose(sink::send)
- .doOnNext { metrics.notifyMessageSent(it.topic) }
-
-
- private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
-
- private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
- { Mono.empty() },
- { Mono.just(it) })
-
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
-
- companion object {
- private val logger = Logger(VesHvCollector::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
deleted file mode 100644
index 8c16736d..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import reactor.netty.http.client.HttpClient
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object AdapterFactory {
- fun kafkaSink(): SinkProvider = KafkaSinkProvider()
- fun loggingSink(): SinkProvider = LoggingSinkProvider()
-
- fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
- ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
-
- private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
deleted file mode 100644
index ec7c60c0..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
-import reactor.retry.Retry
-import java.io.StringReader
-import java.time.Duration
-import java.util.concurrent.atomic.AtomicReference
-import javax.json.Json
-import javax.json.JsonObject
-
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal class ConsulConfigurationProvider(private val http: HttpAdapter,
- private val url: String,
- private val firstRequestDelay: Duration,
- private val requestInterval: Duration,
- private val healthState: HealthState,
- retrySpec: Retry<Any>
-
-) : ConfigurationProvider {
-
- private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
- private val retry = retrySpec
- .doOnRetry {
- logger.warn("Could not get fresh configuration", it.exception())
- healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
- }
-
- constructor(http: HttpAdapter,
- params: ConfigurationProviderParams) : this(
- http,
- params.configurationUrl,
- params.firstRequestDelay,
- params.requestInterval,
- HealthState.INSTANCE,
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
- .jitter(Jitter.random())
- )
-
- override fun invoke(): Flux<CollectorConfiguration> =
- Flux.interval(firstRequestDelay, requestInterval)
- .concatMap { askForConfig() }
- .flatMap(::filterDifferentValues)
- .map(::parseJsonResponse)
- .map(::createCollectorConfiguration)
- .retryWhen(retry)
-
- private fun askForConfig(): Mono<String> = http.get(url)
-
- private fun filterDifferentValues(configurationString: String) =
- hashOf(configurationString).let {
- if (it == lastConfigurationHash.get()) {
- Mono.empty()
- } else {
- lastConfigurationHash.set(it)
- Mono.just(configurationString)
- }
- }
-
- private fun hashOf(str: String) = str.hashCode()
-
- private fun parseJsonResponse(responseString: String): JsonObject =
- Json.createReader(StringReader(responseString)).readObject()
-
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- logger.info { "Obtained new configuration from consul:\n${configuration}" }
- val routing = configuration.getJsonArray("collector.routing")
-
- return CollectorConfiguration(
- kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
- routing = org.onap.dcae.collectors.veshv.model.routing {
- for (route in routing) {
- val routeObj = route.asJsonObject()
- defineRoute {
- fromDomain(routeObj.getString("fromDomain"))
- toTopic(routeObj.getString("toTopic"))
- withFixedPartitioning()
- }
- }
- }.build()
- )
- }
-
- companion object {
- private const val MAX_RETRIES = 5L
- private const val BACKOFF_INTERVAL_FACTOR = 30L
- private val logger = Logger(ConsulConfigurationProvider::class)
- }
-}
-
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
deleted file mode 100644
index bdce6f73..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import io.netty.handler.codec.http.HttpStatusClass
-import org.slf4j.LoggerFactory
-import reactor.core.publisher.Mono
-import reactor.netty.http.client.HttpClient
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-open class HttpAdapter(private val httpClient: HttpClient) {
-
- private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
-
- open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get()
- .uri(url + createQueryString(queryParams))
- .responseSingle { response, content ->
- if (response.status().codeClass() == HttpStatusClass.SUCCESS)
- content.asString()
- else {
- val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
- Mono.error(IllegalStateException(errorMessage))
- }
- }
- .doOnError {
- logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
- logger.debug("Nested exception:", it)
- }
-
- private fun createQueryString(params: Map<String, Any>): String {
- if (params.isEmpty())
- return ""
-
- val builder = StringBuilder("?")
- params.forEach { (key, value) ->
- builder
- .append(key)
- .append("=")
- .append(value)
- .append("&")
-
- }
-
- return builder.removeSuffix("&").toString()
- }
-
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
deleted file mode 100644
index 5f4bf354..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-internal class LoggingSinkProvider : SinkProvider {
-
- override fun invoke(config: CollectorConfiguration): Sink {
- return object : Sink {
- private val totalMessages = AtomicLong()
- private val totalBytes = AtomicLong()
-
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
- messages
- .doOnNext(this::logMessage)
-
- private fun logMessage(msg: RoutedMessage) {
- val msgs = totalMessages.addAndGet(1)
- val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
- val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
- if (msgs % INFO_LOGGING_FREQ == 0L)
- logger.info(logMessageSupplier)
- else
- logger.trace(logMessageSupplier)
- }
-
- }
- }
-
- companion object {
- const val INFO_LOGGING_FREQ = 100_000
- private val logger = Logger(LoggingSinkProvider::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
deleted file mode 100644
index a0c22418..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-import reactor.core.publisher.Flux
-import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderRecord
-import reactor.kafka.sender.SenderResult
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
- private val sentMessages = AtomicLong(0)
-
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- val records = messages.map(this::vesToKafkaRecord)
- val result = sender.send(records)
- .doOnNext(::logException)
- .filter(::isSuccessful)
- .map { it.correlationMetadata() }
-
- return if (logger.traceEnabled) {
- result.doOnNext(::logSentMessage)
- } else {
- result
- }
- }
-
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
- return SenderRecord.create(
- msg.topic,
- msg.partition,
- System.currentTimeMillis(),
- msg.message.header,
- msg.message,
- msg)
- }
-
- private fun logException(senderResult: SenderResult<out Any>) {
- if (senderResult.exception() != null) {
- logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
- }
- }
-
- private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace {
- val msgNum = sentMessages.incrementAndGet()
- "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
- }
- }
-
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
-
- companion object {
- val logger = Logger(KafkaSink::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
deleted file mode 100644
index 18191952..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
- }
-
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
deleted file mode 100644
index 4e9932cc..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import com.google.protobuf.MessageLite
-import org.apache.kafka.common.serialization.Serializer
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class ProtobufSerializer : Serializer<MessageLite> {
- override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
- // no configuration
- }
-
- override fun serialize(topic: String?, data: MessageLite?): ByteArray? =
- data?.toByteArray()
-
- override fun close() {
- // cleanup not needed
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
deleted file mode 100644
index 7a6ac7c8..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import org.apache.kafka.common.serialization.Serializer
-import org.onap.dcae.collectors.veshv.model.VesMessage
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class VesMessageSerializer : Serializer<VesMessage> {
- override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
- }
-
- override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
-
- override fun close() {
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
deleted file mode 100644
index e535300a..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.socket
-
-import arrow.core.getOrElse
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
-import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Mono
-import reactor.netty.ByteBufFlux
-import reactor.netty.Connection
-import reactor.netty.NettyInbound
-import reactor.netty.NettyOutbound
-import reactor.netty.tcp.TcpServer
-import java.time.Duration
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
- private val sslContextFactory: ServerSslContextFactory,
- private val collectorProvider: CollectorProvider) : Server {
-
- override fun start(): IO<ServerHandle> = IO {
- val tcpServer = TcpServer.create()
- .addressSupplier { serverConfig.serverListenAddress }
- .configureSsl()
- .handle(this::handleConnection)
-
- NettyServerHandle(tcpServer.bindNow())
- }
-
- private fun TcpServer.configureSsl() =
- sslContextFactory
- .createSslContext(serverConfig.securityConfiguration)
- .map { sslContext ->
- this.secure { b -> b.sslContext(sslContext) }
- }.getOrElse { this }
-
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- collectorProvider().fold(
- {
- nettyInbound.withConnection { conn ->
- logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
- }
- Mono.empty()
- },
- {
- nettyInbound.withConnection { conn ->
- logger.info { "Handling connection from ${conn.address()}" }
- conn.configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- }
- it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
- }
- )
-
- private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .receive()
- .retain()
-
- private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
- onReadIdle(timeout.toMillis()) {
- logger.info {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
- }
- disconnectClient()
- }
- return this
- }
-
- private fun Connection.disconnectClient() {
- channel().close().addListener {
- if (it.isSuccess)
- logger.debug { "Channel (${address()}) closed successfully." }
- else
- logger.warn("Channel close failed", it.cause())
- }
- }
-
- private fun Connection.logConnectionClosed(): Connection {
- onTerminate().subscribe {
- logger.info("Connection from ${address()} has been closed")
- }
- return this
- }
-
- companion object {
- private val logger = Logger(NettyTcpServer::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
deleted file mode 100644
index 4a2ef6b2..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import arrow.effects.IO
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
-import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
-import reactor.core.publisher.Flux
-import reactor.core.publisher.SynchronousSink
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class WireChunkDecoder(
- private val decoder: WireFrameDecoder,
- alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- private val streamBuffer = alloc.compositeBuffer()
-
- fun release() {
- streamBuffer.release()
- }
-
- fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
- logIncomingMessage(byteBuf)
- if (byteBuf.readableBytes() == 0) {
- byteBuf.release()
- Flux.empty()
- } else {
- streamBuffer.addComponent(true, byteBuf)
- generateFrames()
- .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
- .doFinally { streamBuffer.discardReadComponents() }
- }
- }
-
- private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
- decoder.decodeFirst(streamBuffer)
- .fold(onError(next), onSuccess(next))
- .unsafeRunSync()
- }
-
- private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
- when (err) {
- is InvalidWireFrame -> IO {
- next.error(WireFrameException(err))
- }
- is MissingWireFrameBytes -> IO {
- logEndOfData()
- next.complete()
- }
- }
- }
-
- private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
- }
-
- private fun logIncomingMessage(wire: ByteBuf) {
- logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
- }
-
- private fun logDecodedWireMessage(wire: WireFrameMessage) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B" }
- }
-
- private fun logEndOfData() {
- logger.trace { "End of data in current TCP buffer" }
- }
-
- companion object {
- val logger = Logger(WireChunkDecoder::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
deleted file mode 100644
index 83a7cd85..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class WireFrameException(error: WireFrameDecodingError)
- : Exception("${error::class.simpleName}: ${error.message}")
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
deleted file mode 100644
index ec546c7d..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
deleted file mode 100644
index 9de34498..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import java.time.Duration
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
- */
-data class ConfigurationProviderParams(val configurationUrl: String,
- val firstRequestDelay: Duration,
- val requestInterval: Duration)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt
deleted file mode 100644
index 782877e3..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
deleted file mode 100644
index 85117684..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import java.net.InetSocketAddress
-import java.time.Duration
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-data class ServerConfiguration(
- val serverListenAddress: InetSocketAddress,
- val configurationProviderParams: ConfigurationProviderParams,
- val securityConfiguration: SecurityConfiguration,
- val idleTimeout: Duration,
- val healthCheckApiListenAddress: InetSocketAddress,
- val maximumPayloadSizeBytes: Int,
- val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
deleted file mode 100644
index f5bfcce1..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.impl.MessageValidator
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) {
- fun isValid(): Boolean = MessageValidator.isValid(this)
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
deleted file mode 100644
index bab95c57..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import arrow.core.Option
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-
-data class Routing(val routes: List<Route>) {
-
- fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- Option.fromNullable(routes.find { it.applies(commonHeader) })
-}
-
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
-
- fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
-
- operator fun invoke(message: VesMessage): RoutedMessage =
- RoutedMessage(targetTopic, partitioning(message.header), message)
-}
-
-
-/*
-Configuration DSL
- */
-
-fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
- val conf = RoutingBuilder()
- conf.init()
- return conf
-}
-
-class RoutingBuilder {
- private val routes: MutableList<RouteBuilder> = mutableListOf()
-
- fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
- val rule = RouteBuilder()
- rule.init()
- routes.add(rule)
- return rule
- }
-
- fun build() = Routing(routes.map { it.build() }.toList())
-}
-
-class RouteBuilder {
-
- private lateinit var domain: String
- private lateinit var targetTopic: String
- private lateinit var partitioning: (CommonEventHeader) -> Int
-
- fun fromDomain(domain: String) {
- this.domain = domain
- }
-
- fun toTopic(targetTopic: String) {
- this.targetTopic = targetTopic
- }
-
- fun withFixedPartitioning(num: Int = 0) {
- partitioning = { num }
- }
-
- fun build() = Route(domain, targetTopic, partitioning)
-
-}