aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/main/kotlin
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-29 11:22:24 +0100
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-01 12:32:42 +0200
commit6725abbaa6249e107126ffd5ec58f2a96ce60eee (patch)
treef3fa6d11a04b60a631ee4160a69744b44e08e1ed /sources/hv-collector-core/src/main/kotlin
parent4281a12d8e892f46f5f2226ee0f8aee8b862b177 (diff)
Move ConfigurationProvider to config module
Change-Id: Ic6f955f4e777e06e7c7eed6e08c0cac470e9a51d Issue-ID: DCAEGEN2-1347 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'sources/hv-collector-core/src/main/kotlin')
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt3
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt (renamed from sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt)15
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt33
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt40
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt110
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt16
7 files changed, 25 insertions, 195 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 1b92d90c..e3156a0d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -40,8 +39,6 @@ interface SinkProvider : Closeable {
operator fun invoke(stream: SinkStream, ctx: ClientContext): Lazy<Sink>
}
-typealias ConfigurationProvider = () -> Flux<Routing>
-
interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(msg: WireFrameMessage)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
index 5c64c70b..ba0a9eee 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import arrow.core.Option
import arrow.effects.IO
import io.netty.buffer.ByteBuf
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -33,7 +32,7 @@ interface Collector {
}
interface CollectorProvider : Closeable {
- operator fun invoke(ctx: ClientContext): Option<Collector>
+ operator fun invoke(ctx: ClientContext): Collector
}
interface Server {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
index 2b123fc8..04e575ae 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ParsingException.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/AdapterFactory.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,15 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.impl.adapters
+package org.onap.dcae.collectors.veshv.factory
-class ParsingException(message: String, cause: Throwable) : Exception(message, cause)
+import org.onap.dcae.collectors.veshv.boundary.SinkProvider
+import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since May 2018
+ */
+object AdapterFactory {
+ fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
+}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index 2b29acd9..1c79abd2 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -19,64 +19,45 @@
*/
package org.onap.dcae.collectors.veshv.factory
-import arrow.core.Option
import org.onap.dcae.collectors.veshv.boundary.Collector
import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.Router
import org.onap.dcae.collectors.veshv.impl.VesDecoder
import org.onap.dcae.collectors.veshv.impl.VesHvCollector
import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder
import org.onap.dcae.collectors.veshv.model.ClientContext
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.util.concurrent.atomic.AtomicReference
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since May 2018
*/
-class CollectorFactory(private val configuration: ConfigurationProvider,
+class CollectorFactory(private val configuration: CollectorConfiguration,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
- private val maxPayloadSizeBytes: Int,
- private val healthState: HealthState = HealthState.INSTANCE) {
+ private val maxPayloadSizeBytes: Int) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config = AtomicReference<Routing>()
- configuration()
- .doOnNext {
- logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
- healthState.changeState(HealthDescription.HEALTHY)
- }
- .doOnError {
- logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
- logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
- healthState.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
- }
- .subscribe(config::set)
return object : CollectorProvider {
- override fun invoke(ctx: ClientContext): Option<Collector> =
- config.getOption().map { createVesHvCollector(it, ctx) }
+ override fun invoke(ctx: ClientContext): Collector =
+ createVesHvCollector(ctx)
override fun close() = sinkProvider.close()
}
}
- private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(routing, sinkProvider, ctx, metrics),
+ router = Router(configuration.routing, sinkProvider, ctx, metrics),
metrics = metrics)
companion object {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
deleted file mode 100644
index 20b11753..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/AdapterFactory.kt
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.impl.adapters.kafka.KafkaSinkProvider
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
-
-/**
- * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
- * @since May 2018
- */
-object AdapterFactory {
- fun sinkCreatorFactory(): SinkProvider = KafkaSinkProvider()
-
- fun configurationProvider(config: CbsConfiguration): ConfigurationProvider =
- ConfigurationProviderImpl(
- CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- config)
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
deleted file mode 100644
index 185693c0..00000000
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018-2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.impl.adapters
-
-import com.google.gson.JsonObject
-import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
-import org.onap.dcae.collectors.veshv.model.ServiceContext
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
-import org.onap.dcaegen2.services.sdk.model.streams.StreamType.KAFKA
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
-import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamPredicates.streamOfType
-import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.retry.Jitter
-import reactor.retry.Retry
-import java.time.Duration
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClient>,
- private val firstRequestDelay: Duration,
- private val requestInterval: Duration,
- private val healthState: HealthState,
- private val streamParser: StreamFromGsonParser<KafkaSink>,
- retrySpec: Retry<Any>
-
-) : ConfigurationProvider {
- constructor(cbsClientMono: Mono<CbsClient>, params: CbsConfiguration) : this(
- cbsClientMono,
- params.firstRequestDelay,
- params.requestInterval,
- HealthState.INSTANCE,
- StreamFromGsonParsers.kafkaSinkParser(),
- Retry.any<Any>()
- .retryMax(MAX_RETRIES)
- .fixedBackoff(params.requestInterval)
- .jitter(Jitter.random())
- )
-
- private val retry = retrySpec.doOnRetry {
- logger.withWarn(ServiceContext::mdc) {
- log("Exception from configuration provider client, retrying subscription", it.exception())
- }
- healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
- }
-
- override fun invoke(): Flux<Routing> =
- cbsClientMono
- .doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
- .onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
- .retryWhen(retry)
- .doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
- .flatMapMany(::handleUpdates)
-
- private fun handleUpdates(cbsClient: CbsClient) = cbsClient
- .updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
- firstRequestDelay,
- requestInterval)
- .doOnNext { logger.info(ServiceContext::mdc) { "Received new configuration:\n$it" } }
- .map(::createRoutingDescription)
- .onErrorLog(logger, ServiceContext::mdc) { "Error while creating configuration" }
- .retryWhen(retry)
-
- private fun createRoutingDescription(configuration: JsonObject): Routing = try {
- DataStreams.namedSinks(configuration)
- .filter(streamOfType(KAFKA))
- .map(streamParser::unsafeParse)
- .map { Route(it.name(), it) }
- .asIterable()
- .toList()
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
- }
-
- companion object {
- private const val MAX_RETRIES = 5L
- private val logger = Logger(ConfigurationProviderImpl::class)
- }
-}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
index fab96560..3e19414d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt
@@ -113,25 +113,19 @@ internal class NettyTcpServer(private val serverConfiguration: ServerConfigurati
private fun acceptClientConnection(clientContext: ClientContext, nettyInbound: NettyInbound): Mono<Void> {
metrics.notifyClientConnected()
logger.info(clientContext::fullMdc) { "Handling new client connection" }
- return collectorProvider(clientContext).fold(
- {
- logger.warn(clientContext::fullMdc) { "Collector is not ready. Closing connection" }
- nettyInbound.closeConnectionAndReturn(Mono.empty<Void>())
- },
- handleClient(clientContext, nettyInbound)
- )
+ val collector = collectorProvider(clientContext)
+ return collector.handleClient(clientContext, nettyInbound)
}
- private fun handleClient(clientContext: ClientContext,
- nettyInbound: NettyInbound): (Collector) -> Mono<Void> = { collector ->
+ private fun Collector.handleClient(clientContext: ClientContext,
+ nettyInbound: NettyInbound) =
withConnectionFrom(nettyInbound) { connection ->
connection
.configureIdleTimeout(clientContext, serverConfiguration.idleTimeout)
.logConnectionClosed(clientContext)
}.run {
- collector.handleConnection(nettyInbound.createDataStream())
+ handleConnection(nettyInbound.createDataStream())
}
- }
private fun Connection.configureIdleTimeout(ctx: ClientContext, timeout: Duration): Connection =
onReadIdle(timeout.toMillis()) {