aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-core/src
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-28 15:46:50 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-11-29 14:41:42 +0100
commitdde383a2aa75f94c26d7949665b79cc95486a223 (patch)
tree75f3e8f564067afd0e67dbe6254183e45ca26944 /hv-collector-core/src
parent77f896523f2065b1da1be21545155a29edea5122 (diff)
Custom detekt rule for logger usage check
Check if logger invocations don't use unoptimal invocations, eg. concatenation `debug("a=" + a)` instead of lambda use `debug {"a=" + a}` Unfortunately to avoid defining dependencies in many places and having circural dependencies it was necessarry to reorganize the maven module structure. The goal was to have `sources` module with production code and `build` module with build-time tooling (detekt rules among them). Issue-ID: DCAEGEN2-1002 Change-Id: I36e677b98972aaae6905d722597cbce5e863d201 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-core/src')
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt50
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt38
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt80
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt35
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt38
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt30
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt39
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt88
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt40
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt122
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt68
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt62
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt82
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt45
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt40
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt37
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt114
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt101
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt29
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt26
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt30
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt22
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt37
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt32
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt83
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt128
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt112
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt74
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt157
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt86
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt234
-rw-r--r--hv-collector-core/src/test/resources/logback-test.xml35
32 files changed, 0 insertions, 2194 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
deleted file mode 100644
index dd0111bc..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.boundary
-
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import reactor.core.publisher.Flux
-
-interface Sink {
- fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage>
-}
-
-interface Metrics {
- fun notifyBytesReceived(size: Int)
- fun notifyMessageReceived(size: Int)
- fun notifyMessageSent(topic: String)
-}
-
-@FunctionalInterface
-interface SinkProvider {
- operator fun invoke(config: CollectorConfiguration): Sink
-
- companion object {
- fun just(sink: Sink): SinkProvider =
- object : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink = sink
- }
- }
-}
-
-interface ConfigurationProvider {
- operator fun invoke(): Flux<CollectorConfiguration>
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
deleted file mode 100644
index 3c85a9b1..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.boundary
-
-import arrow.core.Option
-import arrow.effects.IO
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-
-interface Collector {
- fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void>
-}
-
-typealias CollectorProvider = () -> Option<Collector>
-
-interface Server {
- fun start(): IO<ServerHandle>
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
deleted file mode 100644
index 5c96e1c5..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.factory
-
-import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.impl.Router
-import org.onap.dcae.collectors.veshv.impl.VesDecoder
-import org.onap.dcae.collectors.veshv.impl.VesHvCollector
-import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.util.concurrent.atomic.AtomicReference
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-class CollectorFactory(val configuration: ConfigurationProvider,
- private val sinkProvider: SinkProvider,
- private val metrics: Metrics,
- private val maximumPayloadSizeBytes: Int,
- private val healthState: HealthState = HealthState.INSTANCE) {
-
- fun createVesHvCollectorProvider(): CollectorProvider {
- val collector: AtomicReference<Collector> = AtomicReference()
- configuration()
- .map(this::createVesHvCollector)
- .doOnNext {
- logger.info("Using updated configuration for new connections")
- healthState.changeState(HealthDescription.HEALTHY)
- }
- .doOnError {
- logger.error("Failed to acquire configuration from consul")
- healthState.changeState(HealthDescription.CONSUL_CONFIGURATION_NOT_FOUND)
- }
- .subscribe(collector::set)
- return collector::getOption
- }
-
- private fun createVesHvCollector(config: CollectorConfiguration): Collector {
- return VesHvCollector(
- wireChunkDecoderSupplier = { alloc ->
- WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
- },
- protobufDecoder = VesDecoder(),
- router = Router(config.routing),
- sink = sinkProvider(config),
- metrics = metrics)
- }
-
- companion object {
- private val logger = Logger(CollectorFactory::class)
- }
-}
-
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
deleted file mode 100644
index dce933ab..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.factory
-
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object ServerFactory {
- fun createNettyTcpServer(serverConfiguration: ServerConfiguration, collectorProvider: CollectorProvider): Server =
- NettyTcpServer(serverConfiguration, ServerSslContextFactory(), collectorProvider)
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
deleted file mode 100644
index fb949079..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidator.kt
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import org.onap.dcae.collectors.veshv.domain.headerRequiredFieldDescriptors
-import org.onap.dcae.collectors.veshv.domain.vesEventListenerVersionRegex
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-
-internal object MessageValidator {
-
- fun isValid(message: VesMessage): Boolean {
- return allMandatoryFieldsArePresent(message.header)
- }
-
- private fun allMandatoryFieldsArePresent(header: CommonEventHeader) =
- headerRequiredFieldDescriptors
- .all { fieldDescriptor -> header.hasField(fieldDescriptor) }
- .and(vesEventListenerVersionRegex.matches(header.vesEventListenerVersion))
-
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
deleted file mode 100644
index cee658b6..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import arrow.core.Option
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.Routing
-import org.onap.dcae.collectors.veshv.model.VesMessage
-
-class Router(private val routing: Routing) {
- fun findDestination(message: VesMessage): Option<RoutedMessage> =
- routing.routeFor(message.header).map { it(message) }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
deleted file mode 100644
index 1d43588f..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoder.kt
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import arrow.core.Try
-import arrow.core.Option
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventOuterClass.VesEvent
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class VesDecoder {
-
- fun decode(bytes: ByteData): Option<VesMessage> =
- Try {
- val decodedHeader = VesEvent.parseFrom(bytes.unsafeAsArray()).commonEventHeader
- VesMessage(decodedHeader, bytes)
- }.toOption()
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
deleted file mode 100644
index b700f135..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import arrow.core.Option
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.boundary.Collector
-import org.onap.dcae.collectors.veshv.boundary.Metrics
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class VesHvCollector(
- private val wireChunkDecoderSupplier: (ByteBufAllocator) -> WireChunkDecoder,
- private val protobufDecoder: VesDecoder,
- private val router: Router,
- private val sink: Sink,
- private val metrics: Metrics) : Collector {
-
- override fun handleConnection(alloc: ByteBufAllocator, dataStream: Flux<ByteBuf>): Mono<Void> =
- wireChunkDecoderSupplier(alloc).let { wireDecoder ->
- dataStream
- .transform { decodeWireFrame(it, wireDecoder) }
- .filter(WireFrameMessage::isValid)
- .transform(::decodePayload)
- .filter(VesMessage::isValid)
- .transform(::routeMessage)
- .onErrorResume { logger.handleReactiveStreamError(it) }
- .doFinally { releaseBuffersMemory(wireDecoder) }
- .then()
- }
-
- private fun decodeWireFrame(flux: Flux<ByteBuf>, decoder: WireChunkDecoder): Flux<WireFrameMessage> = flux
- .doOnNext { metrics.notifyBytesReceived(it.readableBytes()) }
- .concatMap(decoder::decode)
- .doOnNext { metrics.notifyMessageReceived(it.payloadSize) }
-
- private fun decodePayload(flux: Flux<WireFrameMessage>): Flux<VesMessage> = flux
- .map(WireFrameMessage::payload)
- .map(protobufDecoder::decode)
- .flatMap { omitWhenNone(it) }
-
- private fun routeMessage(flux: Flux<VesMessage>): Flux<RoutedMessage> = flux
- .flatMap(this::findRoute)
- .compose(sink::send)
- .doOnNext { metrics.notifyMessageSent(it.topic) }
-
-
- private fun findRoute(msg: VesMessage): Mono<RoutedMessage> = omitWhenNone((router::findDestination)(msg))
-
- private fun <V> omitWhenNone(it: Option<V>): Mono<V> = it.fold(
- { Mono.empty() },
- { Mono.just(it) })
-
- private fun releaseBuffersMemory(wireChunkDecoder: WireChunkDecoder) = wireChunkDecoder.release()
-
- companion object {
- private val logger = Logger(VesHvCollector::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
deleted file mode 100644
index 8c16736d..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import reactor.netty.http.client.HttpClient
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object AdapterFactory {
- fun kafkaSink(): SinkProvider = KafkaSinkProvider()
- fun loggingSink(): SinkProvider = LoggingSinkProvider()
-
- fun consulConfigurationProvider(configurationProviderParams: ConfigurationProviderParams): ConfigurationProvider =
- ConsulConfigurationProvider(httpAdapter(), configurationProviderParams)
-
- private fun httpAdapter(): HttpAdapter = HttpAdapter(HttpClient.create())
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
deleted file mode 100644
index ec7c60c0..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProvider.kt
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
-import reactor.retry.Retry
-import java.io.StringReader
-import java.time.Duration
-import java.util.concurrent.atomic.AtomicReference
-import javax.json.Json
-import javax.json.JsonObject
-
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal class ConsulConfigurationProvider(private val http: HttpAdapter,
- private val url: String,
- private val firstRequestDelay: Duration,
- private val requestInterval: Duration,
- private val healthState: HealthState,
- retrySpec: Retry<Any>
-
-) : ConfigurationProvider {
-
- private val lastConfigurationHash: AtomicReference<Int> = AtomicReference(0)
- private val retry = retrySpec
- .doOnRetry {
- logger.warn("Could not get fresh configuration", it.exception())
- healthState.changeState(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
- }
-
- constructor(http: HttpAdapter,
- params: ConfigurationProviderParams) : this(
- http,
- params.configurationUrl,
- params.firstRequestDelay,
- params.requestInterval,
- HealthState.INSTANCE,
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(params.requestInterval.dividedBy(BACKOFF_INTERVAL_FACTOR))
- .jitter(Jitter.random())
- )
-
- override fun invoke(): Flux<CollectorConfiguration> =
- Flux.interval(firstRequestDelay, requestInterval)
- .concatMap { askForConfig() }
- .flatMap(::filterDifferentValues)
- .map(::parseJsonResponse)
- .map(::createCollectorConfiguration)
- .retryWhen(retry)
-
- private fun askForConfig(): Mono<String> = http.get(url)
-
- private fun filterDifferentValues(configurationString: String) =
- hashOf(configurationString).let {
- if (it == lastConfigurationHash.get()) {
- Mono.empty()
- } else {
- lastConfigurationHash.set(it)
- Mono.just(configurationString)
- }
- }
-
- private fun hashOf(str: String) = str.hashCode()
-
- private fun parseJsonResponse(responseString: String): JsonObject =
- Json.createReader(StringReader(responseString)).readObject()
-
- private fun createCollectorConfiguration(configuration: JsonObject): CollectorConfiguration {
- logger.info { "Obtained new configuration from consul:\n${configuration}" }
- val routing = configuration.getJsonArray("collector.routing")
-
- return CollectorConfiguration(
- kafkaBootstrapServers = configuration.getString("dmaap.kafkaBootstrapServers"),
- routing = org.onap.dcae.collectors.veshv.model.routing {
- for (route in routing) {
- val routeObj = route.asJsonObject()
- defineRoute {
- fromDomain(routeObj.getString("fromDomain"))
- toTopic(routeObj.getString("toTopic"))
- withFixedPartitioning()
- }
- }
- }.build()
- )
- }
-
- companion object {
- private const val MAX_RETRIES = 5L
- private const val BACKOFF_INTERVAL_FACTOR = 30L
- private val logger = Logger(ConsulConfigurationProvider::class)
- }
-}
-
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
deleted file mode 100644
index bdce6f73..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import io.netty.handler.codec.http.HttpStatusClass
-import org.slf4j.LoggerFactory
-import reactor.core.publisher.Mono
-import reactor.netty.http.client.HttpClient
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-open class HttpAdapter(private val httpClient: HttpClient) {
-
- private val logger = LoggerFactory.getLogger(HttpAdapter::class.java)
-
- open fun get(url: String, queryParams: Map<String, Any> = emptyMap()): Mono<String> = httpClient
- .get()
- .uri(url + createQueryString(queryParams))
- .responseSingle { response, content ->
- if (response.status().codeClass() == HttpStatusClass.SUCCESS)
- content.asString()
- else {
- val errorMessage = "$url ${response.status().code()} ${response.status().reasonPhrase()}"
- Mono.error(IllegalStateException(errorMessage))
- }
- }
- .doOnError {
- logger.error("Failed to get resource on path: $url (${it.localizedMessage})")
- logger.debug("Nested exception:", it)
- }
-
- private fun createQueryString(params: Map<String, Any>): String {
- if (params.isEmpty())
- return ""
-
- val builder = StringBuilder("?")
- params.forEach { (key, value) ->
- builder
- .append(key)
- .append("=")
- .append(value)
- .append("&")
-
- }
-
- return builder.removeSuffix("&").toString()
- }
-
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
deleted file mode 100644
index 5f4bf354..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/LoggingSinkProvider.kt
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Flux
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-internal class LoggingSinkProvider : SinkProvider {
-
- override fun invoke(config: CollectorConfiguration): Sink {
- return object : Sink {
- private val totalMessages = AtomicLong()
- private val totalBytes = AtomicLong()
-
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> =
- messages
- .doOnNext(this::logMessage)
-
- private fun logMessage(msg: RoutedMessage) {
- val msgs = totalMessages.addAndGet(1)
- val bytes = totalBytes.addAndGet(msg.message.rawMessage.size().toLong())
- val logMessageSupplier = { "Message routed to ${msg.topic}. Total = $msgs ($bytes B)" }
- if (msgs % INFO_LOGGING_FREQ == 0L)
- logger.info(logMessageSupplier)
- else
- logger.trace(logMessageSupplier)
- }
-
- }
- }
-
- companion object {
- const val INFO_LOGGING_FREQ = 100_000
- private val logger = Logger(LoggingSinkProvider::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
deleted file mode 100644
index a0c22418..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSink.kt
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-import reactor.core.publisher.Flux
-import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderRecord
-import reactor.kafka.sender.SenderResult
-import java.util.concurrent.atomic.AtomicLong
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class KafkaSink(private val sender: KafkaSender<CommonEventHeader, VesMessage>) : Sink {
- private val sentMessages = AtomicLong(0)
-
- override fun send(messages: Flux<RoutedMessage>): Flux<RoutedMessage> {
- val records = messages.map(this::vesToKafkaRecord)
- val result = sender.send(records)
- .doOnNext(::logException)
- .filter(::isSuccessful)
- .map { it.correlationMetadata() }
-
- return if (logger.traceEnabled) {
- result.doOnNext(::logSentMessage)
- } else {
- result
- }
- }
-
- private fun vesToKafkaRecord(msg: RoutedMessage): SenderRecord<CommonEventHeader, VesMessage, RoutedMessage> {
- return SenderRecord.create(
- msg.topic,
- msg.partition,
- System.currentTimeMillis(),
- msg.message.header,
- msg.message,
- msg)
- }
-
- private fun logException(senderResult: SenderResult<out Any>) {
- if (senderResult.exception() != null) {
- logger.warn(senderResult.exception()) { "Failed to send message to Kafka" }
- }
- }
-
- private fun logSentMessage(sentMsg: RoutedMessage) {
- logger.trace {
- val msgNum = sentMessages.incrementAndGet()
- "Message #$msgNum has been sent to ${sentMsg.topic}:${sentMsg.partition}"
- }
- }
-
- private fun isSuccessful(senderResult: SenderResult<out Any>) = senderResult.exception() == null
-
- companion object {
- val logger = Logger(KafkaSink::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
deleted file mode 100644
index 18191952..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkProvider.kt
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.onap.dcae.collectors.veshv.boundary.Sink
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-import reactor.kafka.sender.KafkaSender
-import reactor.kafka.sender.SenderOptions
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-internal class KafkaSinkProvider : SinkProvider {
- override fun invoke(config: CollectorConfiguration): Sink {
- return KafkaSink(KafkaSender.create(constructSenderOptions(config)))
- }
-
- private fun constructSenderOptions(config: CollectorConfiguration) =
- SenderOptions.create<CommonEventHeader, VesMessage>()
- .producerProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.kafkaBootstrapServers)
- .producerProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ProtobufSerializer::class.java)
- .producerProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VesMessageSerializer::class.java)
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
deleted file mode 100644
index 4e9932cc..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/ProtobufSerializer.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import com.google.protobuf.MessageLite
-import org.apache.kafka.common.serialization.Serializer
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class ProtobufSerializer : Serializer<MessageLite> {
- override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
- // no configuration
- }
-
- override fun serialize(topic: String?, data: MessageLite?): ByteArray? =
- data?.toByteArray()
-
- override fun close() {
- // cleanup not needed
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
deleted file mode 100644
index 7a6ac7c8..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/VesMessageSerializer.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters.kafka
-
-import org.apache.kafka.common.serialization.Serializer
-import org.onap.dcae.collectors.veshv.model.VesMessage
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class VesMessageSerializer : Serializer<VesMessage> {
- override fun configure(configs: MutableMap<String, *>?, isKey: Boolean) {
- }
-
- override fun serialize(topic: String?, msg: VesMessage?): ByteArray? = msg?.rawMessage?.unsafeAsArray()
-
- override fun close() {
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
deleted file mode 100644
index e535300a..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.socket
-
-import arrow.core.getOrElse
-import arrow.effects.IO
-import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.Server
-import org.onap.dcae.collectors.veshv.model.ServerConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.ServerSslContextFactory
-import org.onap.dcae.collectors.veshv.utils.NettyServerHandle
-import org.onap.dcae.collectors.veshv.utils.ServerHandle
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import reactor.core.publisher.Mono
-import reactor.netty.ByteBufFlux
-import reactor.netty.Connection
-import reactor.netty.NettyInbound
-import reactor.netty.NettyOutbound
-import reactor.netty.tcp.TcpServer
-import java.time.Duration
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class NettyTcpServer(private val serverConfig: ServerConfiguration,
- private val sslContextFactory: ServerSslContextFactory,
- private val collectorProvider: CollectorProvider) : Server {
-
- override fun start(): IO<ServerHandle> = IO {
- val tcpServer = TcpServer.create()
- .addressSupplier { serverConfig.serverListenAddress }
- .configureSsl()
- .handle(this::handleConnection)
-
- NettyServerHandle(tcpServer.bindNow())
- }
-
- private fun TcpServer.configureSsl() =
- sslContextFactory
- .createSslContext(serverConfig.securityConfiguration)
- .map { sslContext ->
- this.secure { b -> b.sslContext(sslContext) }
- }.getOrElse { this }
-
- private fun handleConnection(nettyInbound: NettyInbound, nettyOutbound: NettyOutbound): Mono<Void> =
- collectorProvider().fold(
- {
- nettyInbound.withConnection { conn ->
- logger.warn { "Collector not ready. Closing connection from ${conn.address()}..." }
- }
- Mono.empty()
- },
- {
- nettyInbound.withConnection { conn ->
- logger.info { "Handling connection from ${conn.address()}" }
- conn.configureIdleTimeout(serverConfig.idleTimeout)
- .logConnectionClosed()
- }
- it.handleConnection(nettyOutbound.alloc(), createDataStream(nettyInbound))
- }
- )
-
- private fun createDataStream(nettyInbound: NettyInbound): ByteBufFlux = nettyInbound
- .receive()
- .retain()
-
- private fun Connection.configureIdleTimeout(timeout: Duration): Connection {
- onReadIdle(timeout.toMillis()) {
- logger.info {
- "Idle timeout of ${timeout.seconds} s reached. Closing connection from ${address()}..."
- }
- disconnectClient()
- }
- return this
- }
-
- private fun Connection.disconnectClient() {
- channel().close().addListener {
- if (it.isSuccess)
- logger.debug { "Channel (${address()}) closed successfully." }
- else
- logger.warn("Channel close failed", it.cause())
- }
- }
-
- private fun Connection.logConnectionClosed(): Connection {
- onTerminate().subscribe {
- logger.info("Connection from ${address()} has been closed")
- }
- return this
- }
-
- companion object {
- private val logger = Logger(NettyTcpServer::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
deleted file mode 100644
index 4a2ef6b2..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import arrow.effects.IO
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.InvalidWireFrame
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
-import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError
-import reactor.core.publisher.Flux
-import reactor.core.publisher.SynchronousSink
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-internal class WireChunkDecoder(
- private val decoder: WireFrameDecoder,
- alloc: ByteBufAllocator = ByteBufAllocator.DEFAULT) {
- private val streamBuffer = alloc.compositeBuffer()
-
- fun release() {
- streamBuffer.release()
- }
-
- fun decode(byteBuf: ByteBuf): Flux<WireFrameMessage> = Flux.defer {
- logIncomingMessage(byteBuf)
- if (byteBuf.readableBytes() == 0) {
- byteBuf.release()
- Flux.empty()
- } else {
- streamBuffer.addComponent(true, byteBuf)
- generateFrames()
- .onErrorResume { logger.handleReactiveStreamError(it, Flux.error(it)) }
- .doFinally { streamBuffer.discardReadComponents() }
- }
- }
-
- private fun generateFrames(): Flux<WireFrameMessage> = Flux.generate { next ->
- decoder.decodeFirst(streamBuffer)
- .fold(onError(next), onSuccess(next))
- .unsafeRunSync()
- }
-
- private fun onError(next: SynchronousSink<WireFrameMessage>): (WireFrameDecodingError) -> IO<Unit> = { err ->
- when (err) {
- is InvalidWireFrame -> IO {
- next.error(WireFrameException(err))
- }
- is MissingWireFrameBytes -> IO {
- logEndOfData()
- next.complete()
- }
- }
- }
-
- private fun onSuccess(next: SynchronousSink<WireFrameMessage>): (WireFrameMessage) -> IO<Unit> = { frame ->
- IO {
- logDecodedWireMessage(frame)
- next.next(frame)
- }
- }
-
- private fun logIncomingMessage(wire: ByteBuf) {
- logger.trace { "Got message with total size of ${wire.readableBytes()} B" }
- }
-
- private fun logDecodedWireMessage(wire: WireFrameMessage) {
- logger.trace { "Wire payload size: ${wire.payloadSize} B" }
- }
-
- private fun logEndOfData() {
- logger.trace { "End of data in current TCP buffer" }
- }
-
- companion object {
- val logger = Logger(WireChunkDecoder::class)
- }
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
deleted file mode 100644
index 83a7cd85..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireFrameException.kt
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since June 2018
- */
-class WireFrameException(error: WireFrameDecodingError)
- : Exception("${error::class.simpleName}: ${error.message}")
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
deleted file mode 100644
index ec546c7d..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/CollectorConfiguration.kt
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-data class CollectorConfiguration(val kafkaBootstrapServers: String, val routing: Routing)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
deleted file mode 100644
index 9de34498..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ConfigurationProviderParams.kt
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import java.time.Duration
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since July 2018
- */
-data class ConfigurationProviderParams(val configurationUrl: String,
- val firstRequestDelay: Duration,
- val requestInterval: Duration)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt
deleted file mode 100644
index 782877e3..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/RoutedMessage.kt
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-data class RoutedMessage(val topic: String, val partition: Int, val message: VesMessage)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
deleted file mode 100644
index 85117684..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import java.net.InetSocketAddress
-import java.time.Duration
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-data class ServerConfiguration(
- val serverListenAddress: InetSocketAddress,
- val configurationProviderParams: ConfigurationProviderParams,
- val securityConfiguration: SecurityConfiguration,
- val idleTimeout: Duration,
- val healthCheckApiListenAddress: InetSocketAddress,
- val maximumPayloadSizeBytes: Int,
- val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
deleted file mode 100644
index f5bfcce1..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/VesMessage.kt
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.impl.MessageValidator
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-data class VesMessage(val header: CommonEventHeader, val rawMessage: ByteData) {
- fun isValid(): Boolean = MessageValidator.isValid(this)
-}
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
deleted file mode 100644
index bab95c57..00000000
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/routing.kt
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.model
-
-import arrow.core.Option
-import org.onap.ves.VesEventOuterClass.CommonEventHeader
-
-data class Routing(val routes: List<Route>) {
-
- fun routeFor(commonHeader: CommonEventHeader): Option<Route> =
- Option.fromNullable(routes.find { it.applies(commonHeader) })
-}
-
-data class Route(val domain: String, val targetTopic: String, val partitioning: (CommonEventHeader) -> Int) {
-
- fun applies(commonHeader: CommonEventHeader) = commonHeader.domain == domain
-
- operator fun invoke(message: VesMessage): RoutedMessage =
- RoutedMessage(targetTopic, partitioning(message.header), message)
-}
-
-
-/*
-Configuration DSL
- */
-
-fun routing(init: RoutingBuilder.() -> Unit): RoutingBuilder {
- val conf = RoutingBuilder()
- conf.init()
- return conf
-}
-
-class RoutingBuilder {
- private val routes: MutableList<RouteBuilder> = mutableListOf()
-
- fun defineRoute(init: RouteBuilder.() -> Unit): RouteBuilder {
- val rule = RouteBuilder()
- rule.init()
- routes.add(rule)
- return rule
- }
-
- fun build() = Routing(routes.map { it.build() }.toList())
-}
-
-class RouteBuilder {
-
- private lateinit var domain: String
- private lateinit var targetTopic: String
- private lateinit var partitioning: (CommonEventHeader) -> Int
-
- fun fromDomain(domain: String) {
- this.domain = domain
- }
-
- fun toTopic(targetTopic: String) {
- this.targetTopic = targetTopic
- }
-
- fun withFixedPartitioning(num: Int = 0) {
- partitioning = { num }
- }
-
- fun build() = Route(domain, targetTopic, partitioning)
-
-}
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
deleted file mode 100644
index 3090042d..00000000
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/MessageValidatorTest.kt
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import org.onap.ves.VesEventOuterClass.CommonEventHeader.*
-
-internal object MessageValidatorTest : Spek({
-
- given("Message validator") {
- val cut = MessageValidator
-
- on("ves hv message including header with fully initialized fields") {
- val commonHeader = commonHeader()
-
- it("should accept message with fully initialized message header") {
- val vesMessage = VesMessage(commonHeader, vesEventBytes(commonHeader))
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isTrue()
- }
-
- VesEventDomain.values()
- .forEach { domain ->
- it("should accept message with $domain domain") {
- val header = commonHeader(domain)
- val vesMessage = VesMessage(header, vesEventBytes(header))
- assertThat(cut.isValid(vesMessage))
- .isTrue()
- }
- }
- }
-
- on("ves hv message bytes") {
- val vesMessage = VesMessage(getDefaultInstance(), ByteData.EMPTY)
- it("should not accept message with default header") {
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
- }
- }
-
- val priorityTestCases = mapOf(
- Priority.PRIORITY_NOT_PROVIDED to false,
- Priority.HIGH to true
- )
-
- priorityTestCases.forEach { value, expectedResult ->
- on("ves hv message including header with priority $value") {
- val commonEventHeader = commonHeader(priority = value)
- val vesMessage = VesMessage(commonEventHeader, vesEventBytes(commonEventHeader))
-
- it("should resolve validation result") {
- assertThat(cut.isValid(vesMessage)).describedAs("message validation results")
- .isEqualTo(expectedResult)
- }
- }
- }
-
- on("ves hv message including header with not initialized fields") {
- val commonHeader = newBuilder()
- .setVersion("1.9")
- .setEventName("Sample event name")
- .setEventId("Sample event Id")
- .setSourceName("Sample Source")
- .build()
- val rawMessageBytes = vesEventBytes(commonHeader)
-
- it("should not accept not fully initialized message header") {
- val vesMessage = VesMessage(commonHeader, rawMessageBytes)
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
- }
- }
-
- on("ves hv message including header.vesEventListenerVersion with non-string major part") {
- val commonHeader = commonHeader(vesEventListenerVersion = "sample-version")
- val rawMessageBytes = vesEventBytes(commonHeader)
-
-
- it("should not accept message header") {
- val vesMessage = VesMessage(commonHeader, rawMessageBytes)
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
- }
- }
-
- on("ves hv message including header.vesEventListenerVersion with major part != 7") {
- val commonHeader = commonHeader(vesEventListenerVersion = "1.2.3")
- val rawMessageBytes = vesEventBytes(commonHeader)
-
- it("should not accept message header") {
- val vesMessage = VesMessage(commonHeader, rawMessageBytes)
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
- }
- }
-
- on("ves hv message including header.vesEventListenerVersion with minor part not starting with a digit") {
- val commonHeader = commonHeader(vesEventListenerVersion = "7.test")
- val rawMessageBytes = vesEventBytes(commonHeader)
-
- it("should not accept message header") {
- val vesMessage = VesMessage(commonHeader, rawMessageBytes)
- assertThat(cut.isValid(vesMessage)).describedAs("message validation result").isFalse()
- }
- }
- }
-})
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
deleted file mode 100644
index e8a31231..00000000
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import arrow.core.None
-import arrow.core.Some
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
-import org.onap.dcae.collectors.veshv.model.RoutedMessage
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.model.routing
-import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object RouterTest : Spek({
- given("sample configuration") {
- val config = routing {
-
- defineRoute {
- fromDomain(PERF3GPP.domainName)
- toTopic("ves_rtpm")
- withFixedPartitioning(2)
- }
-
- defineRoute {
- fromDomain(SYSLOG.domainName)
- toTopic("ves_trace")
- withFixedPartitioning()
- }
- }.build()
- val cut = Router(config)
-
- on("message with existing route (rtpm)") {
- val message = VesMessage(commonHeader(PERF3GPP), ByteData.EMPTY)
- val result = cut.findDestination(message)
-
- it("should have route available") {
- assertThat(result).isNotNull()
- }
-
- it("should be routed to proper partition") {
- assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(2))
- }
-
- it("should be routed to proper topic") {
- assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_rtpm"))
- }
-
- it("should be routed with a given message") {
- assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
- }
- }
-
- on("message with existing route (trace)") {
- val message = VesMessage(commonHeader(SYSLOG), ByteData.EMPTY)
- val result = cut.findDestination(message)
-
- it("should have route available") {
- assertThat(result).isNotNull()
- }
-
- it("should be routed to proper partition") {
- assertThat(result.map(RoutedMessage::partition)).isEqualTo(Some(0))
- }
-
- it("should be routed to proper topic") {
- assertThat(result.map(RoutedMessage::topic)).isEqualTo(Some("ves_trace"))
- }
-
- it("should be routed with a given message") {
- assertThat(result.map(RoutedMessage::message)).isEqualTo(Some(message))
- }
- }
-
- on("message with unknown route") {
- val message = VesMessage(commonHeader(HEARTBEAT), ByteData.EMPTY)
- val result = cut.findDestination(message)
-
- it("should not have route available") {
- assertThat(result).isEqualTo(None)
- }
- }
- }
-}) \ No newline at end of file
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
deleted file mode 100644
index 8950a557..00000000
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/VesDecoderTest.kt
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl
-
-import arrow.core.Option
-import com.google.protobuf.ByteString
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.ByteData
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.model.VesMessage
-import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
-import org.onap.dcae.collectors.veshv.tests.utils.vesEventBytes
-import java.nio.charset.Charset
-import kotlin.test.assertTrue
-import kotlin.test.fail
-
-
-internal object VesDecoderTest : Spek({
-
- given("ves message decoder") {
- val cut = VesDecoder()
-
- on("ves hv message bytes") {
- val commonHeader = commonHeader(HEARTBEAT)
- val rawMessageBytes = vesEventBytes(commonHeader, ByteString.copyFromUtf8("highvolume measurements"))
-
- it("should decode only header and pass it on along with raw message") {
- val expectedMessage = VesMessage(
- commonHeader,
- rawMessageBytes
- )
-
- assertTrue {
- cut.decode(rawMessageBytes).exists {
- it == expectedMessage
- }
- }
- }
- }
-
- on("invalid ves hv message bytes") {
- val rawMessageBytes = ByteData("ala ma kota".toByteArray(Charset.defaultCharset()))
-
- it("should throw error") {
- assertFailedWithError(cut.decode(rawMessageBytes))
- }
- }
- }
-})
-
-private fun <A> assertFailedWithError(option: Option<A>) =
- option.exists {
- fail("Error expected")
- }
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
deleted file mode 100644
index c6364f74..00000000
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConsulConfigurationProviderTest.kt
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import com.nhaarman.mockitokotlin2.eq
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.mockito.Mockito
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-
-import reactor.core.publisher.Mono
-import reactor.retry.Retry
-import reactor.test.StepVerifier
-import java.time.Duration
-import java.util.*
-import kotlin.test.assertEquals
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal object ConsulConfigurationProviderTest : Spek({
-
- describe("Consul configuration provider") {
-
- val httpAdapterMock: HttpAdapter = mock()
- val healthStateProvider = HealthState.INSTANCE
-
- given("valid resource url") {
- val validUrl = "http://valid-url/"
- val consulConfigProvider = constructConsulConfigProvider(validUrl, httpAdapterMock, healthStateProvider)
-
- on("call to consul") {
- whenever(httpAdapterMock.get(eq(validUrl), Mockito.anyMap()))
- .thenReturn(Mono.just(constructConsulResponse()))
-
- it("should use received configuration") {
-
- StepVerifier.create(consulConfigProvider().take(1))
- .consumeNextWith {
-
- assertEquals("$kafkaAddress:9093", it.kafkaBootstrapServers)
-
- val route1 = it.routing.routes[0]
- assertThat(FAULT.domainName)
- .describedAs("routed domain 1")
- .isEqualTo(route1.domain)
- assertThat("test-topic-1")
- .describedAs("target topic 1")
- .isEqualTo(route1.targetTopic)
-
- val route2 = it.routing.routes[1]
- assertThat(HEARTBEAT.domainName)
- .describedAs("routed domain 2")
- .isEqualTo(route2.domain)
- assertThat("test-topic-2")
- .describedAs("target topic 2")
- .isEqualTo(route2.targetTopic)
-
- }.verifyComplete()
- }
- }
-
- }
- given("invalid resource url") {
- val invalidUrl = "http://invalid-url/"
-
- val iterationCount = 3L
- val consulConfigProvider = constructConsulConfigProvider(
- invalidUrl, httpAdapterMock, healthStateProvider, iterationCount
- )
-
- on("call to consul") {
- whenever(httpAdapterMock.get(eq(invalidUrl), Mockito.anyMap()))
- .thenReturn(Mono.error(RuntimeException("Test exception")))
-
- it("should interrupt the flux") {
-
- StepVerifier.create(consulConfigProvider())
- .verifyErrorMessage("Test exception")
- }
-
- it("should update the health state") {
- StepVerifier.create(healthStateProvider().take(iterationCount))
- .expectNextCount(iterationCount - 1)
- .expectNext(HealthDescription.RETRYING_FOR_CONSUL_CONFIGURATION)
- .verifyComplete()
- }
- }
- }
- }
-
-})
-
-private fun constructConsulConfigProvider(url: String,
- httpAdapter: HttpAdapter,
- healthState: HealthState,
- iterationCount: Long = 1
-): ConsulConfigurationProvider {
-
- val firstRequestDelay = Duration.ofMillis(1)
- val requestInterval = Duration.ofMillis(1)
- val retry = Retry.onlyIf<Any> { it.iteration() <= iterationCount }.fixedBackoff(Duration.ofNanos(1))
-
- return ConsulConfigurationProvider(
- httpAdapter,
- url,
- firstRequestDelay,
- requestInterval,
- healthState,
- retry
- )
-}
-
-
-const val kafkaAddress = "message-router-kafka"
-
-fun constructConsulResponse(): String =
- """{
- "dmaap.kafkaBootstrapServers": "$kafkaAddress:9093",
- "collector.routing": [
- {
- "fromDomain": "fault",
- "toTopic": "test-topic-1"
- },
- {
- "fromDomain": "heartbeat",
- "toTopic": "test-topic-2"
- }
- ]
- }"""
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
deleted file mode 100644
index 91457faf..00000000
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapterTest.kt
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import reactor.core.publisher.Mono
-import reactor.netty.http.client.HttpClient
-import reactor.netty.http.server.HttpServer
-import reactor.test.StepVerifier
-import reactor.test.test
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal object HttpAdapterTest : Spek({
- describe("HttpAdapter") {
-
- val httpServer = HttpServer.create()
- .host("127.0.0.1")
- .route { routes ->
- routes.get("/url") { req, resp ->
- resp.sendString(Mono.just(req.uri()))
- }
- }
- .bindNow()
- val baseUrl = "http://${httpServer.host()}:${httpServer.port()}"
- val httpAdapter = HttpAdapter(HttpClient.create().baseUrl(baseUrl))
-
- afterGroup {
- httpServer.disposeNow()
- }
-
- given("url without query params") {
- val url = "/url"
-
- it("should not append query string") {
- httpAdapter.get(url).test()
- .expectNext(url)
- .verifyComplete()
- }
- }
-
- given("url with query params") {
- val queryParams = mapOf(Pair("p", "the-value"))
- val url = "/url"
-
- it("should add them as query string to the url") {
- httpAdapter.get(url, queryParams).test()
- .expectNext("/url?p=the-value")
- .verifyComplete()
- }
- }
-
- given("invalid url") {
- val invalidUrl = "/wtf"
-
- it("should interrupt the flux") {
- StepVerifier
- .create(httpAdapter.get(invalidUrl))
- .verifyError()
- }
- }
- }
-
-}) \ No newline at end of file
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
deleted file mode 100644
index f06a0dc7..00000000
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.wire
-
-import io.netty.buffer.ByteBuf
-import io.netty.buffer.Unpooled
-import io.netty.buffer.UnpooledByteBufAllocator
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
-import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import reactor.test.test
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk></piotr.jaszczyk>@nokia.com>
- * @since May 2018
- */
-internal object WireChunkDecoderTest : Spek({
- val alloc = UnpooledByteBufAllocator.DEFAULT
- val samplePayload = "konstantynopolitanczykowianeczka".toByteArray()
- val anotherPayload = "ala ma kota a kot ma ale".toByteArray()
-
- val encoder = WireFrameEncoder(alloc)
-
- fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
-
- fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
-
- fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
- for (bb in byteBuffers) {
- assertThat(bb.refCnt())
- .describedAs("should be released: $bb ref count")
- .isEqualTo(0)
- }
- }
-
- fun verifyMemoryNotReleased(vararg byteBuffers: ByteBuf) {
- for (bb in byteBuffers) {
- assertThat(bb.refCnt())
- .describedAs("should not be released: $bb ref count")
- .isEqualTo(1)
- }
- }
-
- describe("decoding wire protocol") {
- given("empty input") {
- val input = Unpooled.EMPTY_BUFFER
-
- it("should yield empty result") {
- createInstance().decode(input).test().verifyComplete()
- }
- }
-
- given("input with no readable bytes") {
- val input = Unpooled.wrappedBuffer(byteArrayOf(0x00)).readerIndex(1)
-
- it("should yield empty result") {
- createInstance().decode(input).test().verifyComplete()
- }
-
- it("should release memory") {
- verifyMemoryReleased(input)
- }
- }
-
- given("invalid input (not starting with marker)") {
- val input = Unpooled.wrappedBuffer(samplePayload)
-
- it("should yield error") {
- createInstance().decode(input).test()
- .verifyError(WireFrameException::class.java)
- }
-
- it("should leave memory unreleased") {
- verifyMemoryNotReleased(input)
- }
- }
-
- given("valid input") {
- val input = WireFrameMessage(samplePayload)
-
- it("should yield decoded input frame") {
- createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .verifyComplete()
- }
- }
-
- given("valid input with part of next frame") {
- val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
- .writeBytes(encoder.encode(WireFrameMessage(samplePayload)).slice(0, 3))
-
- it("should yield decoded input frame") {
- createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .verifyComplete()
- }
-
- it("should leave memory unreleased") {
- verifyMemoryNotReleased(input)
- }
- }
-
- given("valid input with garbage after it") {
- val input = Unpooled.buffer()
- .writeBytes(encoder.encode(WireFrameMessage(samplePayload)))
- .writeBytes(Unpooled.wrappedBuffer(samplePayload))
-
- it("should yield decoded input frame and error") {
- createInstance().decode(input).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .verifyError(WireFrameException::class.java)
- }
-
- it("should leave memory unreleased") {
- verifyMemoryNotReleased(input)
- }
- }
-
- given("two inputs containing two separate messages") {
- val input1 = encoder.encode(WireFrameMessage(samplePayload))
- val input2 = encoder.encode(WireFrameMessage(anotherPayload))
-
- it("should yield decoded input frames") {
- val cut = createInstance()
- cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .verifyComplete()
- cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
- .verifyComplete()
- }
-
- it("should release memory") {
- verifyMemoryReleased(input1, input2)
- }
- }
-
- given("1st input containing 1st frame and 2nd input containing garbage") {
- val input1 = encoder.encode(WireFrameMessage(samplePayload))
- val input2 = Unpooled.wrappedBuffer(anotherPayload)
-
- it("should yield decoded input frames") {
- val cut = createInstance()
- cut.decode(input1)
- .test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .verifyComplete()
- cut.decode(input2).test()
- .verifyError(WireFrameException::class.java)
- }
-
- it("should release memory for 1st input") {
- verifyMemoryReleased(input1)
- }
-
- it("should leave memory unreleased for 2nd input") {
- verifyMemoryNotReleased(input2)
- }
- }
-
-
- given("1st input containing 1st frame + part of 2nd frame and 2nd input containing rest of 2nd frame") {
- val frame1 = encoder.encode(WireFrameMessage(samplePayload))
- val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
-
- val input1 = Unpooled.buffer()
- .writeBytes(frame1)
- .writeBytes(frame2, 3)
- val input2 = Unpooled.buffer().writeBytes(frame2)
-
- it("should yield decoded input frames") {
- val cut = createInstance()
- cut.decode(input1).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .verifyComplete()
- cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == anotherPayload.size }
- .verifyComplete()
- }
-
- it("should release memory") {
- verifyMemoryReleased(input1, input2)
- }
- }
-
- given("1st input containing part of 1st frame and 2nd input containing rest of 1st + 2nd frame") {
- val frame1 = encoder.encode(WireFrameMessage(samplePayload))
- val frame2 = encoder.encode(WireFrameMessage(anotherPayload))
-
- val input1 = Unpooled.buffer()
- .writeBytes(frame1, 5)
- val input2 = Unpooled.buffer()
- .writeBytes(frame1)
- .writeBytes(frame2)
-
- it("should yield decoded input frames") {
- val cut = createInstance()
- cut.decode(input1).test()
- .verifyComplete()
- cut.decode(input2).test()
- .expectNextMatches { it.payloadSize == samplePayload.size }
- .expectNextMatches { it.payloadSize == anotherPayload.size }
- .verifyComplete()
- }
-
- it("should release memory") {
- verifyMemoryReleased(input1, input2)
- }
- }
- }
-}) \ No newline at end of file
diff --git a/hv-collector-core/src/test/resources/logback-test.xml b/hv-collector-core/src/test/resources/logback-test.xml
deleted file mode 100644
index 9a4eacfe..00000000
--- a/hv-collector-core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,35 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<configuration>
- <property name="LOG_FILE"
- value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/>
- <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/>
-
- <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>
- %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n
- </pattern>
- </encoder>
- </appender>
-
- <appender name="ROLLING-FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder>
- <pattern>${FILE_LOG_PATTERN}</pattern>
- </encoder>
- <file>${LOG_FILE}</file>
- <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern>
- <maxFileSize>50MB</maxFileSize>
- <maxHistory>30</maxHistory>
- <totalSizeCap>10GB</totalSizeCap>
- </rollingPolicy>
- </appender>
-
- <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/>
-
- <root level="INFO">
- <appender-ref ref="CONSOLE"/>
- <appender-ref ref="ROLLING-FILE"/>
- </root>
-</configuration>