diff options
67 files changed, 1063 insertions, 587 deletions
diff --git a/build/hv-collector-coverage/pom.xml b/build/hv-collector-coverage/pom.xml index 18193929..ff94d939 100644 --- a/build/hv-collector-coverage/pom.xml +++ b/build/hv-collector-coverage/pom.xml @@ -140,6 +140,11 @@ </dependency> <dependency> <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-server</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-ssl</artifactId> <version>${project.parent.version}</version> </dependency> diff --git a/sources/hv-collector-configuration/pom.xml b/sources/hv-collector-configuration/pom.xml index b6ec4ca2..eda8b448 100644 --- a/sources/hv-collector-configuration/pom.xml +++ b/sources/hv-collector-configuration/pom.xml @@ -93,5 +93,9 @@ <groupId>com.google.code.gson</groupId> <artifactId>gson</artifactId> </dependency> + <dependency> + <groupId>io.arrow-kt</groupId> + <artifactId>arrow-extras-data</artifactId> + </dependency> </dependencies> </project>
\ No newline at end of file diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index f0ee3a42..ded75838 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -19,12 +19,11 @@ */ package org.onap.dcae.collectors.veshv.config.api -import arrow.core.getOrElse import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException -import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.config.impl.CbsConfigurationProvider import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger +import org.onap.dcae.collectors.veshv.config.impl.ConfigurationTransformer import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser import org.onap.dcae.collectors.veshv.config.impl.JsonConfigurationParser @@ -41,8 +40,9 @@ class ConfigurationModule { private val cmd = HvVesCommandLineParser() private val configParser = JsonConfigurationParser() + private val configMerger = ConfigurationMerger() private val configValidator = ConfigurationValidator() - private val merger = ConfigurationMerger() + private val configTransformer = ConfigurationTransformer() fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args) @@ -58,14 +58,15 @@ class ConfigurationModule { .flatMapMany { basePartialConfig -> cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) .invoke() - .map { merger.merge(basePartialConfig, it) } + .map { configMerger.merge(basePartialConfig, it) } .map(configValidator::validate) .throwOnLeft() + .map(configTransformer::toFinalConfiguration) } private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration, configStateListener: ConfigurationStateListener, - mdc: MappedDiagnosticContext): CbsConfigurationProvider = + mdc: MappedDiagnosticContext) = CbsConfigurationProvider( CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()), cbsConfigurationFrom(basePartialConfig), @@ -73,11 +74,12 @@ class ConfigurationModule { configStateListener, mdc) - private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = configValidator - .validatedCbsConfiguration(basePartialConfig) - .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") } + private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) = + configValidator.validatedCbsConfiguration(basePartialConfig) + .let { configTransformer.toCbsConfiguration(it) } companion object { private val logger = Logger(ConfigurationModule::class) } + } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt index 8db2f770..fd3cccd9 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -46,13 +46,6 @@ data class CbsConfiguration( ) data class CollectorConfiguration( - val routing: Routing -) { - val maxPayloadSizeBytes by lazy { - routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: DEFAULT_MAX_PAYLOAD_SIZE - } - - companion object { - internal const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 - } -} + val routing: Routing, + val maxPayloadSizeBytes: Int +) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt index e6707825..96fa4213 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt @@ -29,24 +29,23 @@ import arrow.core.toOption * @since March 2019 */ internal class ConfigurationMerger { - fun merge(base: PartialConfiguration, update: PartialConfiguration): PartialConfiguration = - PartialConfiguration( - listenPort = base.listenPort.updateToGivenOrNone(update.listenPort), - idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec), + fun merge(base: PartialConfiguration, update: PartialConfiguration) = PartialConfiguration( + listenPort = base.listenPort.updateToGivenOrNone(update.listenPort), + idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec), - firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec), - requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec), + firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec), + requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec), - sslDisable = base.sslDisable.updateToGivenOrNone(update.sslDisable), - keyStoreFile = base.keyStoreFile.updateToGivenOrNone(update.keyStoreFile), - keyStorePasswordFile = base.keyStorePasswordFile.updateToGivenOrNone(update.keyStorePasswordFile), - trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile), - trustStorePasswordFile = base.trustStorePasswordFile.updateToGivenOrNone(update.trustStorePasswordFile), + sslDisable = base.sslDisable.updateToGivenOrNone(update.sslDisable), + keyStoreFile = base.keyStoreFile.updateToGivenOrNone(update.keyStoreFile), + keyStorePasswordFile = base.keyStorePasswordFile.updateToGivenOrNone(update.keyStorePasswordFile), + trustStoreFile = base.trustStoreFile.updateToGivenOrNone(update.trustStoreFile), + trustStorePasswordFile = base.trustStorePasswordFile.updateToGivenOrNone(update.trustStorePasswordFile), - streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers), + streamPublishers = base.streamPublishers.updateToGivenOrNone(update.streamPublishers), - logLevel = base.logLevel.updateToGivenOrNone(update.logLevel) - ) + logLevel = base.logLevel.updateToGivenOrNone(update.logLevel) + ) private fun <T> Option<T>.updateToGivenOrNone(update: Option<T>) = update.getOrElse(this::orNull).toOption() diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt new file mode 100644 index 00000000..08cce136 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformer.kt @@ -0,0 +1,116 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Option +import arrow.core.getOrElse +import arrow.core.toOption +import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys +import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore +import org.onap.dcaegen2.services.sdk.security.ssl.Passwords +import java.nio.file.Paths +import java.time.Duration + +internal class ConfigurationTransformer { + + fun toFinalConfiguration(validatedConfig: ValidatedPartialConfiguration): HvVesConfiguration { + val serverConfiguration = toServerConfiguration(validatedConfig) + + val cbsConfiguration = toCbsConfiguration(validatedConfig.cbsConfiguration) + + val securityConfiguration = determineSecurityConfiguration(validatedConfig) + + val collectorConfiguration = toCollectorConfiguration(validatedConfig) + + val logLevel = determineLogLevel(validatedConfig.logLevel) + + return HvVesConfiguration( + serverConfiguration, + cbsConfiguration, + securityConfiguration, + collectorConfiguration, + logLevel + ) + } + + fun toCbsConfiguration(cbsConfiguration: ValidatedCbsConfiguration) = CbsConfiguration( + Duration.ofSeconds(cbsConfiguration.firstRequestDelaySec), + Duration.ofSeconds(cbsConfiguration.requestIntervalSec) + ) + + private fun toServerConfiguration(validatedConfig: ValidatedPartialConfiguration) = ServerConfiguration( + validatedConfig.listenPort, + Duration.ofSeconds(validatedConfig.idleTimeoutSec) + ) + + private fun determineSecurityConfiguration(validConfig: ValidatedPartialConfiguration) = + validConfig.securityConfiguration.fold({ SecurityConfiguration(None) }, { createSecurityConfiguration(it) }) + + private fun toCollectorConfiguration(validatedConfig: ValidatedPartialConfiguration) = + validatedConfig.streamPublishers.map { Route(it.name(), it) } + .let { routing -> + CollectorConfiguration( + routing, + determineMaxPayloadSize(routing) + ) + } + + private fun createSecurityConfiguration(paths: ValidatedSecurityPaths) = SecurityConfiguration( + ImmutableSecurityKeys.builder() + .keyStore(ImmutableSecurityKeysStore.of(Paths.get(paths.keyStoreFile))) + .keyStorePassword(Passwords.fromPath(Paths.get(paths.keyStorePasswordFile))) + .trustStore(ImmutableSecurityKeysStore.of(Paths.get(paths.trustStoreFile))) + .trustStorePassword(Passwords.fromPath(Paths.get(paths.trustStorePasswordFile))) + .build() + .toOption() + ) + + private fun determineMaxPayloadSize(routing: List<Route>) = + routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: useDefaultMaxPayloadSize() + + private fun determineLogLevel(logLevel: Option<LogLevel>) = + logLevel.getOrElse(::useDefaultLogLevel) + + private fun useDefaultMaxPayloadSize() = DEFAULT_MAX_PAYLOAD_SIZE.also { + logger.warn { + "Failed to determine \"maxPayloadSizeBytes\" field from routing. Using default ($it)" + } + } + + private fun useDefaultLogLevel() = DEFAULT_LOG_LEVEL.also { + logger.warn { "Missing or invalid \"logLevel\" field. Using default log level ($it)" } + } + + companion object { + private val logger = Logger(ConfigurationTransformer::class) + + private val DEFAULT_LOG_LEVEL = LogLevel.INFO + private const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 + } +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index f4ce592f..c97c975f 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -19,29 +19,15 @@ */ package org.onap.dcae.collectors.veshv.config.impl -import arrow.core.None -import arrow.core.Option -import arrow.core.Some -import arrow.core.getOrElse -import arrow.core.toOption -import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration +import arrow.core.Either +import arrow.core.Left +import arrow.core.Right +import arrow.data.Invalid +import arrow.data.Validated import org.onap.dcae.collectors.veshv.config.api.model.ValidationException -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration -import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding -import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty -import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding -import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcae.collectors.veshv.utils.arrow.flatFold import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys -import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore -import org.onap.dcaegen2.services.sdk.security.ssl.Passwords -import java.io.File -import java.nio.file.Path -import java.time.Duration + /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -49,104 +35,62 @@ import java.time.Duration */ internal class ConfigurationValidator { - fun validate(partialConfig: PartialConfiguration) = - logger.info { "About to validate configuration: $partialConfig" }.let { - binding { - val logLevel = determineLogLevel(partialConfig.logLevel) - - val serverConfiguration = validatedServerConfiguration(partialConfig) - .doOnEmpty { logger.debug { "Cannot bind server configuration" } } - .bind() - - val cbsConfiguration = validatedCbsConfiguration(partialConfig) - .doOnEmpty { logger.debug { "Cannot bind cbs configuration" } } - .bind() - - val securityConfiguration = determineSecurityConfiguration(partialConfig) - .doOnEmpty { logger.debug { "Cannot bind security configuration" } } - .bind() - - val collectorConfiguration = validatedCollectorConfig(partialConfig) - .doOnEmpty { logger.debug { "Cannot bind collector configuration" } } - .bind() - - HvVesConfiguration( - serverConfiguration, - cbsConfiguration, - securityConfiguration, - collectorConfiguration, - logLevel - ) - }.toEither { ValidationException("Some required configuration options are missing") } - } - + fun validate(partial: PartialConfiguration): Either<ValidationException, ValidatedPartialConfiguration> = + logger.info { "About to validate configuration: $partial" }.let { + val invalidFields = mutableSetOf( + validate(partial::streamPublishers) + ) + .union(cbsConfigurationValidation(partial)) + .union(serverConfigurationValidation(partial)) + .union(securityValidation(partial)) + .filter { it.isInvalid } - private fun determineLogLevel(logLevel: Option<LogLevel>) = - logLevel.getOrElse { - logger.warn { - "Missing or invalid \"logLevel\" field. " + - "Using default log level ($DEFAULT_LOG_LEVEL)" + if (invalidFields.isNotEmpty()) { + return Left(ValidationException(validationMessageFrom(invalidFields))) } - DEFAULT_LOG_LEVEL - } - private fun validatedServerConfiguration(partial: PartialConfiguration) = - partial.mapBinding { - ServerConfiguration( - it.listenPort.bind(), - Duration.ofSeconds(it.idleTimeoutSec.bind()) - ) + Right(partial.unsafeAsValidated()) } - internal fun validatedCbsConfiguration(partial: PartialConfiguration) = - partial.mapBinding { - CbsConfiguration( - Duration.ofSeconds(it.firstRequestDelaySec.bind()), - Duration.ofSeconds(it.requestIntervalSec.bind()) - ) - } - - private fun determineSecurityConfiguration(partial: PartialConfiguration) = - partial.sslDisable.fold({ createSecurityConfiguration(partial) }, { sslDisabled -> - if (sslDisabled) { - Some(SecurityConfiguration(None)) - } else { - createSecurityConfiguration(partial) - } + fun validatedCbsConfiguration(partial: PartialConfiguration) = ValidatedCbsConfiguration( + firstRequestDelaySec = getOrThrowValidationException(partial::firstRequestDelaySec), + requestIntervalSec = getOrThrowValidationException(partial::requestIntervalSec) + ) + + private fun cbsConfigurationValidation(partial: PartialConfiguration) = setOf( + validate(partial::firstRequestDelaySec), + validate(partial::requestIntervalSec) + ) + + private fun serverConfigurationValidation(partial: PartialConfiguration) = setOf( + validate(partial::listenPort), + validate(partial::idleTimeoutSec) + ) + + private fun securityValidation(partial: PartialConfiguration) = + partial.sslDisable.flatFold({ + validatedSecurityConfiguration(partial) + }, { + setOf(Validated.Valid("sslDisable flag is set to true")) }) - private fun createSecurityConfiguration(partial: PartialConfiguration): Option<SecurityConfiguration> = - partial.mapBinding { - SecurityConfiguration( - createSecurityKeys( - File(it.keyStoreFile.bind()).toPath(), - File(it.keyStorePasswordFile.bind()).toPath(), - File(it.trustStoreFile.bind()).toPath(), - File(it.trustStorePasswordFile.bind()).toPath() - ).toOption() - ) - } + private fun validatedSecurityConfiguration(partial: PartialConfiguration) = setOf( + validate(partial::keyStoreFile), + validate(partial::keyStorePasswordFile), + validate(partial::trustStoreFile), + validate(partial::trustStorePasswordFile) + ) - private fun createSecurityKeys(keyStorePath: Path, - keyStorePasswordPath: Path, - trustStorePath: Path, - trustStorePasswordPath: Path) = - ImmutableSecurityKeys.builder() - .keyStore(ImmutableSecurityKeysStore.of(keyStorePath)) - .keyStorePassword(Passwords.fromPath(keyStorePasswordPath)) - .trustStore(ImmutableSecurityKeysStore.of(trustStorePath)) - .trustStorePassword(Passwords.fromPath(trustStorePasswordPath)) - .build() + private fun <A> validate(property: ConfigProperty<A>) = + Validated.fromOption(property.get(), { "- missing property: ${property.name}\n" }) - private fun validatedCollectorConfig(partial: PartialConfiguration) = - partial.mapBinding { config -> - CollectorConfiguration( - config.streamPublishers.bind().map { Route(it.name(), it) } - ) - } + private fun <A> validationMessageFrom(invalidFields: List<Validated<String, A>>): String = + invalidFields.map { it as Invalid } + .map { it.e } + .fold("", String::plus) + .let { "Some required configuration properties are missing: \n$it" } companion object { - val DEFAULT_LOG_LEVEL = LogLevel.INFO private val logger = Logger(ConfigurationValidator::class) } } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt index 51f6a665..c8162104 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/PartialConfiguration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -21,9 +21,16 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.None import arrow.core.Option +import arrow.core.Some +import arrow.core.getOrElse import com.google.gson.annotations.SerializedName +import org.onap.dcae.collectors.veshv.config.api.model.ValidationException +import org.onap.dcae.collectors.veshv.utils.arrow.flatFold import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import kotlin.reflect.KProperty0 + +internal typealias ConfigProperty<A> = KProperty0<Option<A>> /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -56,4 +63,29 @@ internal data class PartialConfiguration( @Transient var streamPublishers: Option<List<KafkaSink>> = None -) +) { + fun unsafeAsValidated() = ValidatedPartialConfiguration( + listenPort = getOrThrowValidationException(::listenPort), + idleTimeoutSec = getOrThrowValidationException(::idleTimeoutSec), + cbsConfiguration = ValidatedCbsConfiguration( + firstRequestDelaySec = getOrThrowValidationException(::firstRequestDelaySec), + requestIntervalSec = getOrThrowValidationException(::requestIntervalSec) + ), + streamPublishers = getOrThrowValidationException(::streamPublishers), + securityConfiguration = sslDisable.flatFold({ forceValidatedSecurityPaths() }, { None }), + logLevel = logLevel + ) + + private fun forceValidatedSecurityPaths() = + Some(ValidatedSecurityPaths( + keyStoreFile = getOrThrowValidationException(::keyStoreFile), + keyStorePasswordFile = getOrThrowValidationException(::keyStorePasswordFile), + trustStoreFile = getOrThrowValidationException(::trustStoreFile), + trustStorePasswordFile = getOrThrowValidationException(::trustStorePasswordFile) + )) +} + +internal fun <A> getOrThrowValidationException(property: ConfigProperty<A>) = + property().getOrElse { + throw ValidationException("Field `${property.name}` was not validated and is missing in configuration") + } diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt new file mode 100644 index 00000000..a230bfc0 --- /dev/null +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/validated_configuration.kt @@ -0,0 +1,46 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.Option +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink + + +internal data class ValidatedPartialConfiguration( + val listenPort: Int, + val idleTimeoutSec: Long, + val cbsConfiguration: ValidatedCbsConfiguration, + val securityConfiguration: Option<ValidatedSecurityPaths>, + val logLevel: Option<LogLevel>, + val streamPublishers: List<KafkaSink> +) + +internal data class ValidatedCbsConfiguration( + val firstRequestDelaySec: Long, + val requestIntervalSec: Long +) + +internal data class ValidatedSecurityPaths( + val keyStoreFile: String, + val keyStorePasswordFile: String, + val trustStoreFile: String, + val trustStorePasswordFile: String +) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt deleted file mode 100644 index dbdf4ad0..00000000 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt +++ /dev/null @@ -1,83 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2019 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.config.impl - -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.jetbrains.spek.api.Spek -import org.jetbrains.spek.api.dsl.describe -import org.jetbrains.spek.api.dsl.it -import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration -import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration.Companion.DEFAULT_MAX_PAYLOAD_SIZE -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since May 2018 - */ -internal object CollectorConfigurationTest : Spek({ - - describe("CollectorConfiguration") { - describe("calculating maxPayloadSizeBytes") { - on("defined routes") { - val sampleRouting = listOf( - Route(sink1.name(), sink1), - Route(sink2.name(), sink2), - Route(sink3.name(), sink3) - ) - val configuration = CollectorConfiguration(sampleRouting) - - it("should use the highest value among all routes") { - assertThat(configuration.maxPayloadSizeBytes) - .isEqualTo(highestMaxPayloadSize) - } - } - - on("empty routing") { - val configuration = CollectorConfiguration(emptyList()) - - it("should use default value") { - assertThat(configuration.maxPayloadSizeBytes) - .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE) - } - } - } - } -}) - -private const val highestMaxPayloadSize = 3 - -private val sink1 = mock<KafkaSink>().also { - whenever(it.name()).thenReturn("") - whenever(it.maxPayloadSizeBytes()).thenReturn(1) -} - -private val sink2 = mock<KafkaSink>().also { - whenever(it.name()).thenReturn("") - whenever(it.maxPayloadSizeBytes()).thenReturn(2) -} - -private val sink3 = mock<KafkaSink>().also { - whenever(it.name()).thenReturn("") - whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize) -} diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt index cb8d5005..ca09d84f 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt @@ -27,7 +27,6 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import java.io.InputStreamReader import java.io.Reader -import java.time.Duration /** * @author Pawel Biniek <pawel.biniek@nokia.com> @@ -37,14 +36,14 @@ internal object ConfigurationMergerTest : Spek({ describe("Merges partial configurations into one") { it("merges single parameter into empty config") { val actual = PartialConfiguration() - val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO)) + val diff = PartialConfiguration(logLevel = Some(LogLevel.WARN)) val result = ConfigurationMerger().merge(actual, diff) - assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO)) + assertThat(result.logLevel).isEqualTo(Some(LogLevel.WARN)) } - val someListenPort = Some(45) + val someListenPort = Some(defaultListenPort) it("merges single embedded parameter into empty config") { val actual = PartialConfiguration() val diff = PartialConfiguration(listenPort = someListenPort) @@ -58,11 +57,11 @@ internal object ConfigurationMergerTest : Spek({ val actual = JsonConfigurationParser().parse( InputStreamReader( JsonConfigurationParserTest.javaClass.getResourceAsStream("/sampleConfig.json")) as Reader) - val diff = PartialConfiguration(logLevel = Some(LogLevel.INFO)) + val diff = PartialConfiguration(logLevel = Some(LogLevel.WARN)) val result = ConfigurationMerger().merge(actual, diff) - assertThat(result.logLevel).isEqualTo(Some(LogLevel.INFO)) + assertThat(result.logLevel).isEqualTo(Some(LogLevel.WARN)) } it("merges single embedded parameter into full config") { @@ -74,7 +73,6 @@ internal object ConfigurationMergerTest : Spek({ val result = ConfigurationMerger().merge(actual, diff) assertThat(result.listenPort).isEqualTo(someListenPort) - assertThat(result.idleTimeoutSec.isEmpty()).isFalse() assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L)) } diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt new file mode 100644 index 00000000..42919e4d --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationTransformerTest.kt @@ -0,0 +1,218 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import arrow.core.None +import arrow.core.Option +import arrow.core.Some +import arrow.core.getOrElse +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.assertj.core.api.Assertions.fail +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys +import java.io.File +import java.time.Duration + + +internal object ConfigurationTransformerTest : Spek({ + describe("ConfigurationTransformer") { + val cut = ConfigurationTransformer() + + describe("transforming partial configuration to final") { + val config = ValidatedPartialConfiguration( + listenPort = defaultListenPort, + idleTimeoutSec = defaultIdleTimeoutSec, + cbsConfiguration = ValidatedCbsConfiguration( + firstRequestDelaySec = defaultFirstReqDelaySec, + requestIntervalSec = defaultRequestIntervalSec + ), + securityConfiguration = Some(ValidatedSecurityPaths( + keyStoreFile = KEYSTORE, + keyStorePasswordFile = KEYSTORE_PASS_FILE, + trustStoreFile = TRUSTSTORE, + trustStorePasswordFile = TRUSTSTORE_PASS_FILE + )), + streamPublishers = sampleStreamsDefinition, + logLevel = Some(LogLevel.TRACE) + ) + + given("transformed configuration") { + val result = cut.toFinalConfiguration(config) + + it("should create server configuration") { + assertThat(result.server.listenPort).isEqualTo(defaultListenPort) + assertThat(result.server.idleTimeout) + .describedAs("idleTimeout transformed from number to duration") + .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) + } + + it("should create CBS configuration") { + assertThat(result.cbs.firstRequestDelay) + .describedAs("firstRequestDelay transformed from number to duration") + .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec)) + assertThat(result.cbs.requestInterval) + .describedAs("requestInterval transformed from number to duration") + .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec)) + } + + it("should create collector configuration") { + assertThat(result.collector.routing) + .describedAs("routing transformed from kafka sinks to routes") + .isEqualTo(sampleRouting) + + assertThat(result.collector.maxPayloadSizeBytes) + .describedAs("maxPayloadSizeBytes calculated from kafka sinks") + .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES) + } + + it("should use specified log level") { + assertThat(result.logLevel) + .describedAs("logLevel was not transformed when present") + .isEqualTo(LogLevel.TRACE) + } + + it("should create security keys") { + result.security.keys.fold({ fail("Should be Some") }, { + assertThat(it.keyStore().path()).isEqualTo(File(KEYSTORE).toPath()) + assertThat(it.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath()) + it.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) } + it.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) } + }) + } + } + } + + describe("transforming configuration with empty log level") { + val config = validatedConfiguration( + logLevel = None + ) + + it("should use default log level") { + val result = cut.toFinalConfiguration(config) + + assertThat(result.logLevel).isEqualTo(DEFAULT_LOG_LEVEL) + } + } + + describe("transforming configuration with security disabled") { + val config = validatedConfiguration( + sslDisable = Some(true), + keyStoreFile = "", + keyStorePasswordFile = "", + trustStoreFile = "", + trustStorePasswordFile = "" + ) + + it("should create valid configuration with empty security keys") { + val result = cut.toFinalConfiguration(config) + + assertThat(result.security.keys).isEqualTo(None) + } + } + + describe("transforming configuration with ssl disable missing") { + val config = validatedConfiguration( + sslDisable = None + ) + + it("should create configuration with ssl enabled") { + val result = cut.toFinalConfiguration(config) + val securityKeys = result.security.keys + .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys + assertThat(securityKeys.keyStore().path()).isEqualTo(File(KEYSTORE).toPath()) + assertThat(securityKeys.trustStore().path()).isEqualTo(File(TRUSTSTORE).toPath()) + securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(KEYSTORE_PASSWORD.toCharArray()) } + securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(TRUSTSTORE_PASSWORD.toCharArray()) } + } + } + + describe("calculating maxPayloadSizeBytes") { + on("defined routes") { + val highestMaxPayloadSize = 3 + val sink1 = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("1") + whenever(it.maxPayloadSizeBytes()).thenReturn(1) + } + val sink2 = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("2") + whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize) + } + val config = validatedConfiguration( + streamPublishers = listOf(sink1, sink2) + ) + + val result = cut.toFinalConfiguration(config) + + it("should use the highest value among all routes") { + assertThat(result.collector.maxPayloadSizeBytes) + .isEqualTo(highestMaxPayloadSize) + } + } + + on("empty routing") { + val config = validatedConfiguration( + streamPublishers = emptyList() + ) + + val result = cut.toFinalConfiguration(config) + + it("should use default value") { + assertThat(result.collector.maxPayloadSizeBytes) + .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE_BYTES) + } + } + } + + } +}) + +private fun validatedConfiguration(listenPort: Int = defaultListenPort, + idleTimeoutSec: Long = defaultIdleTimeoutSec, + firstReqDelaySec: Long = defaultFirstReqDelaySec, + requestIntervalSec: Long = defaultRequestIntervalSec, + sslDisable: Option<Boolean> = Some(false), + keyStoreFile: String = KEYSTORE, + keyStorePasswordFile: String = KEYSTORE_PASS_FILE, + trustStoreFile: String = TRUSTSTORE, + trustStorePasswordFile: String = TRUSTSTORE_PASS_FILE, + streamPublishers: List<KafkaSink> = sampleStreamsDefinition, + logLevel: Option<LogLevel> = Some(LogLevel.INFO) +): ValidatedPartialConfiguration = PartialConfiguration( + listenPort = Some(listenPort), + idleTimeoutSec = Some(idleTimeoutSec), + firstRequestDelaySec = Some(firstReqDelaySec), + requestIntervalSec = Some(requestIntervalSec), + streamPublishers = Some(streamPublishers), + sslDisable = sslDisable, + keyStoreFile = Some(keyStoreFile), + keyStorePasswordFile = Some(keyStorePasswordFile), + trustStoreFile = Some(trustStoreFile), + trustStorePasswordFile = Some(trustStorePasswordFile), + logLevel = logLevel +).unsafeAsValidated() + diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt index 5495c865..26a9cc57 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -22,101 +22,84 @@ package org.onap.dcae.collectors.veshv.config.impl import arrow.core.None import arrow.core.Option import arrow.core.Some -import arrow.core.getOrElse -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import org.assertj.core.api.Assertions.assertThat -import org.assertj.core.api.Assertions.fail +import org.assertj.core.api.Assertions.* import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.config.api.model.Route -import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator.Companion.DEFAULT_LOG_LEVEL +import org.onap.dcae.collectors.veshv.config.api.model.ValidationException import org.onap.dcae.collectors.veshv.utils.logging.LogLevel import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink -import org.onap.dcaegen2.services.sdk.security.ssl.SecurityKeys -import java.io.File -import java.nio.file.Paths -import java.time.Duration internal object ConfigurationValidatorTest : Spek({ describe("ConfigurationValidator") { val cut = ConfigurationValidator() describe("validating partial configuration with missing fields") { - val config = PartialConfiguration( - listenPort = Some(1) - ) - - it("should return ValidationError") { - val result = cut.validate(config) - assertThat(result.isLeft()).isTrue() - } - } + val config = PartialConfiguration(listenPort = Some(5)) - describe("validating configuration with empty log level") { - val config = partialConfiguration( - logLevel = None - ) - - it("should use default log level") { + it("should return ValidationException with missing required fields description") { val result = cut.validate(config) - result.fold( - { - fail("Configuration should have been created successfully") - }, - { - assertThat(it.logLevel).isEqualTo(DEFAULT_LOG_LEVEL) - } - ) + result.fold({ + assertThat(it.message).doesNotContain(PartialConfiguration::listenPort.name) + + assertThat(it.message).contains(PartialConfiguration::idleTimeoutSec.name) + assertThat(it.message).contains(PartialConfiguration::firstRequestDelaySec.name) + assertThat(it.message).contains(PartialConfiguration::requestIntervalSec.name) + assertThat(it.message).contains(PartialConfiguration::streamPublishers.name) + assertThat(it.message).contains(PartialConfiguration::keyStoreFile.name) + assertThat(it.message).contains(PartialConfiguration::keyStorePasswordFile.name) + assertThat(it.message).contains(PartialConfiguration::trustStoreFile.name) + assertThat(it.message).contains(PartialConfiguration::trustStorePasswordFile.name) + + assertThat(it.message).doesNotContain(PartialConfiguration::logLevel.name) + assertThat(it.message).doesNotContain(PartialConfiguration::sslDisable.name) + }, { fail("Should be ValidationException") }) } } - describe("validating complete configuration") { + describe("validating complete valid configuration") { val config = PartialConfiguration( listenPort = Some(defaultListenPort), idleTimeoutSec = Some(defaultIdleTimeoutSec), firstRequestDelaySec = Some(defaultFirstReqDelaySec), requestIntervalSec = Some(defaultRequestIntervalSec), sslDisable = Some(false), - keyStoreFile = Some(keyStore), - keyStorePasswordFile = Some(keyStorePassFile), - trustStoreFile = Some(trustStore), - trustStorePasswordFile = Some(trustStorePassFile), + keyStoreFile = Some(KEYSTORE), + keyStorePasswordFile = Some(KEYSTORE_PASSWORD), + trustStoreFile = Some(TRUSTSTORE), + trustStorePasswordFile = Some(TRUSTSTORE_PASSWORD), streamPublishers = Some(sampleStreamsDefinition), logLevel = Some(LogLevel.TRACE) ) - it("should create valid configuration") { + it("should create validated configuration") { val result = cut.validate(config) result.fold( { fail("Configuration should have been created successfully") }, { - assertThat(it.server.listenPort) + assertThat(it.listenPort) .isEqualTo(defaultListenPort) - assertThat(it.server.idleTimeout) - .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) - - val securityKeys = it.security.keys - .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys - assertThat(securityKeys.keyStore().path()).isEqualTo(File(keyStore).toPath()) - assertThat(securityKeys.trustStore().path()).isEqualTo(File(trustStore).toPath()) - securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(keyStorePass.toCharArray()) } - securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(trustStorePass.toCharArray()) } - - assertThat(it.cbs.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec)) - assertThat(it.cbs.requestInterval) - .isEqualTo(Duration.ofSeconds(defaultRequestIntervalSec)) - - assertThat(it.collector.routing) - .isEqualTo(sampleRouting) - assertThat(it.collector.maxPayloadSizeBytes) - .isEqualTo(sampleMaxPayloadSize) - - assertThat(it.logLevel).isEqualTo(LogLevel.TRACE) + assertThat(it.idleTimeoutSec) + .isEqualTo(defaultIdleTimeoutSec) + + it.securityConfiguration.fold({ + fail("Should have been validated successfully") + }, { + assertThat(it.keyStoreFile).isEqualTo(KEYSTORE) + assertThat(it.keyStorePasswordFile).isEqualTo(KEYSTORE_PASSWORD) + assertThat(it.trustStoreFile).isEqualTo(TRUSTSTORE) + assertThat(it.trustStorePasswordFile).isEqualTo(TRUSTSTORE_PASSWORD) + }) + + assertThat(it.cbsConfiguration.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec) + assertThat(it.cbsConfiguration.requestIntervalSec).isEqualTo(defaultRequestIntervalSec) + + assertThat(it.streamPublishers).isEqualTo(sampleStreamsDefinition) + + assertThat(it.logLevel).isEqualTo(Some(LogLevel.TRACE)) } ) } @@ -126,29 +109,26 @@ internal object ConfigurationValidatorTest : Spek({ val config = partialConfiguration( sslDisable = Some(true), keyStoreFile = Some(""), - keyStorePassword = Some(""), - trustStoreFile = Some(""), - trustStorePassword = Some("") + keyStorePasswordFile = None, + trustStoreFile = None, + trustStorePasswordFile = Some("") ) - it("should create valid configuration") { + it("should return validated configuration regardless of security keys presence") { val result = cut.validate(config) result.fold( { fail("Configuration should have been created successfully but was $it") }, { - assertThat(it.server.idleTimeout) - .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) + assertThat(it.idleTimeoutSec).isEqualTo(defaultIdleTimeoutSec) - assertThat(it.security.keys) - .isEqualTo(None) + assertThat(it.securityConfiguration.isEmpty()).isTrue() - assertThat(it.cbs.firstRequestDelay) - .isEqualTo(Duration.ofSeconds(defaultFirstReqDelaySec)) + assertThat(it.cbsConfiguration.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec) + assertThat(it.cbsConfiguration.requestIntervalSec).isEqualTo(defaultRequestIntervalSec) - assertThat(it.collector.routing) - .isEqualTo(sampleRouting) + assertThat(it.streamPublishers).isEqualTo(sampleStreamsDefinition) } ) } @@ -159,24 +139,81 @@ internal object ConfigurationValidatorTest : Spek({ sslDisable = None ) - it("should create valid configuration with ssl enabled") { + it("should return validated configuration") { val result = cut.validate(config) result.fold( { fail("Configuration should have been created successfully but was $it") }, { - val securityKeys = it.security.keys - .getOrElse { fail("Should be immutableSecurityKeys") } as SecurityKeys - assertThat(securityKeys.keyStore().path()).isEqualTo(File(keyStore).toPath()) - assertThat(securityKeys.trustStore().path()).isEqualTo(File(trustStore).toPath()) - securityKeys.keyStorePassword().use { assertThat(it).isEqualTo(keyStorePass.toCharArray()) } - securityKeys.trustStorePassword().use { assertThat(it).isEqualTo(trustStorePass.toCharArray()) } + it.securityConfiguration.fold({ + fail("Should have been validated successfully") + }, { + assertThat(it.keyStoreFile).isEqualTo(KEYSTORE) + assertThat(it.keyStorePasswordFile).isEqualTo(KEYSTORE_PASSWORD) + assertThat(it.trustStoreFile).isEqualTo(TRUSTSTORE) + assertThat(it.trustStorePasswordFile).isEqualTo(TRUSTSTORE_PASSWORD) + }) + } ) } } + describe("validating configuration with ssl enabled, but not all required security fields set") { + val config = partialConfiguration( + sslDisable = Some(false), + keyStoreFile = Some(KEYSTORE), + keyStorePasswordFile = None, + trustStoreFile = None, + trustStorePasswordFile = Some(TRUSTSTORE_PASSWORD) + ) + + it("should return validated configuration") { + val result = cut.validate(config) + + assertThat(result.isLeft()) + .describedAs("security validation result") + .isTrue() + } + } + + describe("validating CBS configuration from partial") { + given("valid CBS configuration") { + val config = partialConfiguration() + + it("should returned validated config") { + val result = cut.validatedCbsConfiguration(config) + + assertThat(result.firstRequestDelaySec).isEqualTo(defaultFirstReqDelaySec) + assertThat(result.requestIntervalSec).isEqualTo(defaultRequestIntervalSec) + } + + } + + given("missing firstReqDelaySec") { + val config = partialConfiguration( + firstReqDelaySec = None + ) + + it("should throw validation exception") { + assertThatExceptionOfType(ValidationException::class.java).isThrownBy { + cut.validatedCbsConfiguration(config) + }.withMessageContaining(PartialConfiguration::firstRequestDelaySec.name) + } + } + + given("missing requestIntervalSec") { + val config = partialConfiguration( + requestIntervalSec = None) + + it("should throw validation exception") { + assertThatExceptionOfType(ValidationException::class.java).isThrownBy { + cut.validatedCbsConfiguration(config) + }.withMessageContaining(PartialConfiguration::requestIntervalSec.name) + } + } + } } }) @@ -185,10 +222,10 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec), requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec), sslDisable: Option<Boolean> = Some(false), - keyStoreFile: Option<String> = Some(keyStore), - keyStorePassword: Option<String> = Some(keyStorePassFile), - trustStoreFile: Option<String> = Some(trustStore), - trustStorePassword: Option<String> = Some(trustStorePassFile), + keyStoreFile: Option<String> = Some(KEYSTORE), + keyStorePasswordFile: Option<String> = Some(KEYSTORE_PASSWORD), + trustStoreFile: Option<String> = Some(TRUSTSTORE), + trustStorePasswordFile: Option<String> = Some(TRUSTSTORE_PASSWORD), streamPublishers: Option<List<KafkaSink>> = Some(sampleStreamsDefinition), logLevel: Option<LogLevel> = Some(LogLevel.INFO) ) = PartialConfiguration( @@ -198,35 +235,9 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor requestIntervalSec = requestIntervalSec, sslDisable = sslDisable, keyStoreFile = keyStoreFile, - keyStorePasswordFile = keyStorePassword, + keyStorePasswordFile = keyStorePasswordFile, trustStoreFile = trustStoreFile, - trustStorePasswordFile = trustStorePassword, + trustStorePasswordFile = trustStorePasswordFile, streamPublishers = streamPublishers, logLevel = logLevel ) - -private fun resourcePathAsString(resource: String) = - Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString() - -private const val defaultListenPort = 1234 -private const val defaultRequestIntervalSec = 3L -private const val defaultIdleTimeoutSec = 10L -private const val defaultFirstReqDelaySec = 10L - -private const val keyStore = "test.ks.pkcs12" -private const val trustStore = "trust.ks.pkcs12" -private const val keyStorePass = "change.me" -private const val trustStorePass = "change.me.too" -private val keyStorePassFile = resourcePathAsString("/test.ks.pass") -private val trustStorePassFile = resourcePathAsString("/trust.ks.pass") - -private const val sampleSinkName = "perf3gpp" -const val sampleMaxPayloadSize = 1024 - -private val sink = mock<KafkaSink>().also { - whenever(it.name()).thenReturn(sampleSinkName) - whenever(it.maxPayloadSizeBytes()).thenReturn(sampleMaxPayloadSize) -} - -private val sampleStreamsDefinition = listOf(sink) -private val sampleRouting = listOf(Route(sink.name(), sink)) diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt new file mode 100644 index 00000000..f07af079 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/test_constants.kt @@ -0,0 +1,54 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcae.collectors.veshv.utils.logging.LogLevel +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink +import java.nio.file.Paths + +private fun resourcePathAsString(resource: String) = + Paths.get(ConfigurationValidatorTest::class.java.getResource(resource).toURI()).toString() + +internal val DEFAULT_LOG_LEVEL = LogLevel.INFO + +internal const val defaultListenPort = 1234 +internal const val defaultRequestIntervalSec = 3L +internal const val defaultIdleTimeoutSec = 10L +internal const val defaultFirstReqDelaySec = 10L + +internal const val KEYSTORE = "test.ks.pkcs12" +internal const val KEYSTORE_PASSWORD = "change.me" +internal const val TRUSTSTORE = "trust.ks.pkcs12" +internal const val TRUSTSTORE_PASSWORD = "change.me.too" +internal val KEYSTORE_PASS_FILE = resourcePathAsString("/test.ks.pass") +internal val TRUSTSTORE_PASS_FILE = resourcePathAsString("/trust.ks.pass") + +internal const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 + +private val sampleSink = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("perf3gpp") + whenever(it.maxPayloadSizeBytes()).thenReturn(DEFAULT_MAX_PAYLOAD_SIZE_BYTES) +} + +internal val sampleStreamsDefinition = listOf(sampleSink) +internal val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index e1e35d8b..122de173 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -59,31 +59,25 @@ <dependencies> <dependency> - <groupId>${project.parent.groupId}</groupId> + <groupId>${project.groupId}</groupId> <artifactId>hv-collector-configuration</artifactId> - <version>${project.parent.version}</version> + <version>${project.version}</version> </dependency> <dependency> - <groupId>${project.parent.groupId}</groupId> + <groupId>${project.groupId}</groupId> <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> + <version>${project.version}</version> <scope>compile</scope> </dependency> <dependency> - <groupId>${project.parent.groupId}</groupId> + <groupId>${project.groupId}</groupId> <artifactId>hv-collector-utils</artifactId> - <version>${project.parent.version}</version> + <version>${project.version}</version> </dependency> <dependency> - <groupId>${project.parent.groupId}</groupId> + <groupId>${project.groupId}</groupId> <artifactId>hv-collector-ssl</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-test-utils</artifactId> - <version>${project.parent.version}</version> - <scope>test</scope> + <version>${project.version}</version> </dependency> <dependency> <groupId>io.arrow-kt</groupId> @@ -105,6 +99,12 @@ <groupId>io.projectreactor.kafka</groupId> <artifactId>reactor-kafka</artifactId> </dependency> + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>hv-collector-test-utils</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 48f335a1..28b28203 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.boundary import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt index 4c54d7d2..23a5d376 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/api.kt @@ -20,9 +20,8 @@ package org.onap.dcae.collectors.veshv.boundary import io.netty.buffer.ByteBuf -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.utils.Closeable -import org.onap.dcae.collectors.veshv.utils.ServerHandle import reactor.core.publisher.Flux import reactor.core.publisher.Mono @@ -34,6 +33,3 @@ interface CollectorFactory : Closeable { operator fun invoke(ctx: ClientContext): Collector } -interface Server { - fun start(): Mono<ServerHandle> -} diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt index c3c5d733..1f221c60 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt @@ -29,7 +29,7 @@ import org.onap.dcae.collectors.veshv.impl.HvVesCollector import org.onap.dcae.collectors.veshv.impl.Router import org.onap.dcae.collectors.veshv.impl.VesDecoder import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt index 7d8f0cb1..ac7c3917 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -23,9 +23,9 @@ import io.netty.buffer.ByteBuf import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.handleReactiveStreamError import org.onap.dcae.collectors.veshv.impl.wire.WireChunkDecoder -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index fe34a9c7..2190eba3 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -26,7 +26,7 @@ import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.ConsumedMessage @@ -43,12 +43,8 @@ class Router internal constructor(private val routing: Routing, sinkFactory: SinkFactory, ctx: ClientContext, metrics: Metrics) : - this(routing, - constructMessageSinks(routing, sinkFactory, ctx), - ctx, - metrics) { - logger.debug(ctx::mdc) { "Routing for client: $routing" } - logger.trace(ctx::mdc) { "Message sinks configured for client: $messageSinks" } + this(routing, constructMessageSinks(routing, sinkFactory, ctx), ctx, metrics) { + logger.debug(ctx::mdc) { "Routing configuration for client: $routing" } } fun route(message: VesMessage): Flux<ConsumedMessage> = diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt index 8d154091..8f66de2b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/HttpAdapter.kt @@ -21,7 +21,7 @@ package org.onap.dcae.collectors.veshv.impl.adapters import io.netty.handler.codec.http.HttpStatusClass import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc +import org.onap.dcae.collectors.veshv.domain.logging.OnapMdc import reactor.core.publisher.Mono import reactor.netty.http.client.HttpClient import java.util.* diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt index 7b726ab4..91e6fde5 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaPublisher.kt @@ -22,14 +22,16 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesMessage -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.withDebug -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.withDebug +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.dcae.collectors.veshv.domain.logging.Marker +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.trace +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.warn import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux import reactor.kafka.sender.KafkaSender diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt index 9df1af31..2973fa8d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/kafka/KafkaSinkFactory.kt @@ -22,8 +22,8 @@ package org.onap.dcae.collectors.veshv.impl.adapters.kafka import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.impl.createKafkaSender -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.model.streams.SinkStream import org.onap.ves.VesEventOuterClass.CommonEventHeader diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt index ca9d28ae..0d0f8ea7 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoder.kt @@ -25,9 +25,9 @@ import org.onap.dcae.collectors.veshv.domain.MissingWireFrameBytes import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecodingError import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.handleReactiveStreamError -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.trace -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.handleReactiveStreamError +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.trace +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Flux import reactor.core.publisher.Flux.defer diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 6b9c6803..533581d5 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -36,7 +36,7 @@ import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage import org.onap.dcae.collectors.veshv.tests.utils.commonHeader diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index e0092cf9..10dea82d 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -30,7 +30,7 @@ import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import reactor.test.test /** diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 3002f334..78d2e704 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -57,7 +57,7 @@ object PerformanceSpecification : Spek({ describe("VES High Volume Collector performance") { it("should handle multiple clients in reasonable time") { val sink = CountingSink() - val sut = Sut(CollectorConfiguration(basicRouting), sink) + val sut = Sut(CollectorConfiguration(basicRouting, MAX_PAYLOAD_SIZE_BYTES), sink) val numMessages: Long = 300_000 val runs = 4 @@ -87,7 +87,7 @@ object PerformanceSpecification : Spek({ it("should disconnect on transmission errors") { val sink = CountingSink() - val sut = Sut(CollectorConfiguration(basicRouting), sink) + val sut = Sut(CollectorConfiguration(basicRouting, MAX_PAYLOAD_SIZE_BYTES), sink) val numMessages: Long = 100_000 val timeout = Duration.ofSeconds(30) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 88d1567e..93c71e5e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -29,11 +29,12 @@ import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.domain.RoutedMessage import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysFailingSink import org.onap.dcae.collectors.veshv.tests.fakes.AlwaysSuccessfulSink import org.onap.dcae.collectors.veshv.tests.fakes.DelayingSink import org.onap.dcae.collectors.veshv.tests.fakes.FakeMetrics +import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.utils.Closeable @@ -95,10 +96,10 @@ class DummySinkFactory(private val sink: Sink) : SinkFactory { } fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = - Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink()) + Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysSuccessfulSink()) fun vesHvWithAlwaysFailingSink(routing: Routing = basicRouting): Sut = - Sut(CollectorConfiguration(routing), AlwaysFailingSink()) + Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), AlwaysFailingSink()) fun vesHvWithDelayingSink(delay: Duration, routing: Routing = basicRouting): Sut = - Sut(CollectorConfiguration(routing), DelayingSink(delay)) + Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), DelayingSink(delay)) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index f90f4bc9..e74e1f62 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -215,6 +215,6 @@ object VesHvSpecification : Spek({ private fun vesHvWithStoringSink(routing: Routing = basicRouting): Pair<Sut, StoringSink> { val sink = StoringSink() - val sut = Sut(CollectorConfiguration(routing), sink) + val sut = Sut(CollectorConfiguration(routing, MAX_PAYLOAD_SIZE_BYTES), sink) return Pair(sut, sink) } diff --git a/sources/hv-collector-domain/pom.xml b/sources/hv-collector-domain/pom.xml index 40e7c936..d68fa65b 100644 --- a/sources/hv-collector-domain/pom.xml +++ b/sources/hv-collector-domain/pom.xml @@ -58,6 +58,11 @@ <dependencies> <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>org.onap.dcaegen2.services.sdk</groupId> <artifactId>hvvesclient-protobuf</artifactId> </dependency> @@ -66,6 +71,10 @@ <artifactId>kotlin-stdlib-jdk8</artifactId> </dependency> <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> <groupId>io.projectreactor.netty</groupId> <artifactId>reactor-netty</artifactId> </dependency> @@ -80,7 +89,6 @@ <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-test</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>org.jetbrains.spek</groupId> @@ -90,6 +98,10 @@ <groupId>org.jetbrains.spek</groupId> <artifactId>spek-junit-platform-engine</artifactId> </dependency> + <dependency> + <groupId>com.nhaarman.mockitokotlin2</groupId> + <artifactId>mockito-kotlin</artifactId> + </dependency> </dependencies> </project> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContext.kt index 7b082e64..6a47f44d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ClientContext.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContext.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,13 +17,12 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.model +package org.onap.dcae.collectors.veshv.domain.logging import arrow.core.None import arrow.core.Option import arrow.core.getOrElse import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc import java.net.InetAddress import java.security.cert.X509Certificate import java.util.* diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextLogging.kt index 954de978..fc45ea9d 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ClientContextLogging.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextLogging.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,16 +17,15 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.adapters +package org.onap.dcae.collectors.veshv.domain.logging -import org.onap.dcae.collectors.veshv.model.ClientContext import org.onap.dcae.collectors.veshv.utils.logging.AtLevelLogger import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.logging.handleReactiveStreamError import reactor.core.publisher.Flux @Suppress("TooManyFunctions") -internal object ClientContextLogging { +object ClientContextLogging { fun Logger.withError(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withError(ctx::fullMdc, block) fun Logger.withWarn(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withWarn(ctx::fullMdc, block) fun Logger.withInfo(ctx: ClientContext, block: AtLevelLogger.() -> Unit) = withInfo(ctx::fullMdc, block) diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/Marker.kt index ac39100d..b9463c96 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Marker.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/Marker.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.utils.logging +package org.onap.dcae.collectors.veshv.domain.logging import org.slf4j.MarkerFactory import java.time.Instant diff --git a/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/MarkerLogging.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/MarkerLogging.kt new file mode 100644 index 00000000..2959f98c --- /dev/null +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/MarkerLogging.kt @@ -0,0 +1,63 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.domain.logging + +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext +import org.slf4j.MDC + + +@Suppress("TooManyFunctions") +object MarkerLogging { + fun Logger.error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withError(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + fun Logger.error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + withError(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message(), t) } } + + fun Logger.warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withWarn(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + fun Logger.warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = + withWarn(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message(), t) } } + + fun Logger.info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withInfo(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + fun Logger.debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withDebug(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + fun Logger.trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = + withTrace(mdc) { withAdditionalMdc(marker.mdc) { log(marker.slf4jMarker, message()) } } + + + private inline fun withAdditionalMdc(mdc: Map<String, String>, block: () -> Unit) { + if (mdc.isEmpty()) { + block() + } else { + try { + mdc.forEach(MDC::put) + block() + } finally { + mdc.keys.forEach(MDC::remove) + } + } + } +}
\ No newline at end of file diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/OnapMdc.kt index 86584164..8c7feced 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/OnapMdc.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/OnapMdc.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.utils.logging +package org.onap.dcae.collectors.veshv.domain.logging /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContext.kt index a72ec034..c3c64d92 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContext.kt +++ b/sources/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContext.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,9 +17,8 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.model +package org.onap.dcae.collectors.veshv.domain.logging -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc import java.net.InetAddress import java.net.UnknownHostException import java.util.* diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextTest.kt index a49428a7..ea1a2e90 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ClientContextTest.kt +++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ClientContextTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.model +package org.onap.dcae.collectors.veshv.domain.logging import arrow.core.Some import com.nhaarman.mockitokotlin2.mock @@ -28,12 +28,8 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc -import java.net.Inet4Address import java.net.InetAddress -import java.net.InetSocketAddress import java.security.cert.X509Certificate -import java.util.* import javax.security.auth.x500.X500Principal /** diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContextTest.kt index 5b6e4526..85ced42a 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/model/ServiceContextTest.kt +++ b/sources/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/logging/ServiceContextTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.model +package org.onap.dcae.collectors.veshv.domain.logging import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek @@ -25,7 +25,6 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.utils.logging.OnapMdc import java.util.* /** diff --git a/sources/hv-collector-domain/src/test/resources/logback-test.xml b/sources/hv-collector-domain/src/test/resources/logback-test.xml new file mode 100644 index 00000000..9a4eacfe --- /dev/null +++ b/sources/hv-collector-domain/src/test/resources/logback-test.xml @@ -0,0 +1,35 @@ +<?xml version="1.0" encoding="UTF-8"?> +<configuration> + <property name="LOG_FILE" + value="${LOG_FILE:-${LOG_PATH:-${LOG_TEMP:-${java.io.tmpdir:-/tmp}}/}ves-hv.log}"/> + <property name="FILE_LOG_PATTERN" value="%d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %-5level [%-40.40logger{10}] - %msg%n"/> + + <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern> + %d{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC} %highlight(%-5level) [%-40.40logger{10}] - %msg%n + </pattern> + </encoder> + </appender> + + <appender name="ROLLING-FILE" + class="ch.qos.logback.core.rolling.RollingFileAppender"> + <encoder> + <pattern>${FILE_LOG_PATTERN}</pattern> + </encoder> + <file>${LOG_FILE}</file> + <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> + <fileNamePattern>${LOG_FILE}.%d{yyyy-MM-dd}.log</fileNamePattern> + <maxFileSize>50MB</maxFileSize> + <maxHistory>30</maxHistory> + <totalSizeCap>10GB</totalSizeCap> + </rollingPolicy> + </appender> + + <logger name="org.onap.dcae.collectors.veshv" level="TRACE"/> + + <root level="INFO"> + <appender-ref ref="CONSOLE"/> + <appender-ref ref="ROLLING-FILE"/> + </root> +</configuration> diff --git a/sources/hv-collector-domain/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sources/hv-collector-domain/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 00000000..ca6ee9ce --- /dev/null +++ b/sources/hv-collector-domain/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline
\ No newline at end of file diff --git a/sources/hv-collector-main/pom.xml b/sources/hv-collector-main/pom.xml index 3fe8932f..d99bf855 100644 --- a/sources/hv-collector-main/pom.xml +++ b/sources/hv-collector-main/pom.xml @@ -83,12 +83,12 @@ </dependency> <dependency> <groupId>${project.groupId}</groupId> - <artifactId>hv-collector-core</artifactId> + <artifactId>hv-collector-health-check</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>${project.groupId}</groupId> - <artifactId>hv-collector-health-check</artifactId> + <artifactId>hv-collector-server</artifactId> <version>${project.version}</version> </dependency> <dependency> diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index dfb388d8..123d2dc9 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -19,14 +19,16 @@ */ package org.onap.dcae.collectors.veshv.main +import org.onap.dcae.collectors.veshv.api.ServersFactory import org.onap.dcae.collectors.veshv.config.api.ConfigurationModule import org.onap.dcae.collectors.veshv.config.api.ConfigurationStateListener import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState +import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.main.servers.HealthCheckServer -import org.onap.dcae.collectors.veshv.main.servers.VesServer -import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcae.collectors.veshv.utils.registerShutdownHook @@ -41,6 +43,7 @@ private val logger = Logger("$VES_HV_PACKAGE.main") private val hvVesServer = AtomicReference<ServerHandle>() private val configurationModule = ConfigurationModule() +private val sslContextFactory = SslContextFactory() private val maxCloseTime = Duration.ofSeconds(10) fun main(args: Array<String>) { @@ -81,7 +84,11 @@ private fun startServer(config: HvVesConfiguration): Mono<ServerHandle> = private fun deferredVesServer(config: HvVesConfiguration) = Mono.defer { Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel) logger.debug(ServiceContext::mdc) { "Configuration: $config" } - VesServer.start(config) + ServersFactory.createHvVesServer( + config, + sslContextFactory, + MicrometerMetrics.INSTANCE + ).start() } private fun stopRunningServer() = Mono.defer { diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt index c970e5c8..2ed6ea70 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/HealthCheckServer.kt @@ -22,7 +22,7 @@ package org.onap.dcae.collectors.veshv.main.servers import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.healthcheck.factory.HealthCheckApiServer import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics -import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import java.net.InetSocketAddress diff --git a/sources/hv-collector-server/pom.xml b/sources/hv-collector-server/pom.xml new file mode 100644 index 00000000..b8743450 --- /dev/null +++ b/sources/hv-collector-server/pom.xml @@ -0,0 +1,90 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ ============LICENSE_START======================================================= + ~ dcaegen2-collectors-veshv + ~ ================================================================================ + ~ Copyright (C) 2019 NOKIA + ~ ================================================================================ + ~ Licensed under the Apache License, Version 2.0 (the "License"); + ~ you may not use this file except in compliance with the License. + ~ You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ ============LICENSE_END========================================================= + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <licenses> + <license> + <name>The Apache Software License, Version 2.0</name> + <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> + </license> + </licenses> + + <parent> + <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> + <artifactId>hv-collector-sources</artifactId> + <version>1.1.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hv-collector-server</artifactId> + <description>VES HighVolume Collector :: Server</description> + + <build> + <plugins> + <plugin> + <artifactId>kotlin-maven-plugin</artifactId> + <groupId>org.jetbrains.kotlin</groupId> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <groupId>org.apache.maven.plugins</groupId> + </plugin> + <plugin> + <groupId>org.jacoco</groupId> + <artifactId>jacoco-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-core</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-ssl</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>${project.parent.groupId}</groupId> + <artifactId>hv-collector-utils</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>io.projectreactor</groupId> + <artifactId>reactor-core</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.addons</groupId> + <artifactId>reactor-extra</artifactId> + </dependency> + <dependency> + <groupId>io.projectreactor.netty</groupId> + <artifactId>reactor-netty</artifactId> + </dependency> + </dependencies> + +</project> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt index e0f611b6..2bfac8d8 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/ServerFactory.kt +++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/api/Server.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,32 +17,25 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.factory +package org.onap.dcae.collectors.veshv.api -import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Server -import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.impl.socket.NettyTcpServer -import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration +import org.onap.dcae.collectors.veshv.impl.HvVesServer import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory +import org.onap.dcae.collectors.veshv.utils.ServerHandle +import reactor.core.publisher.Mono + +interface Server { + fun start(): Mono<ServerHandle> +} /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since May 2018 */ -object ServerFactory { - - private val sslFactory = SslContextFactory() - - fun createNettyTcpServer(serverConfig: ServerConfiguration, - securityConfig: SecurityConfiguration, - collectorFactory: CollectorFactory, - metrics: Metrics - ): Server = NettyTcpServer( - serverConfig, - sslFactory.createServerContext(securityConfig), - collectorFactory, - metrics - ) +object ServersFactory { + fun createHvVesServer(config: HvVesConfiguration, + sslContextFactory: SslContextFactory, + metrics: Metrics): Server = HvVesServer(config, sslContextFactory, metrics) } diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt index 98a094b2..0e149ab7 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesServer.kt @@ -17,15 +17,15 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.main.servers +package org.onap.dcae.collectors.veshv.impl -import org.onap.dcae.collectors.veshv.boundary.Server +import org.onap.dcae.collectors.veshv.api.Server import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.HvVesCollectorFactory -import org.onap.dcae.collectors.veshv.factory.ServerFactory import org.onap.dcae.collectors.veshv.factory.AdapterFactory -import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics -import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.boundary.Metrics +import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger import reactor.core.publisher.Mono @@ -34,36 +34,34 @@ import reactor.core.publisher.Mono * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since August 2018 */ -object VesServer { +internal class HvVesServer(private val config: HvVesConfiguration, + private val sslFactory: SslContextFactory, + private val metrics: Metrics) : Server { - private val logger = Logger(VesServer::class) + private val logger = Logger(HvVesServer::class) - fun start(config: HvVesConfiguration): Mono<ServerHandle> = - createVesServer(config) + override fun start(): Mono<ServerHandle> = + createNettyTcpServer(config) .start() .doOnNext(::logServerStarted) - private fun createVesServer(config: HvVesConfiguration): Server = - createCollectorProvider(config) - .let { collectorProvider -> - ServerFactory.createNettyTcpServer( - config.server, - config.security, - collectorProvider, - MicrometerMetrics.INSTANCE - ) - } + private fun createNettyTcpServer(config: HvVesConfiguration): Server = + NettyTcpServer( + config.server, + sslFactory.createServerContext(config.security), + createCollectorProvider(config), + metrics + ) private fun createCollectorProvider(config: HvVesConfiguration): HvVesCollectorFactory = HvVesCollectorFactory( config.collector, AdapterFactory.sinkCreatorFactory(), - MicrometerMetrics.INSTANCE + metrics ) private fun logServerStarted(handle: ServerHandle) = logger.info(ServiceContext::mdc) { "HighVolume VES Collector is up and listening on ${handle.host}:${handle.port}" } - } diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt index 7ce86f98..d19b7f49 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/NettyTcpServer.kt +++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/NettyTcpServer.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,24 +17,27 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.socket +package org.onap.dcae.collectors.veshv.impl import arrow.core.Option import arrow.core.getOrElse import io.netty.handler.ssl.SslContext +import org.onap.dcae.collectors.veshv.api.Server import org.onap.dcae.collectors.veshv.boundary.Collector import org.onap.dcae.collectors.veshv.boundary.CollectorFactory import org.onap.dcae.collectors.veshv.boundary.Metrics -import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug -import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.info -import org.onap.dcae.collectors.veshv.model.ClientContext -import org.onap.dcae.collectors.veshv.model.ServiceContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ServiceContext import org.onap.dcae.collectors.veshv.utils.NettyServerHandle import org.onap.dcae.collectors.veshv.utils.ServerHandle import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.onap.dcae.collectors.veshv.utils.logging.Marker +import org.onap.dcae.collectors.veshv.domain.logging.Marker +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.debug +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.info +import org.onap.dcae.collectors.veshv.domain.logging.MarkerLogging.warn +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.debug +import org.onap.dcae.collectors.veshv.domain.logging.ClientContextLogging.info import reactor.core.publisher.Mono import reactor.netty.Connection import reactor.netty.NettyInbound diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt index a1e5b8fd..eb51cf4b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/socket/networking.kt +++ b/sources/hv-collector-server/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/networking.kt @@ -17,14 +17,14 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.impl.socket +package org.onap.dcae.collectors.veshv.impl import arrow.core.Option import arrow.core.Try import arrow.syntax.collections.firstOption import io.netty.handler.ssl.SslHandler import io.netty.util.concurrent.Future -import org.onap.dcae.collectors.veshv.model.ClientContext +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import reactor.core.publisher.Mono import reactor.netty.ByteBufFlux import reactor.netty.Connection diff --git a/sources/hv-collector-ssl/pom.xml b/sources/hv-collector-ssl/pom.xml index 0ba609e5..2aaf2295 100644 --- a/sources/hv-collector-ssl/pom.xml +++ b/sources/hv-collector-ssl/pom.xml @@ -56,16 +56,6 @@ <dependencies> <dependency> <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-commandline</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> - <groupId>${project.parent.groupId}</groupId> <artifactId>hv-collector-test-utils</artifactId> <version>${project.parent.version}</version> <scope>test</scope> @@ -94,6 +84,4 @@ <scope>test</scope> </dependency> </dependencies> - - </project> diff --git a/sources/hv-collector-utils/pom.xml b/sources/hv-collector-utils/pom.xml index 9dc8c9af..2e13e0a7 100644 --- a/sources/hv-collector-utils/pom.xml +++ b/sources/hv-collector-utils/pom.xml @@ -56,11 +56,6 @@ <dependencies> <dependency> - <groupId>${project.parent.groupId}</groupId> - <artifactId>hv-collector-domain</artifactId> - <version>${project.parent.version}</version> - </dependency> - <dependency> <groupId>org.jetbrains.kotlin</groupId> <artifactId>kotlin-reflect</artifactId> </dependency> @@ -81,6 +76,11 @@ <artifactId>arrow-syntax</artifactId> </dependency> <dependency> + <groupId>io.projectreactor.netty</groupId> + <artifactId>reactor-netty</artifactId> + <optional>true</optional> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <optional>true</optional> diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt index cfed7f32..ceae62db 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt @@ -17,6 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ + +@file:Suppress("TooManyFunctions") + package org.onap.dcae.collectors.veshv.utils.arrow import arrow.core.Either @@ -37,6 +40,7 @@ import java.util.concurrent.atomic.AtomicReference * @since July 2018 */ + object OptionUtils { fun <A> binding(c: suspend MonadContinuation<ForOption, *>.() -> A) : Option<A> = Option.monad().binding(c).fix() @@ -78,6 +82,17 @@ fun <A> Try<A>.doOnFailure(action: (Throwable) -> Unit): Try<A> = apply { fun <A, B> A.mapBinding(c: suspend MonadContinuation<ForOption, *>.(A) -> B) : Option<B> = let { OptionUtils.binding { c(it) } } +fun <T> Option<Boolean>.flatFold(ifEmptyOrFalse: () -> T, ifTrue: () -> T) = + fold({ + ifEmptyOrFalse() + }, { + if (it) { + ifTrue() + } else { + ifEmptyOrFalse() + } + }) + diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt index 7fcc73a0..14bc3ec0 100644 --- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt +++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/logging/Logger.kt @@ -23,6 +23,7 @@ import ch.qos.logback.classic.LoggerContext import kotlin.reflect.KClass import org.slf4j.LoggerFactory import org.slf4j.MDC +import org.slf4j.Marker typealias MappedDiagnosticContext = () -> Map<String, String> @@ -52,91 +53,70 @@ class Logger(logger: org.slf4j.Logger) { fun withError(block: AtLevelLogger.() -> Unit) = errorLogger.block() fun withError(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - errorLogger.withMdc(mdc, block) + errorLogger.withMdc(mdc, block) fun error(message: () -> String) = errorLogger.run { log(message()) } fun error(mdc: MappedDiagnosticContext, message: () -> String) = - errorLogger.withMdc(mdc) { log(message()) } - - fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - errorLogger.withMdc(mdc) { log(marker, message()) } - - fun error(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = - errorLogger.withMdc(mdc) { log(marker, message(), t) } + errorLogger.withMdc(mdc) { log(message()) } // WARN fun withWarn(block: AtLevelLogger.() -> Unit) = warnLogger.block() fun withWarn(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - warnLogger.withMdc(mdc, block) + warnLogger.withMdc(mdc, block) fun warn(message: () -> String) = warnLogger.run { log(message()) } fun warn(mdc: MappedDiagnosticContext, message: () -> String) = - warnLogger.withMdc(mdc) { log(message()) } - - fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - warnLogger.withMdc(mdc) { log(marker, message()) } - - fun warn(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String, t: Throwable) = - warnLogger.withMdc(mdc) { log(marker, message(), t) } + warnLogger.withMdc(mdc) { log(message()) } // INFO fun withInfo(block: AtLevelLogger.() -> Unit) = infoLogger.block() fun withInfo(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - infoLogger.withMdc(mdc, block) + infoLogger.withMdc(mdc, block) fun info(message: () -> String) = infoLogger.run { log(message()) } fun info(mdc: MappedDiagnosticContext, message: () -> String) = - infoLogger.withMdc(mdc) { log(message()) } - - fun info(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - infoLogger.withMdc(mdc) { log(marker, message()) } + infoLogger.withMdc(mdc) { log(message()) } // DEBUG fun withDebug(block: AtLevelLogger.() -> Unit) = debugLogger.block() fun withDebug(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - debugLogger.withMdc(mdc, block) + debugLogger.withMdc(mdc, block) fun debug(message: () -> String) = debugLogger.run { log(message()) } fun debug(mdc: MappedDiagnosticContext, message: () -> String) = - debugLogger.withMdc(mdc) { log(message()) } - - fun debug(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - debugLogger.withMdc(mdc) { log(marker, message()) } + debugLogger.withMdc(mdc) { log(message()) } // TRACE fun withTrace(block: AtLevelLogger.() -> Unit) = traceLogger.block() fun withTrace(mdc: MappedDiagnosticContext, block: AtLevelLogger.() -> Unit) = - traceLogger.withMdc(mdc, block) + traceLogger.withMdc(mdc, block) fun trace(message: () -> String) = traceLogger.run { log(message()) } fun trace(mdc: MappedDiagnosticContext, message: () -> String) = - traceLogger.withMdc(mdc) { log(message()) } - - fun trace(mdc: MappedDiagnosticContext, marker: Marker, message: () -> String) = - traceLogger.withMdc(mdc) { log(marker, message()) } + traceLogger.withMdc(mdc) { log(message()) } companion object { fun setLogLevel(packageName: String, level: LogLevel) { @@ -165,19 +145,6 @@ abstract class AtLevelLogger { } } } - - protected fun withAdditionalMdc(mdc: Map<String, String>, block: () -> Unit) { - if (mdc.isEmpty()) { - block() - } else { - try { - mdc.forEach(MDC::put) - block() - } finally { - mdc.keys.forEach(MDC::remove) - } - } - } } object OffLevelLogger : AtLevelLogger() { @@ -211,14 +178,10 @@ class ErrorLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.error(marker.slf4jMarker, message) - } + logger.error(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.error(marker.slf4jMarker, message, t) - } + logger.error(marker, message, t) } @Suppress("SuboptimalLoggerUsage") @@ -232,14 +195,10 @@ class WarnLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.warn(marker.slf4jMarker, message) - } + logger.warn(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.warn(marker.slf4jMarker, message, t) - } + logger.warn(marker, message, t) } @Suppress("SuboptimalLoggerUsage") @@ -253,14 +212,10 @@ class InfoLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.info(marker.slf4jMarker, message) - } + logger.info(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.info(marker.slf4jMarker, message, t) - } + logger.info(marker, message, t) } @Suppress("SuboptimalLoggerUsage") @@ -274,14 +229,10 @@ class DebugLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.debug(marker.slf4jMarker, message) - } + logger.debug(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.debug(marker.slf4jMarker, message, t) - } + logger.debug(marker, message, t) } @Suppress("SuboptimalLoggerUsage") @@ -295,12 +246,8 @@ class TraceLevelLogger(private val logger: org.slf4j.Logger) : AtLevelLogger() { } override fun log(marker: Marker, message: String) = - withAdditionalMdc(marker.mdc) { - logger.trace(marker.slf4jMarker, message) - } + logger.trace(marker, message) override fun log(marker: Marker, message: String, t: Throwable) = - withAdditionalMdc(marker.mdc) { - logger.trace(marker.slf4jMarker, message, t) - } + logger.trace(marker, message, t) } diff --git a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt index 9f20bd29..3eeb6340 100644 --- a/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt +++ b/sources/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/generators/RawMessageGenerator.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.ves.message.generator.generators +import com.google.protobuf.ByteString import io.netty.buffer.Unpooled import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameType @@ -45,10 +46,15 @@ class RawMessageGenerator : MessageGenerator<WireFrameParameters, ByteBuffer>() private fun createMessage(messageType: WireFrameType): ByteBuffer = when (messageType) { - INVALID_WIRE_FRAME -> wrap(VesEvent.getDefaultInstance().toByteArray()) + INVALID_WIRE_FRAME -> wrap(constructSampleVesEvent().toByteArray()) INVALID_GPB_DATA -> wrap("invalid vesEvent".toByteArray(Charset.defaultCharset())) } + private fun constructSampleVesEvent() = + VesEvent.newBuilder() + .setEventFields(ByteString.copyFromUtf8("irrelevant")) + .build() + private fun wrap(bytes: ByteArray) = Unpooled.wrappedBuffer(bytes).nioBuffer() diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index 7fa23f7f..d9cbbaa8 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -36,7 +36,6 @@ import org.onap.dcae.collectors.veshv.commandline.CommandLineOption.VES_HV_PORT import org.onap.dcae.collectors.veshv.commandline.intValue import org.onap.dcae.collectors.veshv.commandline.stringValue import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.logging.Logger diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SslUtils.kt index 5981d9d4..7678fdbb 100644 --- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SslUtils.kt @@ -17,7 +17,8 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ssl.boundary +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + import arrow.core.None import arrow.core.Some @@ -26,6 +27,7 @@ import org.apache.commons.cli.CommandLine import org.onap.dcae.collectors.veshv.commandline.CommandLineOption import org.onap.dcae.collectors.veshv.commandline.hasOption import org.onap.dcae.collectors.veshv.commandline.stringValue +import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeys import org.onap.dcaegen2.services.sdk.security.ssl.ImmutableSecurityKeysStore import org.onap.dcaegen2.services.sdk.security.ssl.Passwords @@ -41,9 +43,6 @@ const val KEY_STORE_PASSWORD_FILE = "/etc/ves-hv/server.pass" const val TRUST_STORE_FILE = "/etc/ves-hv/trust.p12" const val TRUST_STORE_PASSWORD_FILE = "/etc/ves-hv/trust.pass" -fun createSecurityConfiguration(cmdLine: CommandLine): Try<SecurityConfiguration> = - createSecurityConfigurationProvider(cmdLine).map { it() } - fun createSecurityConfigurationProvider(cmdLine: CommandLine): Try<() -> SecurityConfiguration> = if (shouldDisableSsl(cmdLine)) Try { { disabledSecurityConfiguration() } } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 4fcb1809..04a0c14a 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -20,7 +20,6 @@ package org.onap.dcae.collectors.veshv.simulators.xnf import io.vavr.collection.HashSet -import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentError import org.onap.dcae.collectors.veshv.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/OngoingSimulationsTest.kt index 325d3bb5..cb604626 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/OngoingSimulationsTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/OngoingSimulationsTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.main +package org.onap.dcae.collectors.veshv.simulators.xnf.impl import org.assertj.core.api.Assertions.assertThat import org.jetbrains.spek.api.Spek @@ -25,11 +25,6 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusFailure -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusNotFound -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusOngoing -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.StatusSuccess import org.onap.dcae.collectors.veshv.tests.utils.waitUntilSucceeds import reactor.core.publisher.Mono import reactor.core.scheduler.Schedulers diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulatorTest.kt index ea0628c1..11ce0b3f 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulatorTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.main +package org.onap.dcae.collectors.veshv.simulators.xnf.impl import arrow.core.Left import arrow.core.None @@ -30,7 +30,6 @@ import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.tests.utils.Assertions.assertThat diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClientTest.kt index 14061532..5e6cb981 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClientTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.main +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters import com.nhaarman.mockitokotlin2.any import com.nhaarman.mockitokotlin2.eq diff --git a/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/UtilsKtTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SslUtilsTest.kt index c7c414f8..631ec7bf 100644 --- a/sources/hv-collector-ssl/src/test/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/UtilsKtTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SslUtilsTest.kt @@ -17,10 +17,9 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ssl.boundary +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config import com.nhaarman.mockitokotlin2.doReturn -import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever @@ -36,7 +35,7 @@ import org.onap.dcae.collectors.veshv.commandline.stringValue import java.nio.file.Paths -internal object UtilsKtTest : Spek({ +internal object SslUtilsTest : Spek({ describe("creating securty configuration provider") { @@ -50,27 +49,28 @@ internal object UtilsKtTest : Spek({ .doReturn(passwordFile) it("should create configuration with some keys") { - val configuration = createSecurityConfiguration(commandLine) + val configuration = createSecurityConfigurationProvider(commandLine) verify(commandLine).hasOption(CommandLineOption.SSL_DISABLE) assertThat(configuration.isSuccess()).isTrue() - configuration.map { assertThat(it.keys.isDefined()).isTrue() } + configuration.map { assertThat(it().keys.isDefined()).isTrue() } } } + on("command line with ssl disabled") { val commandLine: CommandLine = mock() whenever(commandLine.hasOption(CommandLineOption.SSL_DISABLE)).doReturn(true) it("should create configuration without keys") { - val configuration = createSecurityConfiguration(commandLine) + val configuration = createSecurityConfigurationProvider(commandLine) verify(commandLine).hasOption(CommandLineOption.SSL_DISABLE) assertThat(configuration.isSuccess()).isTrue() - configuration.map { assertThat(it.keys.isEmpty()).isTrue() } + configuration.map { assertThat(it().keys.isEmpty()).isTrue() } } } } }) private fun resourcePathAsString(resource: String) = - Paths.get(UtilsKtTest::class.java.getResource(resource).toURI()).toString() + Paths.get(SslUtilsTest::class.java.getResource(resource).toURI()).toString() diff --git a/sources/hv-collector-ssl/src/test/resources/ssl/ca.crt b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/ca.crt index f9a05b90..f9a05b90 100644 --- a/sources/hv-collector-ssl/src/test/resources/ssl/ca.crt +++ b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/ca.crt diff --git a/sources/hv-collector-ssl/src/test/resources/ssl/password b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/password index e69c2de9..e69c2de9 100644 --- a/sources/hv-collector-ssl/src/test/resources/ssl/password +++ b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/password diff --git a/sources/hv-collector-ssl/src/test/resources/ssl/server.crt b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/server.crt index 2b06108b..2b06108b 100644 --- a/sources/hv-collector-ssl/src/test/resources/ssl/server.crt +++ b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/server.crt diff --git a/sources/hv-collector-ssl/src/test/resources/ssl/server.key b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/server.key index 40e25932..40e25932 100644 --- a/sources/hv-collector-ssl/src/test/resources/ssl/server.key +++ b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/server.key diff --git a/sources/hv-collector-ssl/src/test/resources/ssl/server.ks.pkcs12 b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/server.ks.pkcs12 Binary files differindex a97eb65a..a97eb65a 100644 --- a/sources/hv-collector-ssl/src/test/resources/ssl/server.ks.pkcs12 +++ b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/server.ks.pkcs12 diff --git a/sources/hv-collector-ssl/src/test/resources/ssl/trust.pkcs12 b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/trust.pkcs12 Binary files differindex 01b61373..01b61373 100644 --- a/sources/hv-collector-ssl/src/test/resources/ssl/trust.pkcs12 +++ b/sources/hv-collector-xnf-simulator/src/test/resources/ssl/trust.pkcs12 diff --git a/sources/pom.xml b/sources/pom.xml index 7e877438..6f75ec7f 100644 --- a/sources/pom.xml +++ b/sources/pom.xml @@ -143,6 +143,7 @@ <module>hv-collector-domain</module> <module>hv-collector-health-check</module> <module>hv-collector-main</module> + <module>hv-collector-server</module> <module>hv-collector-ssl</module> <module>hv-collector-test-utils</module> <module>hv-collector-utils</module> |