diff options
author | Filip Krzywka <filip.krzywka@nokia.com> | 2019-04-10 11:36:48 +0200 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-04-12 08:20:15 +0200 |
commit | 0dd7127aa7258d8fd9d434559750c00ca49f66e6 (patch) | |
tree | 484dfd8f93e1e52901e621203200763e2760bee1 /sources | |
parent | 6a7e8dce0126f355a0ef5663304825bea4c79a20 (diff) |
Extract transforming logic from validator
Change-Id: Ic019b1796e17d24f14f41a817af6e5ecd8c7244b
Issue-ID: DCAEGEN2-1416
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources')
17 files changed, 700 insertions, 350 deletions
diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml index b6ec4ca2..eda8b448 100644 --- a/sources/hv-collector-configuration/pom.xml +++ b/sources/hv-collector-configuration/pom.xml @@ -93,5 +93,9 @@ <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-extras-data</artifactId> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index f0ee3a42..ded75838 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -19,12 +19,11 @@ */ package org.onap.dcae.collectors.veshv.config.api -import arrow.core.getOrElse import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException -import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser @@ -41,8 +40,9 @@ class ConfigurationModule { private val cmd = HvVesCommandLineParser() private val configParser = JsonConfigurationParser() + private val configMerger = ConfigurationMerger() private val configValidator = ConfigurationValidator() - private val merger = ConfigurationMerger() + private val configTransformer = ConfigurationTransformer() fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args) @@ -58,14 +58,15 @@ class ConfigurationModule { .flatMapMany { basePartialConfig -> cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) .invoke() - .map { merger.merge(basePartialConfig, it) } + .map { configMerger.merge(basePartialConfig, it) } .map(configValidator::validate) .throwOnLeft() + .map(configTransformer::toFinalConfiguration) } private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration, configStateListener: ConfigurationStateListener, - mdc: MappedDiagnosticContext): CbsConfigurationProvider = + mdc: MappedDiagnosticContext) = CbsConfigurationProvider( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), cbsConfigurationFrom(basePartialConfig), @@ -73,11 +74,12 @@ class ConfigurationModule { configStateListener, mdc) - private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator - .validatedCbsConfiguration(basePartialConfig) - .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } + private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = + configValidator.validatedCbsConfiguration(basePartialConfig) + .let { configTransformer.toCbsConfiguration(it) } companion object { private val logger = Logger(ConfigurationModule::class) } + } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt index 8db2f770..fd3cccd9 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -46,13 +46,6 @@ data class CbsConfiguration( ) data class CollectorConfiguration( - val routing: Routing -) { - val maxPayloadSizeBytes by lazy { - routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: DEFAULT_MAX_PAYLOAD_SIZE - } - - companion object { - internal const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 - } -} + val routing: Routing, + val maxPayloadSizeBytes: Int +) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt index e6707825..96fa4213 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt @@ -29,24 +29,23 @@ import arrow.core.toOption * @since March 2019 */ internal class ConfigurationMerger { - fun merge(base: PartialConfiguration, update: PartialConfiguration): PartialConfiguration = - PartialConfiguration( - listenPort = base.listenPort.updateToGivenOrNone(update.listenPort), - idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec), + fun merge(base: PartialConfiguration, update: PartialConfiguration) = PartialConfiguration( + listenPort = base.listenPort.updateToGivenOrNone(update.listenPort), + idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec), - firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec), - requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec), + firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec), + requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec), - sslDisable = base.sslDisable.updateToGivenOrNone(update.sslDisable), - keyStoreFile = base.keyStoreFile.updateToGivenOrNone(update.keyStoreFile), - keyStorePasswordFile = base.keyStorePasswordFile.updateToGivenOrNone(update.keyStorePasswordFile), - trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile), - trustStorePasswordFile = base.trustStorePasswordFile.updateToGivenOrNone(update.trustStorePasswordFile), + sslDisable = base.sslDisable.updateToGivenOrNone(update.sslDisable), + keyStoreFile = base.keyStoreFile.updateToGivenOrNone(update.keyStoreFile), + keyStorePasswordFile = base.keyStorePasswordFile.updateToGivenOrNone(update.keyStorePasswordFile), + trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile), + trustStorePasswordFile = base.trustStorePasswordFile.updateToGivenOrNone(update.trustStorePasswordFile), - streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers), + streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers), - logLevel = base.logLevel.updateToGivenOrNone(update.logLevel) - ) + logLevel = base.logLevel.updateToGivenOrNone(update.logLevel) + ) private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) = update.getOrElse(this::orNull).toOption() diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt new file mode 100644 index 00000000..08cce136 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt @@ -0,0 +1,116 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Option +import arrow.core.getOrElse +import arrow.core.toOption +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords +import java.nio.file.Paths +import java.time.Duration + +internal class ConfigurationTransformer { + + fun toFinalConfiguration(validatedConfig: ValidatedPartialConfiguration): HvVesConfiguration { + val serverConfiguration = toServerConfiguration(validatedConfig) + + val cbsConfiguration = toCbsConfiguration(validatedConfig.cbsConfiguration) + + val securityConfiguration = determineSecurityConfiguration(validatedConfig) + + val collectorConfiguration = toCollectorConfiguration(validatedConfig) + + val logLevel = determineLogLevel(validatedConfig.logLevel) + + return HvVesConfiguration( + serverConfiguration, + cbsConfiguration, + securityConfiguration, + collectorConfiguration, + logLevel + ) + } + + fun toCbsConfiguration(cbsConfiguration: ValidatedCbsConfiguration) = CbsConfiguration( + Duration.ofSeconds(cbsConfiguration.firstRequestDelaySec), + Duration.ofSeconds(cbsConfiguration.requestIntervalSec) + ) + + private fun toServerConfiguration(validatedConfig: ValidatedPartialConfiguration) = ServerConfiguration( + validatedConfig.listenPort, + Duration.ofSeconds(validatedConfig.idleTimeoutSec) + ) + + private fun determineSecurityConfiguration(validConfig: ValidatedPartialConfiguration) = + validConfig.securityConfiguration.fold({ SecurityConfiguration(None) }, { createSecurityConfiguration(it) }) + + private fun toCollectorConfiguration(validatedConfig: ValidatedPartialConfiguration) = + validatedConfig.streamPublishers.map { Route(it.name(), it) } + .let { routing -> + CollectorConfiguration( + routing, + determineMaxPayloadSize(routing) + ) + } + + private fun createSecurityConfiguration(paths: ValidatedSecurityPaths) = SecurityConfiguration( + ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(Paths.get(paths.keyStoreFile))) + .keyStorePassword(Passwords.fromPath(Paths.get(paths.keyStorePasswordFile))) + .trustStore(ImmutableSecurityKeysStore.of(Paths.get(paths.trustStoreFile))) + .trustStorePassword(Passwords.fromPath(Paths.get(paths.trustStorePasswordFile))) + .build() + .toOption() + ) + + private fun determineMaxPayloadSize(routing: List<Route>) = + routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: useDefaultMaxPayloadSize() + + private fun determineLogLevel(logLevel: Option<LogLevel>) = + logLevel.getOrElse(::useDefaultLogLevel) + + private fun useDefaultMaxPayloadSize() = DEFAULT_MAX_PAYLOAD_SIZE.also { + logger.warn { + "Failed to determine \"maxPayloadSizeBytes\" field from routing. Using default ($it)" + } + } + + private fun useDefaultLogLevel() = DEFAULT_LOG_LEVEL.also { + logger.warn { "Missing or invalid \"logLevel\" field. Using default log level ($it)" } + } + + companion object { + private val logger = Logger(ConfigurationTransformer::class) + + private val DEFAULT_LOG_LEVEL = LogLevel.INFO + private const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 + } +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index f4ce592f..c97c975f 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -19,29 +19,15 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.None -import arrow.core.Option -import arrow.core.Some -import arrow.core.getOrElse -import arrow.core.toOption -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import arrow.core.Either +import arrow.core.Left +import arrow.core.Right +import arrow.data.Invalid +import arrow.data.Validated import org.onap.dcae.collectors.veshv.config.api.model.ValidationException -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty -import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcae.collectors.veshv.utils.arrow.flatFold import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys -import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore -import org.onap.dcaegen2.services.sdk.security.ssl.Passwords -import java.io.File -import java.nio.file.Path -import java.time.Duration + /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -49,104 +35,62 @@ import java.time.Duration */ internal class ConfigurationValidator { - fun validate(partialConfig: PartialConfiguration) = - logger.info { "About to validate configuration: $partialConfig" }.let { - binding { - val logLevel = determineLogLevel(partialConfig.logLevel) - - val serverConfiguration = validatedServerConfiguration(partialConfig) - .doOnEmpty { logger.debug { "Cannot bind server configuration" } } - .bind() - - val cbsConfiguration = validatedCbsConfiguration(partialConfig) - .doOnEmpty { logger.debug { "Cannot bind cbs configuration" } } - .bind() - - val securityConfiguration = determineSecurityConfiguration(partialConfig) - .doOnEmpty { logger.debug { "Cannot bind security configuration" } } - .bind() - - val collectorConfiguration = validatedCollectorConfig(partialConfig) - .doOnEmpty { logger.debug { "Cannot bind collector configuration" } } - .bind() - - HvVesConfiguration( - serverConfiguration, - cbsConfiguration, - securityConfiguration, - collectorConfiguration, - logLevel - ) - }.toEither { ValidationException("Some required configuration options are missing") } - } - + fun validate(partial: PartialConfiguration): Either<ValidationException, ValidatedPartialConfiguration> = + logger.info { "About to validate configuration: $partial" }.let { + val invalidFields = mutableSetOf( + validate(partial::streamPublishers) + ) + .union(cbsConfigurationValidation(partial)) + .union(serverConfigurationValidation(partial)) + .union(securityValidation(partial)) + .filter { it.isInvalid } - private fun determineLogLevel(logLevel: Option<LogLevel>) = - logLevel.getOrElse { - logger.warn { - "Missing or invalid \"logLevel\" field. " + - "Using default log level ($DEFAULT_LOG_LEVEL)" + if (invalidFields.isNotEmpty()) { + return Left(ValidationException(validationMessageFrom(invalidFields))) } - DEFAULT_LOG_LEVEL - } - private fun validatedServerConfiguration(partial: PartialConfiguration) = - partial.mapBinding { - ServerConfiguration( - it.listenPort.bind(), - Duration.ofSeconds(it.idleTimeoutSec.bind()) - ) + Right(partial.unsafeAsValidated()) } - internal fun validatedCbsConfiguration(partial: PartialConfiguration) = - partial.mapBinding { - CbsConfiguration( - Duration.ofSeconds(it.firstRequestDelaySec.bind()), - Duration.ofSeconds(it.requestIntervalSec.bind()) - ) - } - - private fun determineSecurityConfiguration(partial: PartialConfiguration) = - partial.sslDisable.fold({ createSecurityConfiguration(partial) }, { sslDisabled -> - if (sslDisabled) { - Some(SecurityConfiguration(None)) - } else { - createSecurityConfiguration(partial) - } + fun validatedCbsConfiguration(partial: PartialConfiguration) = ValidatedCbsConfiguration( + firstRequestDelaySec = getOrThrowValidationException(partial::firstRequestDelaySec), + requestIntervalSec = getOrThrowValidationException(partial::requestIntervalSec) + ) + + private fun cbsConfigurationValidation(partial: PartialConfiguration) = setOf( + validate(partial::firstRequestDelaySec), + validate(partial::requestIntervalSec) + ) + + private fun serverConfigurationValidation(partial: PartialConfiguration) = setOf( + validate(partial::listenPort), + validate(partial::idleTimeoutSec) + ) + + private fun securityValidation(partial: PartialConfiguration) = + partial.sslDisable.flatFold({ + validatedSecurityConfiguration(partial) + }, { + setOf(Validated.Valid("sslDisable flag is set to true")) }) - private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> = - partial.mapBinding { - SecurityConfiguration( - createSecurityKeys( - File(it.keyStoreFile.bind()).toPath(), - File(it.keyStorePasswordFile.bind()).toPath(), - File(it.trustStoreFile.bind()).toPath(), - File(it.trustStorePasswordFile.bind()).toPath() - ).toOption() - ) - } + private fun validatedSecurityConfiguration(partial: PartialConfiguration) = setOf( + validate(partial::keyStoreFile), + validate(partial::keyStorePasswordFile), + validate(partial::trustStoreFile), + validate(partial::trustStorePasswordFile) + ) - private fun createSecurityKeys(keyStorePath: Path, - keyStorePasswordPath: Path, - trustStorePath: Path, - trustStorePasswordPath: Path) = - ImmutableSecurityKeys.builder() - .keyStore(ImmutableSecurityKeysStore.of(keyStorePath)) - .keyStorePassword(Passwords.fromPath(keyStorePasswordPath)) - .trustStore(ImmutableSecurityKeysStore.of(trustStorePath)) - .trustStorePassword(Passwords.fromPath(trustStorePasswordPath)) - .build() + private fun <A> validate(property: ConfigProperty<A>) = + Validated.fromOption(property.get(), { "- missing property: ${property.name}\n" }) - private fun validatedCollectorConfig(partial: PartialConfiguration) = - partial.mapBinding { config -> - CollectorConfiguration( - config.streamPublishers.bind().map { Route(it.name(), it) } - ) - } + private fun <A> validationMessageFrom(invalidFields: List<Validated<String, A>>): String = + invalidFields.map { it as Invalid } + .map { it.e } + .fold("", String::plus) + .let { "Some required configuration properties are missing: \n$it" } companion object { - val DEFAULT_LOG_LEVEL = LogLevel.INFO private val logger = Logger(ConfigurationValidator::class) } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt index 51f6a665..c8162104 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -21,9 +21,16 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.None import arrow.core.Option +import arrow.core.Some +import arrow.core.getOrElse import com.google.gson.annotations.SerializedName +import org.onap.dcae.collectors.veshv.config.api.model.ValidationException +import org.onap.dcae.collectors.veshv.utils.arrow.flatFold import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import kotlin.reflect.KProperty0 + +internal typealias ConfigProperty<A> = KProperty0<Option<A>> /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -56,4 +63,29 @@ internal data class PartialConfiguration( @Transient var streamPublishers: Option<List<KafkaSink>> = None -) +) { + fun unsafeAsValidated() = ValidatedPartialConfiguration( + listenPort = getOrThrowValidationException(::listenPort), + idleTimeoutSec = getOrThrowValidationException(::idleTimeoutSec), + cbsConfiguration = ValidatedCbsConfiguration( + firstRequestDelaySec = getOrThrowValidationException(::firstRequestDelaySec), + requestIntervalSec = getOrThrowValidationException(::requestIntervalSec) + ), + streamPublishers = getOrThrowValidationException(::streamPublishers), + securityConfiguration = sslDisable.flatFold({ forceValidatedSecurityPaths() }, { None }), + logLevel = logLevel + ) + + private fun forceValidatedSecurityPaths() = + Some(ValidatedSecurityPaths( + keyStoreFile = getOrThrowValidationException(::keyStoreFile), + keyStorePasswordFile = getOrThrowValidationException(::keyStorePasswordFile), + trustStoreFile = getOrThrowValidationException(::trustStoreFile), + trustStorePasswordFile = getOrThrowValidationException(::trustStorePasswordFile) + )) +} + +internal fun <A> getOrThrowValidationException(property: ConfigProperty<A>) = + property().getOrElse { + throw ValidationException("Field `${property.name}` was not validated and is missing in configuration") + } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt new file mode 100644 index 00000000..a230bfc0 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Option +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink + + +internal data class ValidatedPartialConfiguration( + val listenPort: Int, + val idleTimeoutSec: Long, + val cbsConfiguration: ValidatedCbsConfiguration, + val securityConfiguration: Option<ValidatedSecurityPaths>, + val logLevel: Option<LogLevel>, + val streamPublishers: List<KafkaSink> +) + +internal data class ValidatedCbsConfiguration( + val firstRequestDelaySec: Long, + val requestIntervalSec: Long +) + +internal data class ValidatedSecurityPaths( + val keyStoreFile: String, + val keyStorePasswordFile: String, + val trustStoreFile: String, + val trustStorePasswordFile: String +) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt deleted file mode 100644 index dbdf4ad0..00000000 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt +++ /dev/null @@ -1,83 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl - -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration.Companion.DEFAULT_MAX_PAYLOAD_SIZE -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal object CollectorConfigurationTest : Spek({ - - describe("CollectorConfiguration") { - describe("calculating maxPayloadSizeBytes") { - on("defined routes") { - val sampleRouting = listOf( - Route(sink1.name(), sink1), - Route(sink2.name(), sink2), - Route(sink3.name(), sink3) - ) - val configuration = CollectorConfiguration(sampleRouting) - - it("should use the highest value among all routes") { - assertThat(configuration.maxPayloadSizeBytes) - .isEqualTo(highestMaxPayloadSize) - } - } - - on("empty routing") { - val configuration = CollectorConfiguration(emptyList()) - - it("should use default value") { - assertThat(configuration.maxPayloadSizeBytes) - .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE) - } - } - } - } -}) - -private const val highestMaxPayloadSize = 3 - -private val sink1 = mock<KafkaSink>().also { - whenever(it.name()).thenReturn("") - whenever(it.maxPayloadSizeBytes()).thenReturn(1) -} - -private val sink2 = mock<KafkaSink>().also { - whenever(it.name()).thenReturn("") - whenever(it.maxPayloadSizeBytes()).thenReturn(2) -} - -private val sink3 = mock<KafkaSink>().also { - whenever(it.name()).thenReturn("") - whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize) -} diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt index cb8d5005..ca09d84f 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt @@ -27,7 +27,6 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import java.io.InputStreamReader import java.io.Reader -import java.time.Duration /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -37,14 +36,14 @@ internal object ConfigurationMergerTest : Spek({ describe("Merges partial configurations into one") { it("merges single parameter into empty config") { val actual = PartialConfiguration() - val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO)) + val diff = PartialConfiguration(logLevel = Some(LogLevel.WARN)) val result = ConfigurationMerger().merge(actual, diff) - assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO)) + assertThat(result.logLevel).isEqualTo(Some(LogLevel.WARN)) } - val someListenPort = Some(45) + val someListenPort = Some(defaultListenPort) it("merges single embedded parameter into empty config") { val actual = PartialConfiguration() val diff = PartialConfiguration(listenPort = someListenPort) @@ -58,11 +57,11 @@ internal object ConfigurationMergerTest : Spek({ val actual = JsonConfigurationParser().parse( InputStreamReader( JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) - val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO)) + val diff = PartialConfiguration(logLevel = Some(LogLevel.WARN)) val result = ConfigurationMerger().merge(actual, diff) - assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO)) + assertThat(result.logLevel).isEqualTo(Some(LogLevel.WARN)) } it("merges single embedded parameter into full config") { @@ -74,7 +73,6 @@ internal object ConfigurationMergerTest : Spek({ val result = ConfigurationMerger().merge(actual, diff) assertThat(result.listenPort).isEqualTo(someListenPort) - assertThat(result.idleTimeoutSec.isEmpty()).isFalse() assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L)) } diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt new file mode 100644 index 00000000..42919e4d --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt @@ -0,0 +1,218 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Option +import arrow.core.Some +import arrow.core.getOrElse +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys +import java.io.File +import java.time.Duration + + +internal object ConfigurationTransformerTest : Spek({ + describe("ConfigurationTransformer") { + val cut = ConfigurationTransformer() + + describe("transforming partial configuration to final") { + val config = ValidatedPartialConfiguration( + listenPort = defaultListenPort, + idleTimeoutSec = defaultIdleTimeoutSec, + cbsConfiguration = ValidatedCbsConfiguration( + firstRequestDelaySec = defaultFirstReqDelaySec, + requestIntervalSec = defaultRequestIntervalSec + ), + securityConfiguration = Some(ValidatedSecurityPaths( + keyStoreFile = KEYSTORE, + keyStorePasswordFile = KEYSTORE_PASS_FILE, + trustStoreFile = TRUSTSTORE, + trustStorePasswordFile = TRUSTSTORE_PASS_FILE + )), + streamPublishers = sampleStreamsDefinition, + logLevel = Some(LogLevel.TRACE) + ) + + given("transformed configuration") { + val result = cut.toFinalConfiguration(config) + + it("should create server configuration") { + assertThat(result.server.listenPort).isEqualTo(defaultListenPort) + assertThat(result.server.idleTimeout) + .describedAs("idleTimeout transformed from number to duration") + .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) + } + + it("should create CBS configuration") { + assertThat(result.cbs.firstRequestDelay) + .describedAs("firstRequestDelay transformed from number to duration") + .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec)) + assertThat(result.cbs.requestInterval) + .describedAs("requestInterval transformed from number to duration") + .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec)) + } + + it("should create collector configuration") { + assertThat(result.collector.routing) + .describedAs("routing transformed from kafka sinks to routes") + .isEqualTo(sampleRouting) + + assertThat(result.collector.maxPayloadSizeBytes) + .describedAs("maxPayloadSizeBytes calculated from kafka sinks") + .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES) + } + + it("should use specified log level") { + assertThat(result.logLevel) + .describedAs("logLevel was not transformed when present") + .isEqualTo(LogLevel.TRACE) + } + + it("should create security keys") { + result.security.keys.fold({ fail("Should be Some") }, { + assertThat(it.keyStore().path()).isEqualTo(File(KEYSTORE).toPath()) + assertThat(it.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath()) + it.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) } + it.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) } + }) + } + } + } + + describe("transforming configuration with empty log level") { + val config = validatedConfiguration( + logLevel = None + ) + + it("should use default log level") { + val result = cut.toFinalConfiguration(config) + + assertThat(result.logLevel).isEqualTo(DEFAULT_LOG_LEVEL) + } + } + + describe("transforming configuration with security disabled") { + val config = validatedConfiguration( + sslDisable = Some(true), + keyStoreFile = "", + keyStorePasswordFile = "", + trustStoreFile = "", + trustStorePasswordFile = "" + ) + + it("should create valid configuration with empty security keys") { + val result = cut.toFinalConfiguration(config) + + assertThat(result.security.keys).isEqualTo(None) + } + } + + describe("transforming configuration with ssl disable missing") { + val config = validatedConfiguration( + sslDisable = None + ) + + it("should create configuration with ssl enabled") { + val result = cut.toFinalConfiguration(config) + val securityKeys = result.security.keys + .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys + assertThat(securityKeys.keyStore().path()).isEqualTo(File(KEYSTORE).toPath()) + assertThat(securityKeys.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath()) + securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) } + securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) } + } + } + + describe("calculating maxPayloadSizeBytes") { + on("defined routes") { + val highestMaxPayloadSize = 3 + val sink1 = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("1") + whenever(it.maxPayloadSizeBytes()).thenReturn(1) + } + val sink2 = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("2") + whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize) + } + val config = validatedConfiguration( + streamPublishers = listOf(sink1, sink2) + ) + + val result = cut.toFinalConfiguration(config) + + it("should use the highest value among all routes") { + assertThat(result.collector.maxPayloadSizeBytes) + .isEqualTo(highestMaxPayloadSize) + } + } + + on("empty routing") { + val config = validatedConfiguration( + streamPublishers = emptyList() + ) + + val result = cut.toFinalConfiguration(config) + + it("should use default value") { + assertThat(result.collector.maxPayloadSizeBytes) + .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES) + } + } + } + + } +}) + +private fun validatedConfiguration(listenPort: Int = defaultListenPort, + idleTimeoutSec: Long = defaultIdleTimeoutSec, + firstReqDelaySec: Long = defaultFirstReqDelaySec, + requestIntervalSec: Long = defaultRequestIntervalSec, + sslDisable: Option<Boolean> = Some(false), + keyStoreFile: String = KEYSTORE, + keyStorePasswordFile: String = KEYSTORE_PASS_FILE, + trustStoreFile: String = TRUSTSTORE, + trustStorePasswordFile: String = TRUSTSTORE_PASS_FILE, + streamPublishers: List<KafkaSink> = sampleStreamsDefinition, + logLevel: Option<LogLevel> = Some(LogLevel.INFO) +): ValidatedPartialConfiguration = PartialConfiguration( + listenPort = Some(listenPort), + idleTimeoutSec = Some(idleTimeoutSec), + firstRequestDelaySec = Some(firstReqDelaySec), + requestIntervalSec = Some(requestIntervalSec), + streamPublishers = Some(streamPublishers), + sslDisable = sslDisable, + keyStoreFile = Some(keyStoreFile), + keyStorePasswordFile = Some(keyStorePasswordFile), + trustStoreFile = Some(trustStoreFile), + trustStorePasswordFile = Some(trustStorePasswordFile), + logLevel = logLevel +).unsafeAsValidated() + diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt index 5495c865..26a9cc57 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -22,101 +22,84 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.None import arrow.core.Option import arrow.core.Some -import arrow.core.getOrElse -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.fail +import org.assertj.core.api.Assertions.* import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL +import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys -import java.io.File -import java.nio.file.Paths -import java.time.Duration internal object ConfigurationValidatorTest : Spek({ describe("ConfigurationValidator") { val cut = ConfigurationValidator() describe("validating partial configuration with missing fields") { - val config = PartialConfiguration( - listenPort = Some(1) - ) - - it("should return ValidationError") { - val result = cut.validate(config) - assertThat(result.isLeft()).isTrue() - } - } + val config = PartialConfiguration(listenPort = Some(5)) - describe("validating configuration with empty log level") { - val config = partialConfiguration( - logLevel = None - ) - - it("should use default log level") { + it("should return ValidationException with missing required fields description") { val result = cut.validate(config) - result.fold( - { - fail("Configuration should have been created successfully") - }, - { - assertThat(it.logLevel).isEqualTo(DEFAULT_LOG_LEVEL) - } - ) + result.fold({ + assertThat(it.message).doesNotContain(PartialConfiguration::listenPort.name) + + assertThat(it.message).contains(PartialConfiguration::idleTimeoutSec.name) + assertThat(it.message).contains(PartialConfiguration::firstRequestDelaySec.name) + assertThat(it.message).contains(PartialConfiguration::requestIntervalSec.name) + assertThat(it.message).contains(PartialConfiguration::streamPublishers.name) + assertThat(it.message).contains(PartialConfiguration::keyStoreFile.name) + assertThat(it.message).contains(PartialConfiguration::keyStorePasswordFile.name) + assertThat(it.message).contains(PartialConfiguration::trustStoreFile.name) + assertThat(it.message).contains(PartialConfiguration::trustStorePasswordFile.name) + + assertThat(it.message).doesNotContain(PartialConfiguration::logLevel.name) + assertThat(it.message).doesNotContain(PartialConfiguration::sslDisable.name) + }, { fail("Should be ValidationException") }) } } - describe("validating complete configuration") { + describe("validating complete valid configuration") { val config = PartialConfiguration( listenPort = Some(defaultListenPort), idleTimeoutSec = Some(defaultIdleTimeoutSec), firstRequestDelaySec = Some(defaultFirstReqDelaySec), requestIntervalSec = Some(defaultRequestIntervalSec), sslDisable = Some(false), - keyStoreFile = Some(keyStore), - keyStorePasswordFile = Some(keyStorePassFile), - trustStoreFile = Some(trustStore), - trustStorePasswordFile = Some(trustStorePassFile), + keyStoreFile = Some(KEYSTORE), + keyStorePasswordFile = Some(KEYSTORE_PASSWORD), + trustStoreFile = Some(TRUSTSTORE), + trustStorePasswordFile = Some(TRUSTSTORE_PASSWORD), streamPublishers = Some(sampleStreamsDefinition), logLevel = Some(LogLevel.TRACE) ) - it("should create valid configuration") { + it("should create validated configuration") { val result = cut.validate(config) result.fold( { fail("Configuration should have been created successfully") }, { - assertThat(it.server.listenPort) + assertThat(it.listenPort) .isEqualTo(defaultListenPort) - assertThat(it.server.idleTimeout) - .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) - - val securityKeys = it.security.keys - .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys - assertThat(securityKeys.keyStore().path()).isEqualTo(File(keyStore).toPath()) - assertThat(securityKeys.trustStore().path()).isEqualTo(File(trustStore).toPath()) - securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(keyStorePass.toCharArray()) } - securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(trustStorePass.toCharArray()) } - - assertThat(it.cbs.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec)) - assertThat(it.cbs.requestInterval) - .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec)) - - assertThat(it.collector.routing) - .isEqualTo(sampleRouting) - assertThat(it.collector.maxPayloadSizeBytes) - .isEqualTo(sampleMaxPayloadSize) - - assertThat(it.logLevel).isEqualTo(LogLevel.TRACE) + assertThat(it.idleTimeoutSec) + .isEqualTo(defaultIdleTimeoutSec) + + it.securityConfiguration.fold({ + fail("Should have been validated successfully") + }, { + assertThat(it.keyStoreFile).isEqualTo(KEYSTORE) + assertThat(it.keyStorePasswordFile).isEqualTo(KEYSTORE_PASSWORD) + assertThat(it.trustStoreFile).isEqualTo(TRUSTSTORE) + assertThat(it.trustStorePasswordFile).isEqualTo(TRUSTSTORE_PASSWORD) + }) + + assertThat(it.cbsConfiguration.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec) + assertThat(it.cbsConfiguration.requestIntervalSec).isEqualTo(defaultRequestIntervalSec) + + assertThat(it.streamPublishers).isEqualTo(sampleStreamsDefinition) + + assertThat(it.logLevel).isEqualTo(Some(LogLevel.TRACE)) } ) } @@ -126,29 +109,26 @@ internal object ConfigurationValidatorTest : Spek({ val config = partialConfiguration( sslDisable = Some(true), keyStoreFile = Some(""), - keyStorePassword = Some(""), - trustStoreFile = Some(""), - trustStorePassword = Some("") + keyStorePasswordFile = None, + trustStoreFile = None, + trustStorePasswordFile = Some("") ) - it("should create valid configuration") { + it("should return validated configuration regardless of security keys presence") { val result = cut.validate(config) result.fold( { fail("Configuration should have been created successfully but was $it") }, { - assertThat(it.server.idleTimeout) - .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) + assertThat(it.idleTimeoutSec).isEqualTo(defaultIdleTimeoutSec) - assertThat(it.security.keys) - .isEqualTo(None) + assertThat(it.securityConfiguration.isEmpty()).isTrue() - assertThat(it.cbs.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec)) + assertThat(it.cbsConfiguration.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec) + assertThat(it.cbsConfiguration.requestIntervalSec).isEqualTo(defaultRequestIntervalSec) - assertThat(it.collector.routing) - .isEqualTo(sampleRouting) + assertThat(it.streamPublishers).isEqualTo(sampleStreamsDefinition) } ) } @@ -159,24 +139,81 @@ internal object ConfigurationValidatorTest : Spek({ sslDisable = None ) - it("should create valid configuration with ssl enabled") { + it("should return validated configuration") { val result = cut.validate(config) result.fold( { fail("Configuration should have been created successfully but was $it") }, { - val securityKeys = it.security.keys - .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys - assertThat(securityKeys.keyStore().path()).isEqualTo(File(keyStore).toPath()) - assertThat(securityKeys.trustStore().path()).isEqualTo(File(trustStore).toPath()) - securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(keyStorePass.toCharArray()) } - securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(trustStorePass.toCharArray()) } + it.securityConfiguration.fold({ + fail("Should have been validated successfully") + }, { + assertThat(it.keyStoreFile).isEqualTo(KEYSTORE) + assertThat(it.keyStorePasswordFile).isEqualTo(KEYSTORE_PASSWORD) + assertThat(it.trustStoreFile).isEqualTo(TRUSTSTORE) + assertThat(it.trustStorePasswordFile).isEqualTo(TRUSTSTORE_PASSWORD) + }) + } ) } } + describe("validating configuration with ssl enabled, but not all required security fields set") { + val config = partialConfiguration( + sslDisable = Some(false), + keyStoreFile = Some(KEYSTORE), + keyStorePasswordFile = None, + trustStoreFile = None, + trustStorePasswordFile = Some(TRUSTSTORE_PASSWORD) + ) + + it("should return validated configuration") { + val result = cut.validate(config) + + assertThat(result.isLeft()) + .describedAs("security validation result") + .isTrue() + } + } + + describe("validating CBS configuration from partial") { + given("valid CBS configuration") { + val config = partialConfiguration() + + it("should returned validated config") { + val result = cut.validatedCbsConfiguration(config) + + assertThat(result.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec) + assertThat(result.requestIntervalSec).isEqualTo(defaultRequestIntervalSec) + } + + } + + given("missing firstReqDelaySec") { + val config = partialConfiguration( + firstReqDelaySec = None + ) + + it("should throw validation exception") { + assertThatExceptionOfType(ValidationException::class.java).isThrownBy { + cut.validatedCbsConfiguration(config) + }.withMessageContaining(PartialConfiguration::firstRequestDelaySec.name) + } + } + + given("missing requestIntervalSec") { + val config = partialConfiguration( + requestIntervalSec = None) + + it("should throw validation exception") { + assertThatExceptionOfType(ValidationException::class.java).isThrownBy { + cut.validatedCbsConfiguration(config) + }.withMessageContaining(PartialConfiguration::requestIntervalSec.name) + } + } + } } }) @@ -185,10 +222,10 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec), requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec), sslDisable: Option<Boolean> = Some(false), - keyStoreFile: Option<String> = Some(keyStore), - keyStorePassword: Option<String> = Some(keyStorePassFile), - trustStoreFile: Option<String> = Some(trustStore), - trustStorePassword: Option<String> = Some(trustStorePassFile), + keyStoreFile: Option<String> = Some(KEYSTORE), + keyStorePasswordFile: Option<String> = Some(KEYSTORE_PASSWORD), + trustStoreFile: Option<String> = Some(TRUSTSTORE), + trustStorePasswordFile: Option<String> = Some(TRUSTSTORE_PASSWORD), streamPublishers: Option<List<KafkaSink>> = Some(sampleStreamsDefinition), logLevel: Option<LogLevel> = Some(LogLevel.INFO) ) = PartialConfiguration( @@ -198,35 +235,9 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor requestIntervalSec = requestIntervalSec, sslDisable = sslDisable, keyStoreFile = keyStoreFile, - keyStorePasswordFile = keyStorePassword, + keyStorePasswordFile = keyStorePasswordFile, trustStoreFile = trustStoreFile, - trustStorePasswordFile = trustStorePassword, + trustStorePasswordFile = trustStorePasswordFile, streamPublishers = streamPublishers, logLevel = logLevel ) - -private fun resourcePathAsString(resource: String) = - Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString() - -private const val defaultListenPort = 1234 -private const val defaultRequestIntervalSec = 3L -private const val defaultIdleTimeoutSec = 10L -private const val defaultFirstReqDelaySec = 10L - -private const val keyStore = "test.ks.pkcs12" -private const val trustStore = "trust.ks.pkcs12" -private const val keyStorePass = "change.me" -private const val trustStorePass = "change.me.too" -private val keyStorePassFile = resourcePathAsString("/test.ks.pass") -private val trustStorePassFile = resourcePathAsString("/trust.ks.pass") - -private const val sampleSinkName = "perf3gpp" -const val sampleMaxPayloadSize = 1024 - -private val sink = mock<KafkaSink>().also { - whenever(it.name()).thenReturn(sampleSinkName) - whenever(it.maxPayloadSizeBytes()).thenReturn(sampleMaxPayloadSize) -} - -private val sampleStreamsDefinition = listOf(sink) -private val sampleRouting = listOf(Route(sink.name(), sink)) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt new file mode 100644 index 00000000..f07af079 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import java.nio.file.Paths + +private fun resourcePathAsString(resource: String) = + Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString() + +internal val DEFAULT_LOG_LEVEL = LogLevel.INFO + +internal const val defaultListenPort = 1234 +internal const val defaultRequestIntervalSec = 3L +internal const val defaultIdleTimeoutSec = 10L +internal const val defaultFirstReqDelaySec = 10L + +internal const val KEYSTORE = "test.ks.pkcs12" +internal const val KEYSTORE_PASSWORD = "change.me" +internal const val TRUSTSTORE = "trust.ks.pkcs12" +internal const val TRUSTSTORE_PASSWORD = "change.me.too" +internal val KEYSTORE_PASS_FILE = resourcePathAsString("/test.ks.pass") +internal val TRUSTSTORE_PASS_FILE = resourcePathAsString("/trust.ks.pass") + +internal const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 + +private val sampleSink = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("perf3gpp") + whenever(it.maxPayloadSizeBytes()).thenReturn(DEFAULT_MAX_PAYLOAD_SIZE_BYTES) +} + +internal val sampleStreamsDefinition = listOf(sampleSink) +internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 3002f334..78d2e704 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ describe("VES High Volume Collector performance") { it("should handle multiple clients in reasonable time") { val sink = CountingSink() - val sut = Sut(CollectorConfiguration(basicRouting), sink) + val sut = Sut(CollectorConfiguration(basicRouting, MAX_PAYLOAD_SIZE_BYTES), sink) val numMessages: Long = 300_000 val runs = 4 @@ -87,7 +87,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() - val sut = Sut(CollectorConfiguration(basicRouting), sink) + val sut = Sut(CollectorConfiguration(basicRouting, MAX_PAYLOAD_SIZE_BYTES), sink) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 88d1567e..4e9b7ef4 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics +import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.utils.Closeable @@ -95,10 +96,10 @@ class DummySinkFactory(private val sink: Sink) : SinkFactory { } fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = - Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink()) + Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysSuccessfulSink()) fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = - Sut(CollectorConfiguration(routing), AlwaysFailingSink()) + Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysFailingSink()) fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = - Sut(CollectorConfiguration(routing), DelayingSink(delay)) + Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), DelayingSink(delay)) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index f90f4bc9..e74e1f62 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -215,6 +215,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> { val sink = StoringSink() - val sut = Sut(CollectorConfiguration(routing), sink) + val sut = Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), sink) return Pair(sut, sink) } diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index cfed7f32..ceae62db 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -17,6 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ + +@file:Suppress("TooManyFunctions") + package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either @@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference * @since July 2018 */ + object OptionUtils { fun <A> binding(c: suspend MonadContinuation<ForOption, *>.() -> A) : Option<A> = Option.monad().binding(c).fix() @@ -78,6 +82,17 @@ fun <A> Try<A>.doOnFailure(action: (Throwable) -> Unit): Try<A> = apply { fun <A, B> A.mapBinding(c: suspend MonadContinuation<ForOption, *>.(A) -> B) : Option<B> = let { OptionUtils.binding { c(it) } } +fun <T> Option<Boolean>.flatFold(ifEmptyOrFalse: () -> T, ifTrue: () -> T) = + fold({ + ifEmptyOrFalse() + }, { + if (it) { + ifTrue() + } else { + ifEmptyOrFalse() + } + }) + |