summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-12 06:38:04 +0000
committerGerrit Code Review <gerrit@onap.org>2019-04-12 06:38:04 +0000
commit8b8385d323754903ade492a659548d54b56bd7ad (patch)
tree054cee274f2e9f4702adb232e186b9530775d1e9
parent24d0a4c08237dc95c7eca55122e9c305750ce248 (diff)
parent0dd7127aa7258d8fd9d434559750c00ca49f66e6 (diff)
Merge "Extract transforming logic from validator"
-rw-r--r--sources/hv-collector-configuration/pom.xml4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt18
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt13
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt27
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt116
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt160
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt (renamed from sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt)34
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt46
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt83
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt12
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt218
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt237
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt54
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt4
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt7
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt2
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt15
17 files changed, 700 insertions, 350 deletions
diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml
index b6ec4ca2..eda8b448 100644
--- a/sources/hv-collector-configuration/pom.xml
+++ b/sources/hv-collector-configuration/pom.xml
@@ -93,5 +93,9 @@
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
+ <dependency>
+ <groupId>io.arrow-kt</groupId>
+ <artifactId>arrow-extras-data</artifactId>
+ </dependency>
</dependencies>
</project> \ No newline at end of file
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index f0ee3a42..ded75838 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -19,12 +19,11 @@
*/
package org.onap.dcae.collectors.veshv.config.api
-import arrow.core.getOrElse
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
-import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
+import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser
@@ -41,8 +40,9 @@ class ConfigurationModule {
private val cmd = HvVesCommandLineParser()
private val configParser = JsonConfigurationParser()
+ private val configMerger = ConfigurationMerger()
private val configValidator = ConfigurationValidator()
- private val merger = ConfigurationMerger()
+ private val configTransformer = ConfigurationTransformer()
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
@@ -58,14 +58,15 @@ class ConfigurationModule {
.flatMapMany { basePartialConfig ->
cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
.invoke()
- .map { merger.merge(basePartialConfig, it) }
+ .map { configMerger.merge(basePartialConfig, it) }
.map(configValidator::validate)
.throwOnLeft()
+ .map(configTransformer::toFinalConfiguration)
}
private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
configStateListener: ConfigurationStateListener,
- mdc: MappedDiagnosticContext): CbsConfigurationProvider =
+ mdc: MappedDiagnosticContext) =
CbsConfigurationProvider(
CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
cbsConfigurationFrom(basePartialConfig),
@@ -73,11 +74,12 @@ class ConfigurationModule {
configStateListener,
mdc)
- private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator
- .validatedCbsConfiguration(basePartialConfig)
- .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
+ private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
+ configValidator.validatedCbsConfiguration(basePartialConfig)
+ .let { configTransformer.toCbsConfiguration(it) }
companion object {
private val logger = Logger(ConfigurationModule::class)
}
+
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
index 8db2f770..fd3cccd9 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
@@ -46,13 +46,6 @@ data class CbsConfiguration(
)
data class CollectorConfiguration(
- val routing: Routing
-) {
- val maxPayloadSizeBytes by lazy {
- routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: DEFAULT_MAX_PAYLOAD_SIZE
- }
-
- companion object {
- internal const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024
- }
-}
+ val routing: Routing,
+ val maxPayloadSizeBytes: Int
+)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
index e6707825..96fa4213 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
@@ -29,24 +29,23 @@ import arrow.core.toOption
* @since March 2019
*/
internal class ConfigurationMerger {
- fun merge(base: PartialConfiguration, update: PartialConfiguration): PartialConfiguration =
- PartialConfiguration(
- listenPort = base.listenPort.updateToGivenOrNone(update.listenPort),
- idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
+ fun merge(base: PartialConfiguration, update: PartialConfiguration) = PartialConfiguration(
+ listenPort = base.listenPort.updateToGivenOrNone(update.listenPort),
+ idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
- firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
- requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec),
+ firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
+ requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec),
- sslDisable = base.sslDisable.updateToGivenOrNone(update.sslDisable),
- keyStoreFile = base.keyStoreFile.updateToGivenOrNone(update.keyStoreFile),
- keyStorePasswordFile = base.keyStorePasswordFile.updateToGivenOrNone(update.keyStorePasswordFile),
- trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile),
- trustStorePasswordFile = base.trustStorePasswordFile.updateToGivenOrNone(update.trustStorePasswordFile),
+ sslDisable = base.sslDisable.updateToGivenOrNone(update.sslDisable),
+ keyStoreFile = base.keyStoreFile.updateToGivenOrNone(update.keyStoreFile),
+ keyStorePasswordFile = base.keyStorePasswordFile.updateToGivenOrNone(update.keyStorePasswordFile),
+ trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile),
+ trustStorePasswordFile = base.trustStorePasswordFile.updateToGivenOrNone(update.trustStorePasswordFile),
- streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers),
+ streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers),
- logLevel = base.logLevel.updateToGivenOrNone(update.logLevel)
- )
+ logLevel = base.logLevel.updateToGivenOrNone(update.logLevel)
+ )
private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) =
update.getOrElse(this::orNull).toOption()
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt
new file mode 100644
index 00000000..08cce136
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt
@@ -0,0 +1,116 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.getOrElse
+import arrow.core.toOption
+import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys
+import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore
+import org.onap.dcaegen2.services.sdk.security.ssl.Passwords
+import java.nio.file.Paths
+import java.time.Duration
+
+internal class ConfigurationTransformer {
+
+ fun toFinalConfiguration(validatedConfig: ValidatedPartialConfiguration): HvVesConfiguration {
+ val serverConfiguration = toServerConfiguration(validatedConfig)
+
+ val cbsConfiguration = toCbsConfiguration(validatedConfig.cbsConfiguration)
+
+ val securityConfiguration = determineSecurityConfiguration(validatedConfig)
+
+ val collectorConfiguration = toCollectorConfiguration(validatedConfig)
+
+ val logLevel = determineLogLevel(validatedConfig.logLevel)
+
+ return HvVesConfiguration(
+ serverConfiguration,
+ cbsConfiguration,
+ securityConfiguration,
+ collectorConfiguration,
+ logLevel
+ )
+ }
+
+ fun toCbsConfiguration(cbsConfiguration: ValidatedCbsConfiguration) = CbsConfiguration(
+ Duration.ofSeconds(cbsConfiguration.firstRequestDelaySec),
+ Duration.ofSeconds(cbsConfiguration.requestIntervalSec)
+ )
+
+ private fun toServerConfiguration(validatedConfig: ValidatedPartialConfiguration) = ServerConfiguration(
+ validatedConfig.listenPort,
+ Duration.ofSeconds(validatedConfig.idleTimeoutSec)
+ )
+
+ private fun determineSecurityConfiguration(validConfig: ValidatedPartialConfiguration) =
+ validConfig.securityConfiguration.fold({ SecurityConfiguration(None) }, { createSecurityConfiguration(it) })
+
+ private fun toCollectorConfiguration(validatedConfig: ValidatedPartialConfiguration) =
+ validatedConfig.streamPublishers.map { Route(it.name(), it) }
+ .let { routing ->
+ CollectorConfiguration(
+ routing,
+ determineMaxPayloadSize(routing)
+ )
+ }
+
+ private fun createSecurityConfiguration(paths: ValidatedSecurityPaths) = SecurityConfiguration(
+ ImmutableSecurityKeys.builder()
+ .keyStore(ImmutableSecurityKeysStore.of(Paths.get(paths.keyStoreFile)))
+ .keyStorePassword(Passwords.fromPath(Paths.get(paths.keyStorePasswordFile)))
+ .trustStore(ImmutableSecurityKeysStore.of(Paths.get(paths.trustStoreFile)))
+ .trustStorePassword(Passwords.fromPath(Paths.get(paths.trustStorePasswordFile)))
+ .build()
+ .toOption()
+ )
+
+ private fun determineMaxPayloadSize(routing: List<Route>) =
+ routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: useDefaultMaxPayloadSize()
+
+ private fun determineLogLevel(logLevel: Option<LogLevel>) =
+ logLevel.getOrElse(::useDefaultLogLevel)
+
+ private fun useDefaultMaxPayloadSize() = DEFAULT_MAX_PAYLOAD_SIZE.also {
+ logger.warn {
+ "Failed to determine \"maxPayloadSizeBytes\" field from routing. Using default ($it)"
+ }
+ }
+
+ private fun useDefaultLogLevel() = DEFAULT_LOG_LEVEL.also {
+ logger.warn { "Missing or invalid \"logLevel\" field. Using default log level ($it)" }
+ }
+
+ companion object {
+ private val logger = Logger(ConfigurationTransformer::class)
+
+ private val DEFAULT_LOG_LEVEL = LogLevel.INFO
+ private const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024
+ }
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index f4ce592f..c97c975f 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -19,29 +19,15 @@
*/
package org.onap.dcae.collectors.veshv.config.impl
-import arrow.core.None
-import arrow.core.Option
-import arrow.core.Some
-import arrow.core.getOrElse
-import arrow.core.toOption
-import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import arrow.core.Either
+import arrow.core.Left
+import arrow.core.Right
+import arrow.data.Invalid
+import arrow.data.Validated
import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
-import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
-import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
-import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
-import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcae.collectors.veshv.utils.arrow.flatFold
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys
-import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore
-import org.onap.dcaegen2.services.sdk.security.ssl.Passwords
-import java.io.File
-import java.nio.file.Path
-import java.time.Duration
+
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -49,104 +35,62 @@ import java.time.Duration
*/
internal class ConfigurationValidator {
- fun validate(partialConfig: PartialConfiguration) =
- logger.info { "About to validate configuration: $partialConfig" }.let {
- binding {
- val logLevel = determineLogLevel(partialConfig.logLevel)
-
- val serverConfiguration = validatedServerConfiguration(partialConfig)
- .doOnEmpty { logger.debug { "Cannot bind server configuration" } }
- .bind()
-
- val cbsConfiguration = validatedCbsConfiguration(partialConfig)
- .doOnEmpty { logger.debug { "Cannot bind cbs configuration" } }
- .bind()
-
- val securityConfiguration = determineSecurityConfiguration(partialConfig)
- .doOnEmpty { logger.debug { "Cannot bind security configuration" } }
- .bind()
-
- val collectorConfiguration = validatedCollectorConfig(partialConfig)
- .doOnEmpty { logger.debug { "Cannot bind collector configuration" } }
- .bind()
-
- HvVesConfiguration(
- serverConfiguration,
- cbsConfiguration,
- securityConfiguration,
- collectorConfiguration,
- logLevel
- )
- }.toEither { ValidationException("Some required configuration options are missing") }
- }
-
+ fun validate(partial: PartialConfiguration): Either<ValidationException, ValidatedPartialConfiguration> =
+ logger.info { "About to validate configuration: $partial" }.let {
+ val invalidFields = mutableSetOf(
+ validate(partial::streamPublishers)
+ )
+ .union(cbsConfigurationValidation(partial))
+ .union(serverConfigurationValidation(partial))
+ .union(securityValidation(partial))
+ .filter { it.isInvalid }
- private fun determineLogLevel(logLevel: Option<LogLevel>) =
- logLevel.getOrElse {
- logger.warn {
- "Missing or invalid \"logLevel\" field. " +
- "Using default log level ($DEFAULT_LOG_LEVEL)"
+ if (invalidFields.isNotEmpty()) {
+ return Left(ValidationException(validationMessageFrom(invalidFields)))
}
- DEFAULT_LOG_LEVEL
- }
- private fun validatedServerConfiguration(partial: PartialConfiguration) =
- partial.mapBinding {
- ServerConfiguration(
- it.listenPort.bind(),
- Duration.ofSeconds(it.idleTimeoutSec.bind())
- )
+ Right(partial.unsafeAsValidated())
}
- internal fun validatedCbsConfiguration(partial: PartialConfiguration) =
- partial.mapBinding {
- CbsConfiguration(
- Duration.ofSeconds(it.firstRequestDelaySec.bind()),
- Duration.ofSeconds(it.requestIntervalSec.bind())
- )
- }
-
- private fun determineSecurityConfiguration(partial: PartialConfiguration) =
- partial.sslDisable.fold({ createSecurityConfiguration(partial) }, { sslDisabled ->
- if (sslDisabled) {
- Some(SecurityConfiguration(None))
- } else {
- createSecurityConfiguration(partial)
- }
+ fun validatedCbsConfiguration(partial: PartialConfiguration) = ValidatedCbsConfiguration(
+ firstRequestDelaySec = getOrThrowValidationException(partial::firstRequestDelaySec),
+ requestIntervalSec = getOrThrowValidationException(partial::requestIntervalSec)
+ )
+
+ private fun cbsConfigurationValidation(partial: PartialConfiguration) = setOf(
+ validate(partial::firstRequestDelaySec),
+ validate(partial::requestIntervalSec)
+ )
+
+ private fun serverConfigurationValidation(partial: PartialConfiguration) = setOf(
+ validate(partial::listenPort),
+ validate(partial::idleTimeoutSec)
+ )
+
+ private fun securityValidation(partial: PartialConfiguration) =
+ partial.sslDisable.flatFold({
+ validatedSecurityConfiguration(partial)
+ }, {
+ setOf(Validated.Valid("sslDisable flag is set to true"))
})
- private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> =
- partial.mapBinding {
- SecurityConfiguration(
- createSecurityKeys(
- File(it.keyStoreFile.bind()).toPath(),
- File(it.keyStorePasswordFile.bind()).toPath(),
- File(it.trustStoreFile.bind()).toPath(),
- File(it.trustStorePasswordFile.bind()).toPath()
- ).toOption()
- )
- }
+ private fun validatedSecurityConfiguration(partial: PartialConfiguration) = setOf(
+ validate(partial::keyStoreFile),
+ validate(partial::keyStorePasswordFile),
+ validate(partial::trustStoreFile),
+ validate(partial::trustStorePasswordFile)
+ )
- private fun createSecurityKeys(keyStorePath: Path,
- keyStorePasswordPath: Path,
- trustStorePath: Path,
- trustStorePasswordPath: Path) =
- ImmutableSecurityKeys.builder()
- .keyStore(ImmutableSecurityKeysStore.of(keyStorePath))
- .keyStorePassword(Passwords.fromPath(keyStorePasswordPath))
- .trustStore(ImmutableSecurityKeysStore.of(trustStorePath))
- .trustStorePassword(Passwords.fromPath(trustStorePasswordPath))
- .build()
+ private fun <A> validate(property: ConfigProperty<A>) =
+ Validated.fromOption(property.get(), { "- missing property: ${property.name}\n" })
- private fun validatedCollectorConfig(partial: PartialConfiguration) =
- partial.mapBinding { config ->
- CollectorConfiguration(
- config.streamPublishers.bind().map { Route(it.name(), it) }
- )
- }
+ private fun <A> validationMessageFrom(invalidFields: List<Validated<String, A>>): String =
+ invalidFields.map { it as Invalid }
+ .map { it.e }
+ .fold("", String::plus)
+ .let { "Some required configuration properties are missing: \n$it" }
companion object {
- val DEFAULT_LOG_LEVEL = LogLevel.INFO
private val logger = Logger(ConfigurationValidator::class)
}
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index 51f6a665..c8162104 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -21,9 +21,16 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.None
import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
import com.google.gson.annotations.SerializedName
+import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
+import org.onap.dcae.collectors.veshv.utils.arrow.flatFold
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import kotlin.reflect.KProperty0
+
+internal typealias ConfigProperty<A> = KProperty0<Option<A>>
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -56,4 +63,29 @@ internal data class PartialConfiguration(
@Transient
var streamPublishers: Option<List<KafkaSink>> = None
-)
+) {
+ fun unsafeAsValidated() = ValidatedPartialConfiguration(
+ listenPort = getOrThrowValidationException(::listenPort),
+ idleTimeoutSec = getOrThrowValidationException(::idleTimeoutSec),
+ cbsConfiguration = ValidatedCbsConfiguration(
+ firstRequestDelaySec = getOrThrowValidationException(::firstRequestDelaySec),
+ requestIntervalSec = getOrThrowValidationException(::requestIntervalSec)
+ ),
+ streamPublishers = getOrThrowValidationException(::streamPublishers),
+ securityConfiguration = sslDisable.flatFold({ forceValidatedSecurityPaths() }, { None }),
+ logLevel = logLevel
+ )
+
+ private fun forceValidatedSecurityPaths() =
+ Some(ValidatedSecurityPaths(
+ keyStoreFile = getOrThrowValidationException(::keyStoreFile),
+ keyStorePasswordFile = getOrThrowValidationException(::keyStorePasswordFile),
+ trustStoreFile = getOrThrowValidationException(::trustStoreFile),
+ trustStorePasswordFile = getOrThrowValidationException(::trustStorePasswordFile)
+ ))
+}
+
+internal fun <A> getOrThrowValidationException(property: ConfigProperty<A>) =
+ property().getOrElse {
+ throw ValidationException("Field `${property.name}` was not validated and is missing in configuration")
+ }
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt
new file mode 100644
index 00000000..a230bfc0
--- /dev/null
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt
@@ -0,0 +1,46 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.Option
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+
+
+internal data class ValidatedPartialConfiguration(
+ val listenPort: Int,
+ val idleTimeoutSec: Long,
+ val cbsConfiguration: ValidatedCbsConfiguration,
+ val securityConfiguration: Option<ValidatedSecurityPaths>,
+ val logLevel: Option<LogLevel>,
+ val streamPublishers: List<KafkaSink>
+)
+
+internal data class ValidatedCbsConfiguration(
+ val firstRequestDelaySec: Long,
+ val requestIntervalSec: Long
+)
+
+internal data class ValidatedSecurityPaths(
+ val keyStoreFile: String,
+ val keyStorePasswordFile: String,
+ val trustStoreFile: String,
+ val trustStorePasswordFile: String
+)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt
deleted file mode 100644
index dbdf4ad0..00000000
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.config.impl
-
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
-import org.jetbrains.spek.api.Spek
-import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.it
-import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration.Companion.DEFAULT_MAX_PAYLOAD_SIZE
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since May 2018
- */
-internal object CollectorConfigurationTest : Spek({
-
- describe("CollectorConfiguration") {
- describe("calculating maxPayloadSizeBytes") {
- on("defined routes") {
- val sampleRouting = listOf(
- Route(sink1.name(), sink1),
- Route(sink2.name(), sink2),
- Route(sink3.name(), sink3)
- )
- val configuration = CollectorConfiguration(sampleRouting)
-
- it("should use the highest value among all routes") {
- assertThat(configuration.maxPayloadSizeBytes)
- .isEqualTo(highestMaxPayloadSize)
- }
- }
-
- on("empty routing") {
- val configuration = CollectorConfiguration(emptyList())
-
- it("should use default value") {
- assertThat(configuration.maxPayloadSizeBytes)
- .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE)
- }
- }
- }
- }
-})
-
-private const val highestMaxPayloadSize = 3
-
-private val sink1 = mock<KafkaSink>().also {
- whenever(it.name()).thenReturn("")
- whenever(it.maxPayloadSizeBytes()).thenReturn(1)
-}
-
-private val sink2 = mock<KafkaSink>().also {
- whenever(it.name()).thenReturn("")
- whenever(it.maxPayloadSizeBytes()).thenReturn(2)
-}
-
-private val sink3 = mock<KafkaSink>().also {
- whenever(it.name()).thenReturn("")
- whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
-}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
index cb8d5005..ca09d84f 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
@@ -27,7 +27,6 @@ import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import java.io.InputStreamReader
import java.io.Reader
-import java.time.Duration
/**
* @author Pawel Biniek <pawel.biniek@nokia.com>
@@ -37,14 +36,14 @@ internal object ConfigurationMergerTest : Spek({
describe("Merges partial configurations into one") {
it("merges single parameter into empty config") {
val actual = PartialConfiguration()
- val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+ val diff = PartialConfiguration(logLevel = Some(LogLevel.WARN))
val result = ConfigurationMerger().merge(actual, diff)
- assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+ assertThat(result.logLevel).isEqualTo(Some(LogLevel.WARN))
}
- val someListenPort = Some(45)
+ val someListenPort = Some(defaultListenPort)
it("merges single embedded parameter into empty config") {
val actual = PartialConfiguration()
val diff = PartialConfiguration(listenPort = someListenPort)
@@ -58,11 +57,11 @@ internal object ConfigurationMergerTest : Spek({
val actual = JsonConfigurationParser().parse(
InputStreamReader(
JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader)
- val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO))
+ val diff = PartialConfiguration(logLevel = Some(LogLevel.WARN))
val result = ConfigurationMerger().merge(actual, diff)
- assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO))
+ assertThat(result.logLevel).isEqualTo(Some(LogLevel.WARN))
}
it("merges single embedded parameter into full config") {
@@ -74,7 +73,6 @@ internal object ConfigurationMergerTest : Spek({
val result = ConfigurationMerger().merge(actual, diff)
assertThat(result.listenPort).isEqualTo(someListenPort)
- assertThat(result.idleTimeoutSec.isEmpty()).isFalse()
assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt
new file mode 100644
index 00000000..42919e4d
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt
@@ -0,0 +1,218 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import arrow.core.None
+import arrow.core.Option
+import arrow.core.Some
+import arrow.core.getOrElse
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.fail
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
+import java.io.File
+import java.time.Duration
+
+
+internal object ConfigurationTransformerTest : Spek({
+ describe("ConfigurationTransformer") {
+ val cut = ConfigurationTransformer()
+
+ describe("transforming partial configuration to final") {
+ val config = ValidatedPartialConfiguration(
+ listenPort = defaultListenPort,
+ idleTimeoutSec = defaultIdleTimeoutSec,
+ cbsConfiguration = ValidatedCbsConfiguration(
+ firstRequestDelaySec = defaultFirstReqDelaySec,
+ requestIntervalSec = defaultRequestIntervalSec
+ ),
+ securityConfiguration = Some(ValidatedSecurityPaths(
+ keyStoreFile = KEYSTORE,
+ keyStorePasswordFile = KEYSTORE_PASS_FILE,
+ trustStoreFile = TRUSTSTORE,
+ trustStorePasswordFile = TRUSTSTORE_PASS_FILE
+ )),
+ streamPublishers = sampleStreamsDefinition,
+ logLevel = Some(LogLevel.TRACE)
+ )
+
+ given("transformed configuration") {
+ val result = cut.toFinalConfiguration(config)
+
+ it("should create server configuration") {
+ assertThat(result.server.listenPort).isEqualTo(defaultListenPort)
+ assertThat(result.server.idleTimeout)
+ .describedAs("idleTimeout transformed from number to duration")
+ .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
+ }
+
+ it("should create CBS configuration") {
+ assertThat(result.cbs.firstRequestDelay)
+ .describedAs("firstRequestDelay transformed from number to duration")
+ .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
+ assertThat(result.cbs.requestInterval)
+ .describedAs("requestInterval transformed from number to duration")
+ .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec))
+ }
+
+ it("should create collector configuration") {
+ assertThat(result.collector.routing)
+ .describedAs("routing transformed from kafka sinks to routes")
+ .isEqualTo(sampleRouting)
+
+ assertThat(result.collector.maxPayloadSizeBytes)
+ .describedAs("maxPayloadSizeBytes calculated from kafka sinks")
+ .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+ }
+
+ it("should use specified log level") {
+ assertThat(result.logLevel)
+ .describedAs("logLevel was not transformed when present")
+ .isEqualTo(LogLevel.TRACE)
+ }
+
+ it("should create security keys") {
+ result.security.keys.fold({ fail("Should be Some") }, {
+ assertThat(it.keyStore().path()).isEqualTo(File(KEYSTORE).toPath())
+ assertThat(it.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath())
+ it.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
+ it.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
+ })
+ }
+ }
+ }
+
+ describe("transforming configuration with empty log level") {
+ val config = validatedConfiguration(
+ logLevel = None
+ )
+
+ it("should use default log level") {
+ val result = cut.toFinalConfiguration(config)
+
+ assertThat(result.logLevel).isEqualTo(DEFAULT_LOG_LEVEL)
+ }
+ }
+
+ describe("transforming configuration with security disabled") {
+ val config = validatedConfiguration(
+ sslDisable = Some(true),
+ keyStoreFile = "",
+ keyStorePasswordFile = "",
+ trustStoreFile = "",
+ trustStorePasswordFile = ""
+ )
+
+ it("should create valid configuration with empty security keys") {
+ val result = cut.toFinalConfiguration(config)
+
+ assertThat(result.security.keys).isEqualTo(None)
+ }
+ }
+
+ describe("transforming configuration with ssl disable missing") {
+ val config = validatedConfiguration(
+ sslDisable = None
+ )
+
+ it("should create configuration with ssl enabled") {
+ val result = cut.toFinalConfiguration(config)
+ val securityKeys = result.security.keys
+ .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
+ assertThat(securityKeys.keyStore().path()).isEqualTo(File(KEYSTORE).toPath())
+ assertThat(securityKeys.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath())
+ securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) }
+ securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) }
+ }
+ }
+
+ describe("calculating maxPayloadSizeBytes") {
+ on("defined routes") {
+ val highestMaxPayloadSize = 3
+ val sink1 = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("1")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(1)
+ }
+ val sink2 = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("2")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
+ }
+ val config = validatedConfiguration(
+ streamPublishers = listOf(sink1, sink2)
+ )
+
+ val result = cut.toFinalConfiguration(config)
+
+ it("should use the highest value among all routes") {
+ assertThat(result.collector.maxPayloadSizeBytes)
+ .isEqualTo(highestMaxPayloadSize)
+ }
+ }
+
+ on("empty routing") {
+ val config = validatedConfiguration(
+ streamPublishers = emptyList()
+ )
+
+ val result = cut.toFinalConfiguration(config)
+
+ it("should use default value") {
+ assertThat(result.collector.maxPayloadSizeBytes)
+ .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+ }
+ }
+ }
+
+ }
+})
+
+private fun validatedConfiguration(listenPort: Int = defaultListenPort,
+ idleTimeoutSec: Long = defaultIdleTimeoutSec,
+ firstReqDelaySec: Long = defaultFirstReqDelaySec,
+ requestIntervalSec: Long = defaultRequestIntervalSec,
+ sslDisable: Option<Boolean> = Some(false),
+ keyStoreFile: String = KEYSTORE,
+ keyStorePasswordFile: String = KEYSTORE_PASS_FILE,
+ trustStoreFile: String = TRUSTSTORE,
+ trustStorePasswordFile: String = TRUSTSTORE_PASS_FILE,
+ streamPublishers: List<KafkaSink> = sampleStreamsDefinition,
+ logLevel: Option<LogLevel> = Some(LogLevel.INFO)
+): ValidatedPartialConfiguration = PartialConfiguration(
+ listenPort = Some(listenPort),
+ idleTimeoutSec = Some(idleTimeoutSec),
+ firstRequestDelaySec = Some(firstReqDelaySec),
+ requestIntervalSec = Some(requestIntervalSec),
+ streamPublishers = Some(streamPublishers),
+ sslDisable = sslDisable,
+ keyStoreFile = Some(keyStoreFile),
+ keyStorePasswordFile = Some(keyStorePasswordFile),
+ trustStoreFile = Some(trustStoreFile),
+ trustStorePasswordFile = Some(trustStorePasswordFile),
+ logLevel = logLevel
+).unsafeAsValidated()
+
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index 5495c865..26a9cc57 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -22,101 +22,84 @@ package org.onap.dcae.collectors.veshv.config.impl
import arrow.core.None
import arrow.core.Option
import arrow.core.Some
-import arrow.core.getOrElse
-import com.nhaarman.mockitokotlin2.mock
-import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
-import org.assertj.core.api.Assertions.fail
+import org.assertj.core.api.Assertions.*
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.config.api.model.Route
-import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL
+import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
-import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys
-import java.io.File
-import java.nio.file.Paths
-import java.time.Duration
internal object ConfigurationValidatorTest : Spek({
describe("ConfigurationValidator") {
val cut = ConfigurationValidator()
describe("validating partial configuration with missing fields") {
- val config = PartialConfiguration(
- listenPort = Some(1)
- )
-
- it("should return ValidationError") {
- val result = cut.validate(config)
- assertThat(result.isLeft()).isTrue()
- }
- }
+ val config = PartialConfiguration(listenPort = Some(5))
- describe("validating configuration with empty log level") {
- val config = partialConfiguration(
- logLevel = None
- )
-
- it("should use default log level") {
+ it("should return ValidationException with missing required fields description") {
val result = cut.validate(config)
- result.fold(
- {
- fail("Configuration should have been created successfully")
- },
- {
- assertThat(it.logLevel).isEqualTo(DEFAULT_LOG_LEVEL)
- }
- )
+ result.fold({
+ assertThat(it.message).doesNotContain(PartialConfiguration::listenPort.name)
+
+ assertThat(it.message).contains(PartialConfiguration::idleTimeoutSec.name)
+ assertThat(it.message).contains(PartialConfiguration::firstRequestDelaySec.name)
+ assertThat(it.message).contains(PartialConfiguration::requestIntervalSec.name)
+ assertThat(it.message).contains(PartialConfiguration::streamPublishers.name)
+ assertThat(it.message).contains(PartialConfiguration::keyStoreFile.name)
+ assertThat(it.message).contains(PartialConfiguration::keyStorePasswordFile.name)
+ assertThat(it.message).contains(PartialConfiguration::trustStoreFile.name)
+ assertThat(it.message).contains(PartialConfiguration::trustStorePasswordFile.name)
+
+ assertThat(it.message).doesNotContain(PartialConfiguration::logLevel.name)
+ assertThat(it.message).doesNotContain(PartialConfiguration::sslDisable.name)
+ }, { fail("Should be ValidationException") })
}
}
- describe("validating complete configuration") {
+ describe("validating complete valid configuration") {
val config = PartialConfiguration(
listenPort = Some(defaultListenPort),
idleTimeoutSec = Some(defaultIdleTimeoutSec),
firstRequestDelaySec = Some(defaultFirstReqDelaySec),
requestIntervalSec = Some(defaultRequestIntervalSec),
sslDisable = Some(false),
- keyStoreFile = Some(keyStore),
- keyStorePasswordFile = Some(keyStorePassFile),
- trustStoreFile = Some(trustStore),
- trustStorePasswordFile = Some(trustStorePassFile),
+ keyStoreFile = Some(KEYSTORE),
+ keyStorePasswordFile = Some(KEYSTORE_PASSWORD),
+ trustStoreFile = Some(TRUSTSTORE),
+ trustStorePasswordFile = Some(TRUSTSTORE_PASSWORD),
streamPublishers = Some(sampleStreamsDefinition),
logLevel = Some(LogLevel.TRACE)
)
- it("should create valid configuration") {
+ it("should create validated configuration") {
val result = cut.validate(config)
result.fold(
{
fail("Configuration should have been created successfully")
},
{
- assertThat(it.server.listenPort)
+ assertThat(it.listenPort)
.isEqualTo(defaultListenPort)
- assertThat(it.server.idleTimeout)
- .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
-
- val securityKeys = it.security.keys
- .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
- assertThat(securityKeys.keyStore().path()).isEqualTo(File(keyStore).toPath())
- assertThat(securityKeys.trustStore().path()).isEqualTo(File(trustStore).toPath())
- securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(keyStorePass.toCharArray()) }
- securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(trustStorePass.toCharArray()) }
-
- assertThat(it.cbs.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
- assertThat(it.cbs.requestInterval)
- .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec))
-
- assertThat(it.collector.routing)
- .isEqualTo(sampleRouting)
- assertThat(it.collector.maxPayloadSizeBytes)
- .isEqualTo(sampleMaxPayloadSize)
-
- assertThat(it.logLevel).isEqualTo(LogLevel.TRACE)
+ assertThat(it.idleTimeoutSec)
+ .isEqualTo(defaultIdleTimeoutSec)
+
+ it.securityConfiguration.fold({
+ fail("Should have been validated successfully")
+ }, {
+ assertThat(it.keyStoreFile).isEqualTo(KEYSTORE)
+ assertThat(it.keyStorePasswordFile).isEqualTo(KEYSTORE_PASSWORD)
+ assertThat(it.trustStoreFile).isEqualTo(TRUSTSTORE)
+ assertThat(it.trustStorePasswordFile).isEqualTo(TRUSTSTORE_PASSWORD)
+ })
+
+ assertThat(it.cbsConfiguration.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec)
+ assertThat(it.cbsConfiguration.requestIntervalSec).isEqualTo(defaultRequestIntervalSec)
+
+ assertThat(it.streamPublishers).isEqualTo(sampleStreamsDefinition)
+
+ assertThat(it.logLevel).isEqualTo(Some(LogLevel.TRACE))
}
)
}
@@ -126,29 +109,26 @@ internal object ConfigurationValidatorTest : Spek({
val config = partialConfiguration(
sslDisable = Some(true),
keyStoreFile = Some(""),
- keyStorePassword = Some(""),
- trustStoreFile = Some(""),
- trustStorePassword = Some("")
+ keyStorePasswordFile = None,
+ trustStoreFile = None,
+ trustStorePasswordFile = Some("")
)
- it("should create valid configuration") {
+ it("should return validated configuration regardless of security keys presence") {
val result = cut.validate(config)
result.fold(
{
fail("Configuration should have been created successfully but was $it")
},
{
- assertThat(it.server.idleTimeout)
- .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
+ assertThat(it.idleTimeoutSec).isEqualTo(defaultIdleTimeoutSec)
- assertThat(it.security.keys)
- .isEqualTo(None)
+ assertThat(it.securityConfiguration.isEmpty()).isTrue()
- assertThat(it.cbs.firstRequestDelay)
- .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec))
+ assertThat(it.cbsConfiguration.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec)
+ assertThat(it.cbsConfiguration.requestIntervalSec).isEqualTo(defaultRequestIntervalSec)
- assertThat(it.collector.routing)
- .isEqualTo(sampleRouting)
+ assertThat(it.streamPublishers).isEqualTo(sampleStreamsDefinition)
}
)
}
@@ -159,24 +139,81 @@ internal object ConfigurationValidatorTest : Spek({
sslDisable = None
)
- it("should create valid configuration with ssl enabled") {
+ it("should return validated configuration") {
val result = cut.validate(config)
result.fold(
{
fail("Configuration should have been created successfully but was $it")
},
{
- val securityKeys = it.security.keys
- .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys
- assertThat(securityKeys.keyStore().path()).isEqualTo(File(keyStore).toPath())
- assertThat(securityKeys.trustStore().path()).isEqualTo(File(trustStore).toPath())
- securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(keyStorePass.toCharArray()) }
- securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(trustStorePass.toCharArray()) }
+ it.securityConfiguration.fold({
+ fail("Should have been validated successfully")
+ }, {
+ assertThat(it.keyStoreFile).isEqualTo(KEYSTORE)
+ assertThat(it.keyStorePasswordFile).isEqualTo(KEYSTORE_PASSWORD)
+ assertThat(it.trustStoreFile).isEqualTo(TRUSTSTORE)
+ assertThat(it.trustStorePasswordFile).isEqualTo(TRUSTSTORE_PASSWORD)
+ })
+
}
)
}
}
+ describe("validating configuration with ssl enabled, but not all required security fields set") {
+ val config = partialConfiguration(
+ sslDisable = Some(false),
+ keyStoreFile = Some(KEYSTORE),
+ keyStorePasswordFile = None,
+ trustStoreFile = None,
+ trustStorePasswordFile = Some(TRUSTSTORE_PASSWORD)
+ )
+
+ it("should return validated configuration") {
+ val result = cut.validate(config)
+
+ assertThat(result.isLeft())
+ .describedAs("security validation result")
+ .isTrue()
+ }
+ }
+
+ describe("validating CBS configuration from partial") {
+ given("valid CBS configuration") {
+ val config = partialConfiguration()
+
+ it("should returned validated config") {
+ val result = cut.validatedCbsConfiguration(config)
+
+ assertThat(result.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec)
+ assertThat(result.requestIntervalSec).isEqualTo(defaultRequestIntervalSec)
+ }
+
+ }
+
+ given("missing firstReqDelaySec") {
+ val config = partialConfiguration(
+ firstReqDelaySec = None
+ )
+
+ it("should throw validation exception") {
+ assertThatExceptionOfType(ValidationException::class.java).isThrownBy {
+ cut.validatedCbsConfiguration(config)
+ }.withMessageContaining(PartialConfiguration::firstRequestDelaySec.name)
+ }
+ }
+
+ given("missing requestIntervalSec") {
+ val config = partialConfiguration(
+ requestIntervalSec = None)
+
+ it("should throw validation exception") {
+ assertThatExceptionOfType(ValidationException::class.java).isThrownBy {
+ cut.validatedCbsConfiguration(config)
+ }.withMessageContaining(PartialConfiguration::requestIntervalSec.name)
+ }
+ }
+ }
}
})
@@ -185,10 +222,10 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor
firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec),
requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec),
sslDisable: Option<Boolean> = Some(false),
- keyStoreFile: Option<String> = Some(keyStore),
- keyStorePassword: Option<String> = Some(keyStorePassFile),
- trustStoreFile: Option<String> = Some(trustStore),
- trustStorePassword: Option<String> = Some(trustStorePassFile),
+ keyStoreFile: Option<String> = Some(KEYSTORE),
+ keyStorePasswordFile: Option<String> = Some(KEYSTORE_PASSWORD),
+ trustStoreFile: Option<String> = Some(TRUSTSTORE),
+ trustStorePasswordFile: Option<String> = Some(TRUSTSTORE_PASSWORD),
streamPublishers: Option<List<KafkaSink>> = Some(sampleStreamsDefinition),
logLevel: Option<LogLevel> = Some(LogLevel.INFO)
) = PartialConfiguration(
@@ -198,35 +235,9 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor
requestIntervalSec = requestIntervalSec,
sslDisable = sslDisable,
keyStoreFile = keyStoreFile,
- keyStorePasswordFile = keyStorePassword,
+ keyStorePasswordFile = keyStorePasswordFile,
trustStoreFile = trustStoreFile,
- trustStorePasswordFile = trustStorePassword,
+ trustStorePasswordFile = trustStorePasswordFile,
streamPublishers = streamPublishers,
logLevel = logLevel
)
-
-private fun resourcePathAsString(resource: String) =
- Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString()
-
-private const val defaultListenPort = 1234
-private const val defaultRequestIntervalSec = 3L
-private const val defaultIdleTimeoutSec = 10L
-private const val defaultFirstReqDelaySec = 10L
-
-private const val keyStore = "test.ks.pkcs12"
-private const val trustStore = "trust.ks.pkcs12"
-private const val keyStorePass = "change.me"
-private const val trustStorePass = "change.me.too"
-private val keyStorePassFile = resourcePathAsString("/test.ks.pass")
-private val trustStorePassFile = resourcePathAsString("/trust.ks.pass")
-
-private const val sampleSinkName = "perf3gpp"
-const val sampleMaxPayloadSize = 1024
-
-private val sink = mock<KafkaSink>().also {
- whenever(it.name()).thenReturn(sampleSinkName)
- whenever(it.maxPayloadSizeBytes()).thenReturn(sampleMaxPayloadSize)
-}
-
-private val sampleStreamsDefinition = listOf(sink)
-private val sampleRouting = listOf(Route(sink.name(), sink))
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt
new file mode 100644
index 00000000..f07af079
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt
@@ -0,0 +1,54 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+import java.nio.file.Paths
+
+private fun resourcePathAsString(resource: String) =
+ Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString()
+
+internal val DEFAULT_LOG_LEVEL = LogLevel.INFO
+
+internal const val defaultListenPort = 1234
+internal const val defaultRequestIntervalSec = 3L
+internal const val defaultIdleTimeoutSec = 10L
+internal const val defaultFirstReqDelaySec = 10L
+
+internal const val KEYSTORE = "test.ks.pkcs12"
+internal const val KEYSTORE_PASSWORD = "change.me"
+internal const val TRUSTSTORE = "trust.ks.pkcs12"
+internal const val TRUSTSTORE_PASSWORD = "change.me.too"
+internal val KEYSTORE_PASS_FILE = resourcePathAsString("/test.ks.pass")
+internal val TRUSTSTORE_PASS_FILE = resourcePathAsString("/trust.ks.pass")
+
+internal const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+
+private val sampleSink = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("perf3gpp")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
+}
+
+internal val sampleStreamsDefinition = listOf(sampleSink)
+internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink)) \ No newline at end of file
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 3002f334..78d2e704 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({
describe("VES High Volume Collector performance") {
it("should handle multiple clients in reasonable time") {
val sink = CountingSink()
- val sut = Sut(CollectorConfiguration(basicRouting), sink)
+ val sut = Sut(CollectorConfiguration(basicRouting, MAX_PAYLOAD_SIZE_BYTES), sink)
val numMessages: Long = 300_000
val runs = 4
@@ -87,7 +87,7 @@ object PerformanceSpecification : Spek({
it("should disconnect on transmission errors") {
val sink = CountingSink()
- val sut = Sut(CollectorConfiguration(basicRouting), sink)
+ val sut = Sut(CollectorConfiguration(basicRouting, MAX_PAYLOAD_SIZE_BYTES), sink)
val numMessages: Long = 100_000
val timeout = Duration.ofSeconds(30)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 88d1567e..4e9b7ef4 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink
import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink
import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink
import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.utils.Closeable
@@ -95,10 +96,10 @@ class DummySinkFactory(private val sink: Sink) : SinkFactory {
}
fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
- Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink())
+ Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysSuccessfulSink())
fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut =
- Sut(CollectorConfiguration(routing), AlwaysFailingSink())
+ Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysFailingSink())
fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut =
- Sut(CollectorConfiguration(routing), DelayingSink(delay))
+ Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), DelayingSink(delay))
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index f90f4bc9..e74e1f62 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -215,6 +215,6 @@ object VesHvSpecification : Spek({
private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> {
val sink = StoringSink()
- val sut = Sut(CollectorConfiguration(routing), sink)
+ val sut = Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), sink)
return Pair(sut, sink)
}
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index cfed7f32..ceae62db 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -17,6 +17,9 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
+
+@file:Suppress("TooManyFunctions")
+
package org.onap.dcae.collectors.veshv.utils.arrow
import arrow.core.Either
@@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference
* @since July 2018
*/
+
object OptionUtils {
fun <A> binding(c: suspend MonadContinuation<ForOption, *>.() -> A)
: Option<A> = Option.monad().binding(c).fix()
@@ -78,6 +82,17 @@ fun <A> Try<A>.doOnFailure(action: (Throwable) -> Unit): Try<A> = apply {
fun <A, B> A.mapBinding(c: suspend MonadContinuation<ForOption, *>.(A) -> B)
: Option<B> = let { OptionUtils.binding { c(it) } }
+fun <T> Option<Boolean>.flatFold(ifEmptyOrFalse: () -> T, ifTrue: () -> T) =
+ fold({
+ ifEmptyOrFalse()
+ }, {
+ if (it) {
+ ifTrue()
+ } else {
+ ifEmptyOrFalse()
+ }
+ })
+