diff options
Diffstat (limited to 'sources')
19 files changed, 122 insertions, 145 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index 9684484b..ccce62a4 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.config.api +import arrow.core.getOrElse import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException import org.onap.dcae.collectors.veshv.config.api.model.ValidationException @@ -27,41 +28,56 @@ import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser -import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow +import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft +import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties import reactor.core.publisher.Flux +import reactor.core.publisher.Mono class ConfigurationModule { private val cmd = HvVesCommandLineParser() private val configReader = FileConfigurationReader() private val configValidator = ConfigurationValidator() + private val merger = ConfigurationMerger() fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args) fun hvVesConfigurationUpdates(args: Array<String>, configStateListener: ConfigurationStateListener, mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> = - Flux.just(cmd.getConfigurationFile(args)) - .throwOnLeft { MissingArgumentException(it.message, it.cause) } - .map { it.reader().use(configReader::loadConfig) } + Mono.just(cmd.getConfigurationFile(args)) + .throwOnLeft(::MissingArgumentException) + .map { + logger.info { "Using base configuration file: ${it.absolutePath}" } + it.reader().use(configReader::loadConfig) + } .cache() - .flatMap { basePartialConfig -> - val baseConfig = configValidator.validate(basePartialConfig) - .rightOrThrow { ValidationException(it.message) } - val cbsConfigProvider = CbsConfigurationProvider( - CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), - baseConfig.cbs, - configStateListener, - mdc) - val merger = ConfigurationMerger() - cbsConfigProvider() + .flatMapMany { basePartialConfig -> + cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) + .invoke() .map { merger.merge(basePartialConfig, it) } - .map { configValidator.validate(it) } - .throwOnLeft { ValidationException(it.message) } + .map(configValidator::validate) + .throwOnLeft() } + private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration, + configStateListener: ConfigurationStateListener, + mdc: MappedDiagnosticContext): CbsConfigurationProvider = + CbsConfigurationProvider( + CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), + cbsConfigurationFrom(basePartialConfig), + configStateListener, + mdc) + + private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = + configValidator.validatedCbsConfiguration(basePartialConfig) + .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } + + companion object { + private val logger = Logger(ConfigurationModule::class) + } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt index 2fc29829..bea7cc56 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt @@ -19,6 +19,8 @@ */ package org.onap.dcae.collectors.veshv.config.api.model -class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause) +import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError + +class MissingArgumentException(err: WrongArgumentError) : RuntimeException(err.message, err.cause) class ValidationException(message: String) : RuntimeException(message) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt index f044492c..63d590a2 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt @@ -74,8 +74,6 @@ internal class ConfigurationMerger { updateOption: Option<PartialCollectorConfig>) = applyUpdate(baseOption, updateOption) { base, update -> PartialCollectorConfig( - base.maxRequestSizeBytes.updateToGivenOrNone(update.maxRequestSizeBytes), - base.kafkaServers.updateToGivenOrNone(update.kafkaServers), base.routing.updateToGivenOrNone(update.routing) ) } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index 3e599b58..ead5655a 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -27,13 +27,13 @@ import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding +import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcae.collectors.veshv.utils.logging.Logger -import java.net.InetSocketAddress -import java.time.Duration /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -41,33 +41,35 @@ import java.time.Duration */ internal class ConfigurationValidator { - fun validate(partialConfig: PartialConfiguration) - : Either<ValidationError, HvVesConfiguration> = binding { - val logLevel = determineLogLevel(partialConfig.logLevel) + fun validate(partialConfig: PartialConfiguration) = + logger.info { "About to validate configuration: $partialConfig" }.let { + binding { + val logLevel = determineLogLevel(partialConfig.logLevel) - val serverConfiguration = partialConfig.server.bind() - .let { createServerConfiguration(it).bind() } + val serverConfiguration = validatedServerConfiguration(partialConfig) + .doOnEmpty { logger.debug { "Cannot bind server configuration" } } + .bind() - val cbsConfiguration = partialConfig.cbs.bind() - .let { createCbsConfiguration(it).bind() } + val cbsConfiguration = validatedCbsConfiguration(partialConfig) + .doOnEmpty { logger.debug { "Cannot bind cbs configuration" } } + .bind() - val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys) + val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys) -// TOD0: retrieve when ConfigurationMerger is implemented -// val collectorConfiguration = partialConfig.collector.bind() -// .let { createCollectorConfig(it).bind() } + val collectorConfiguration = validatedCollectorConfig(partialConfig) + .doOnEmpty { logger.debug { "Cannot bind collector configuration" } } + .bind() + + HvVesConfiguration( + serverConfiguration, + cbsConfiguration, + securityConfiguration, + collectorConfiguration, + logLevel + ) + }.toEither { ValidationException("Some required configuration options are missing") } + } - HvVesConfiguration( - serverConfiguration, - cbsConfiguration, - securityConfiguration, -// TOD0: swap when ConfigurationMerger is implemented -// collectorConfiguration - CollectorConfiguration(emptyList()), -// end TOD0 - logLevel - ) - }.toEither { ValidationError("Some required configuration options are missing") } private fun determineLogLevel(logLevel: Option<LogLevel>) = logLevel.getOrElse { @@ -78,40 +80,38 @@ internal class ConfigurationValidator { DEFAULT_LOG_LEVEL } - private fun createServerConfiguration(partial: PartialServerConfig) = + private fun validatedServerConfiguration(partial: PartialConfiguration) = partial.mapBinding { - ServerConfiguration( - it.listenPort.bind(), - it.idleTimeoutSec.bind(), - it.maxPayloadSizeBytes.bind() - ) + partial.server.bind().let { + ServerConfiguration( + it.listenPort.bind(), + it.idleTimeoutSec.bind(), + it.maxPayloadSizeBytes.bind() + ) + } } - private fun createCbsConfiguration(partial: PartialCbsConfig) = + fun validatedCbsConfiguration(partial: PartialConfiguration) = partial.mapBinding { - CbsConfiguration( - it.firstRequestDelaySec.bind(), - it.requestIntervalSec.bind() - ) + it.cbs.bind().let { + CbsConfiguration( + it.firstRequestDelaySec.bind(), + it.requestIntervalSec.bind() + ) + } } -// TOD0: retrieve when ConfigurationMerger is implemented -// private fun createCollectorConfig(partial: PartialCollectorConfig) = -// partial.mapBinding { -// CollectorConfiguration( -// it.maxRequestSizeBytes.bind(), -// toKafkaServersString(it.kafkaServers.bind()), -// it.routing.bind() -// ) -// } - - private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String = - kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" } + private fun validatedCollectorConfig(partial: PartialConfiguration) = + partial.mapBinding { + partial.collector.bind().let { + CollectorConfiguration( + it.routing.bind() + ) + } + } companion object { val DEFAULT_LOG_LEVEL = LogLevel.INFO private val logger = Logger(ConfigurationValidator::class) } } - -data class ValidationError(val message: String, val cause: Option<Throwable> = None) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt index 9513107b..f6ae5bec 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt @@ -24,6 +24,7 @@ import com.google.gson.GsonBuilder import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.SecurityAdapter +import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.io.Reader import java.time.Duration @@ -41,4 +42,9 @@ internal class FileConfigurationReader { fun loadConfig(input: Reader): PartialConfiguration = gson.fromJson(input, PartialConfiguration::class.java) + .also { logger.info { "Successfully read file and parsed json to configuration: $it" } } + + companion object { + private val logger = Logger(FileConfigurationReader::class) + } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt index c1a98294..c6730a4c 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt @@ -45,7 +45,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De it.stringValue(CONFIGURATION_FILE).map(::File) }.toEither { WrongArgumentError( - message = "Unexpected error when parsing command line arguments", + message = "Base configuration filepath missing on command line", cmdLineOptionsList = cmdLineOptionsList) } @@ -53,8 +53,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De parse(args) { it.intValue(HEALTH_CHECK_API_PORT) }.getOrElse { - logger.info { "Healthcheck port missing on command line," + - " using default: $DEFAULT_HEALTHCHECK_PORT" } + logger.info { "Healthcheck port missing on command line, using default: $DEFAULT_HEALTHCHECK_PORT" } DEFAULT_HEALTHCHECK_PORT } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt index f3c149cd..b4e1bf6b 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -53,7 +53,5 @@ internal data class PartialCbsConfig( internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None) internal data class PartialCollectorConfig( - val maxRequestSizeBytes: Option<Int> = None, - val kafkaServers: Option<List<InetSocketAddress>> = None, // TOD0: remove properties and simplify this part val routing: Option<Routing> = None ) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt index 4b89488b..55d06cdd 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -64,8 +64,6 @@ internal object ConfigurationValidatorTest : Spek({ Some(mock()) )), Some(PartialCollectorConfig( - Some(4), - Some(emptyList()), someFromEmptyRouting )), None @@ -103,8 +101,6 @@ internal object ConfigurationValidatorTest : Spek({ securityKeys )), Some(PartialCollectorConfig( - Some(4), - Some(emptyList()), someFromEmptyRouting )), Some(LogLevel.INFO) @@ -152,8 +148,6 @@ internal object ConfigurationValidatorTest : Spek({ securityKeys )), Some(PartialCollectorConfig( - Some(4), - Some(emptyList()), someFromEmptyRouting )), Some(LogLevel.INFO) diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt index 433e4d57..618b818f 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt @@ -57,8 +57,6 @@ internal class VesHvCollector( .transform(::filterInvalidWireFrame) .transform(::decodeProtobufPayload) .transform(::filterInvalidProtobufMessages) - // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here - .handleErrors() .transform(::route) .handleErrors() .doFinally { releaseBuffersMemory() } @@ -106,14 +104,14 @@ internal class VesHvCollector( } } + private fun releaseBuffersMemory() = wireChunkDecoder.release() + .also { logger.debug { "Released buffer memory after handling message stream" } } + private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume { metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it)) logger.handleReactiveStreamError(clientContext, it) } - private fun releaseBuffersMemory() = wireChunkDecoder.release() - .also { logger.debug { "Released buffer memory after handling message stream" } } - private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> = filterFailedWithLog(logger, clientContext::fullMdc, predicate) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index 92719e94..430f7981 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND +import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting @@ -159,7 +160,7 @@ object MetricsSpecification : Spek({ .isEqualTo(1) } - it("should gather metrics for sing errors") { + it("should gather metrics for sink errors") { val sut = vesHvWithAlwaysFailingSink(basicRouting) sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP)) @@ -190,7 +191,7 @@ object MetricsSpecification : Spek({ given("rejection causes") { mapOf( ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to - messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1), + messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1), ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame() ).forEach { cause, vesMessage -> on("cause $cause") { diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 1217c471..f79c2e46 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -77,7 +77,7 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C override fun close() = collectorProvider.close() companion object { - const val MAX_PAYLOAD_SIZE_BYTES = 1024 + const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 } } diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index 6a718eea..2430c74f 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -23,7 +23,6 @@ import arrow.core.None import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.Routing @@ -31,21 +30,17 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription +import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting -import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting -import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload -import reactor.core.publisher.Flux -import java.time.Duration /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -97,7 +92,7 @@ object VesHvSpecification : Spek({ val (sut, sink) = vesHvWithStoringSink() val validMessage = vesWireFrameMessage(PERF3GPP) val msgWithInvalidFrame = messageWithInvalidWireFrameHeader() - val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) + val msgWithTooBigPayload = messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) val expectedRefCnt = 0 val handledEvents = sut.handleConnection( @@ -208,7 +203,7 @@ object VesHvSpecification : Spek({ val handledMessages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP, "first"), - messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), + messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), vesWireFrameMessage(PERF3GPP)) assertThat(handledMessages).hasSize(1) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt index e9914ef1..8956e81f 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt @@ -27,7 +27,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092" -const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 +private const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 private val perf3gppKafkaSink = ImmutableKafkaSink.builder() .name("PERF3GPP") diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile index a1e89a73..cfd4a7bb 100644 --- a/sources/hv-collector-main/Dockerfile +++ b/sources/hv-collector-main/Dockerfile @@ -1,6 +1,6 @@ FROM docker.io/openjdk:11-jre-slim -LABEL copyright="Copyright (C) 2018 NOKIA" +LABEL copyright="Copyright (C) 2018-2019 NOKIA" LABEL license.name="The Apache Software License, Version 2.0" LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0" LABEL maintainer="Nokia Wroclaw ONAP Team" @@ -19,4 +19,4 @@ COPY target/libs/internal/* ./ COPY src/main/docker/*.sh ./ COPY src/main/docker/base.json /etc/ves-hv/configuration/base.json -COPY target/hv-collector-main-*.jar ./ +COPY target/hv-collector-main-*.jar ./
\ No newline at end of file diff --git a/sources/hv-collector-main/src/main/docker/base.json b/sources/hv-collector-main/src/main/docker/base.json index 67576c80..e0b9c450 100644 --- a/sources/hv-collector-main/src/main/docker/base.json +++ b/sources/hv-collector-main/src/main/docker/base.json @@ -10,17 +10,5 @@ "requestIntervalSec": 5 }, "security": { - }, - "collector": { - "maxRequestSizeBytes": 1048576, - "kafkaServers": [ - "message-router-kafka:9092" - ], - "routing": [ - { - "fromDomain": "perf3gpp", - "toTopic": "HV_VES_PERF3GPP" - } - ] } } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index 22d8000e..dc207ef8 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.main.servers.VesServer import org.onap.dcae.collectors.veshv.model.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.neverComplete import org.onap.dcae.collectors.veshv.utils.registerShutdownHook import reactor.core.scheduler.Schedulers import java.util.concurrent.atomic.AtomicReference @@ -57,20 +56,20 @@ fun main(args: Array<String>) { HealthState.INSTANCE.changeState(HealthDescription.HEALTHY) } .doOnError { - logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" } - logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" } + logger.error(ServiceContext::mdc) { "Failed to create configuration: ${it.message}" } + logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) } HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND) } .doOnNext(::startServer) .doOnError(::logServerStartFailed) - .neverComplete() // TODO: remove after merging configuration stream with cbs + .then() .block() } private fun startServer(config: HvVesConfiguration) { stopRunningServer() Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) - logger.info { "Using configuration: $config" } + logger.debug(ServiceContext::mdc) { "Configuration: $config" } VesServer.start(config).let { registerShutdownHook { shutdownGracefully(it) } diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml index 40f3c8a0..21c1fa31 100644 --- a/sources/hv-collector-main/src/main/resources/logback.xml +++ b/sources/hv-collector-main/src/main/resources/logback.xml @@ -20,9 +20,9 @@ --> <configuration> <property name="COMPONENT_NAME" - value="dcae-hv-ves-collector"/> + value="dcae-hv-ves-collector"/> <property name="COMPONENT_SHORT_NAME" - value="hv-ves"/> + value="hv-ves"/> <property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/> <property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/> @@ -57,7 +57,8 @@ | ${p_mdc}\t | ${p_thr}%n"/> - <property name="ONAP_LOG_PATTERN" value="%nopexception${p_log}|${p_tim}|${p_lvl}|${p_msg}|${p_mdc}|${p_exc}|${p_mak}|${p_thr}%n"/> + <property name="ONAP_LOG_PATTERN" + value="%nopexception${p_log}|${p_tim}|${p_lvl}|${p_msg}|${p_mdc}|${p_exc}|${p_mak}|${p_thr}%n"/> <property name="ONAP_LOG_PATTERN_FROM_WIKI" value="%nopexception%logger |%date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} @@ -76,7 +77,7 @@ </appender> <appender name="ROLLING-FILE" - class="ch.qos.logback.core.rolling.RollingFileAppender"> + class="ch.qos.logback.core.rolling.RollingFileAppender"> <encoder> <pattern>${ONAP_LOG_PATTERN}</pattern> </encoder> @@ -93,8 +94,10 @@ <logger name="io.netty" level="INFO"/> <logger name="io.netty.util" level="WARN"/> <logger name="org.apache.kafka" level="INFO"/> + <logger name="org.onap.dcaegen2.services.sdk" level="INFO"/> + <logger name="org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl" level="WARN"/> - <root level="INFO"> + <root level="DEBUG"> <appender-ref ref="CONSOLE"/> <appender-ref ref="ROLLING-FILE"/> </root> diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index d5b33b91..47b3d559 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -34,6 +34,7 @@ import arrow.syntax.collections.firstOption import arrow.typeclasses.MonadContinuation import arrow.typeclasses.binding import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import java.util.concurrent.atomic.AtomicReference /** @@ -57,8 +58,12 @@ fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity) fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity) +fun <A : Exception, B> Flux<Either<A, B>>.throwOnLeft(): Flux<B> = map { it.rightOrThrow() } + fun <A, B> Flux<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Flux<B> = map { it.rightOrThrow(f) } +fun <A, B> Mono<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Mono<B> = map { it.rightOrThrow(f) } + fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get()) fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> = diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt deleted file mode 100644 index aaa598d2..00000000 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt +++ /dev/null @@ -1,25 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.utils - -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono - -fun <T> Flux<T>.neverComplete(): Mono<Void> = then(Mono.never<T>()).then()
\ No newline at end of file |