aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xdevelopment/bin/consul.sh17
-rw-r--r--development/docker-compose.yml14
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt48
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt4
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt2
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt98
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt6
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt5
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt2
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt6
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt8
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt5
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt2
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt11
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt2
-rw-r--r--sources/hv-collector-main/Dockerfile4
-rw-r--r--sources/hv-collector-main/src/main/docker/base.json12
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt9
-rw-r--r--sources/hv-collector-main/src/main/resources/logback.xml13
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt5
-rw-r--r--sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt25
21 files changed, 141 insertions, 157 deletions
diff --git a/development/bin/consul.sh b/development/bin/consul.sh
index 39f0bdef..2a6bc0f5 100755
--- a/development/bin/consul.sh
+++ b/development/bin/consul.sh
@@ -59,13 +59,16 @@ shift $((OPTIND-1))
DOMAIN=${1:-perf3gpp}
TOPIC=${2:-HV_VES_PERF3GPP}
-CONFIGURATION="
-{
- \"collector.routing\":
- [{
- \"fromDomain\": \"${DOMAIN}\",
- \"toTopic\": \"${TOPIC}\"
- }]
+CONFIGURATION="{
+ "streams_publishes": {
+ "${DOMAIN}": {
+ "type": "kafka",
+ "kafka_info": {
+ "bootstrap_servers": "message-router-kafka:9092",
+ "topic_name": "${TOPIC}"
+ }
+ }
+ }
}"
CONFIGURATION_ENDPOINT=localhost:8500/v1/kv/veshv-config
diff --git a/development/docker-compose.yml b/development/docker-compose.yml
index e85b520b..d135e8b4 100644
--- a/development/docker-compose.yml
+++ b/development/docker-compose.yml
@@ -43,12 +43,15 @@ services:
image: docker.io/consul:1.0.6
restart: on-failure
command: ["kv", "put", "-http-addr=http://consul-server:8500", "dcae-hv-ves-collector", '{
- "collector.routing": [
- {
- "fromDomain": "perf3gpp",
- "toTopic": "HV_VES_PERF3GPP"
+ "streams_publishes": {
+ "perf3gpp": {
+ "type": "kafka",
+ "kafka_info": {
+ "bootstrap_servers": "message-router-kafka:9092",
+ "topic_name": "HV_VES_PERF3GPP"
+ }
}
- ]
+ }
}'
]
depends_on:
@@ -74,6 +77,7 @@ services:
- "6061:6061/tcp"
environment:
JAVA_OPTS: "-Dio.netty.leakDetection.level=paranoid -Dlogback.configurationFile=/etc/ONAP/dcae-hv-ves-collector/logback.xml"
+ VESHV_CONFIGURATION_FILE: "/etc/ves-hv/configuration/base.json"
CONSUL_HOST: "consul-server"
CONFIG_BINDING_SERVICE: "CBS"
HOSTNAME: "dcae-hv-ves-collector"
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index 9684484b..ccce62a4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.config.api
+import arrow.core.getOrElse
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.MissingArgumentException
import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
@@ -27,41 +28,56 @@ import org.onap.dcae.collectors.veshv.config.impl.ConfigurationMerger
import org.onap.dcae.collectors.veshv.config.impl.ConfigurationValidator
import org.onap.dcae.collectors.veshv.config.impl.FileConfigurationReader
import org.onap.dcae.collectors.veshv.config.impl.HvVesCommandLineParser
-import org.onap.dcae.collectors.veshv.utils.arrow.rightOrThrow
+import org.onap.dcae.collectors.veshv.config.impl.PartialConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.throwOnLeft
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.MappedDiagnosticContext
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClientFactory
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.EnvProperties
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
class ConfigurationModule {
private val cmd = HvVesCommandLineParser()
private val configReader = FileConfigurationReader()
private val configValidator = ConfigurationValidator()
+ private val merger = ConfigurationMerger()
fun healthCheckPort(args: Array<String>): Int = cmd.getHealthcheckPort(args)
fun hvVesConfigurationUpdates(args: Array<String>,
configStateListener: ConfigurationStateListener,
mdc: MappedDiagnosticContext): Flux<HvVesConfiguration> =
- Flux.just(cmd.getConfigurationFile(args))
- .throwOnLeft { MissingArgumentException(it.message, it.cause) }
- .map { it.reader().use(configReader::loadConfig) }
+ Mono.just(cmd.getConfigurationFile(args))
+ .throwOnLeft(::MissingArgumentException)
+ .map {
+ logger.info { "Using base configuration file: ${it.absolutePath}" }
+ it.reader().use(configReader::loadConfig)
+ }
.cache()
- .flatMap { basePartialConfig ->
- val baseConfig = configValidator.validate(basePartialConfig)
- .rightOrThrow { ValidationException(it.message) }
- val cbsConfigProvider = CbsConfigurationProvider(
- CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
- baseConfig.cbs,
- configStateListener,
- mdc)
- val merger = ConfigurationMerger()
- cbsConfigProvider()
+ .flatMapMany { basePartialConfig ->
+ cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
+ .invoke()
.map { merger.merge(basePartialConfig, it) }
- .map { configValidator.validate(it) }
- .throwOnLeft { ValidationException(it.message) }
+ .map(configValidator::validate)
+ .throwOnLeft()
}
+ private fun cbsConfigurationProvider(basePartialConfig: PartialConfiguration,
+ configStateListener: ConfigurationStateListener,
+ mdc: MappedDiagnosticContext): CbsConfigurationProvider =
+ CbsConfigurationProvider(
+ CbsClientFactory.createCbsClient(EnvProperties.fromEnvironment()),
+ cbsConfigurationFrom(basePartialConfig),
+ configStateListener,
+ mdc)
+
+ private fun cbsConfigurationFrom(basePartialConfig: PartialConfiguration) =
+ configValidator.validatedCbsConfiguration(basePartialConfig)
+ .getOrElse { throw ValidationException("Invalid CBS section defined in configuration file") }
+
+ companion object {
+ private val logger = Logger(ConfigurationModule::class)
+ }
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
index 2fc29829..bea7cc56 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/exceptions.kt
@@ -19,6 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.config.api.model
-class MissingArgumentException(message: String, cause: Throwable?) : RuntimeException(message, cause)
+import org.onap.dcae.collectors.veshv.commandline.WrongArgumentError
+
+class MissingArgumentException(err: WrongArgumentError) : RuntimeException(err.message, err.cause)
class ValidationException(message: String) : RuntimeException(message)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
index f044492c..63d590a2 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
@@ -74,8 +74,6 @@ internal class ConfigurationMerger {
updateOption: Option<PartialCollectorConfig>) =
applyUpdate(baseOption, updateOption) { base, update ->
PartialCollectorConfig(
- base.maxRequestSizeBytes.updateToGivenOrNone(update.maxRequestSizeBytes),
- base.kafkaServers.updateToGivenOrNone(update.kafkaServers),
base.routing.updateToGivenOrNone(update.routing)
)
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index 3e599b58..ead5655a 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -27,13 +27,13 @@ import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.ServerConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.ValidationException
import org.onap.dcae.collectors.veshv.ssl.boundary.SecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.arrow.OptionUtils.binding
import org.onap.dcae.collectors.veshv.utils.arrow.mapBinding
+import org.onap.dcae.collectors.veshv.utils.arrow.doOnEmpty
import org.onap.dcae.collectors.veshv.utils.logging.LogLevel
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import java.net.InetSocketAddress
-import java.time.Duration
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -41,33 +41,35 @@ import java.time.Duration
*/
internal class ConfigurationValidator {
- fun validate(partialConfig: PartialConfiguration)
- : Either<ValidationError, HvVesConfiguration> = binding {
- val logLevel = determineLogLevel(partialConfig.logLevel)
+ fun validate(partialConfig: PartialConfiguration) =
+ logger.info { "About to validate configuration: $partialConfig" }.let {
+ binding {
+ val logLevel = determineLogLevel(partialConfig.logLevel)
- val serverConfiguration = partialConfig.server.bind()
- .let { createServerConfiguration(it).bind() }
+ val serverConfiguration = validatedServerConfiguration(partialConfig)
+ .doOnEmpty { logger.debug { "Cannot bind server configuration" } }
+ .bind()
- val cbsConfiguration = partialConfig.cbs.bind()
- .let { createCbsConfiguration(it).bind() }
+ val cbsConfiguration = validatedCbsConfiguration(partialConfig)
+ .doOnEmpty { logger.debug { "Cannot bind cbs configuration" } }
+ .bind()
- val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys)
+ val securityConfiguration = SecurityConfiguration(partialConfig.security.bind().keys)
-// TOD0: retrieve when ConfigurationMerger is implemented
-// val collectorConfiguration = partialConfig.collector.bind()
-// .let { createCollectorConfig(it).bind() }
+ val collectorConfiguration = validatedCollectorConfig(partialConfig)
+ .doOnEmpty { logger.debug { "Cannot bind collector configuration" } }
+ .bind()
+
+ HvVesConfiguration(
+ serverConfiguration,
+ cbsConfiguration,
+ securityConfiguration,
+ collectorConfiguration,
+ logLevel
+ )
+ }.toEither { ValidationException("Some required configuration options are missing") }
+ }
- HvVesConfiguration(
- serverConfiguration,
- cbsConfiguration,
- securityConfiguration,
-// TOD0: swap when ConfigurationMerger is implemented
-// collectorConfiguration
- CollectorConfiguration(emptyList()),
-// end TOD0
- logLevel
- )
- }.toEither { ValidationError("Some required configuration options are missing") }
private fun determineLogLevel(logLevel: Option<LogLevel>) =
logLevel.getOrElse {
@@ -78,40 +80,38 @@ internal class ConfigurationValidator {
DEFAULT_LOG_LEVEL
}
- private fun createServerConfiguration(partial: PartialServerConfig) =
+ private fun validatedServerConfiguration(partial: PartialConfiguration) =
partial.mapBinding {
- ServerConfiguration(
- it.listenPort.bind(),
- it.idleTimeoutSec.bind(),
- it.maxPayloadSizeBytes.bind()
- )
+ partial.server.bind().let {
+ ServerConfiguration(
+ it.listenPort.bind(),
+ it.idleTimeoutSec.bind(),
+ it.maxPayloadSizeBytes.bind()
+ )
+ }
}
- private fun createCbsConfiguration(partial: PartialCbsConfig) =
+ fun validatedCbsConfiguration(partial: PartialConfiguration) =
partial.mapBinding {
- CbsConfiguration(
- it.firstRequestDelaySec.bind(),
- it.requestIntervalSec.bind()
- )
+ it.cbs.bind().let {
+ CbsConfiguration(
+ it.firstRequestDelaySec.bind(),
+ it.requestIntervalSec.bind()
+ )
+ }
}
-// TOD0: retrieve when ConfigurationMerger is implemented
-// private fun createCollectorConfig(partial: PartialCollectorConfig) =
-// partial.mapBinding {
-// CollectorConfiguration(
-// it.maxRequestSizeBytes.bind(),
-// toKafkaServersString(it.kafkaServers.bind()),
-// it.routing.bind()
-// )
-// }
-
- private fun toKafkaServersString(kafkaServers: List<InetSocketAddress>): String =
- kafkaServers.joinToString(",") { "${it.hostName}:${it.port}" }
+ private fun validatedCollectorConfig(partial: PartialConfiguration) =
+ partial.mapBinding {
+ partial.collector.bind().let {
+ CollectorConfiguration(
+ it.routing.bind()
+ )
+ }
+ }
companion object {
val DEFAULT_LOG_LEVEL = LogLevel.INFO
private val logger = Logger(ConfigurationValidator::class)
}
}
-
-data class ValidationError(val message: String, val cause: Option<Throwable> = None)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
index 9513107b..f6ae5bec 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/FileConfigurationReader.kt
@@ -24,6 +24,7 @@ import com.google.gson.GsonBuilder
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.DurationOfSecondsAdapter
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.OptionAdapter
import org.onap.dcae.collectors.veshv.config.impl.gsonadapters.SecurityAdapter
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
import java.io.Reader
import java.time.Duration
@@ -41,4 +42,9 @@ internal class FileConfigurationReader {
fun loadConfig(input: Reader): PartialConfiguration =
gson.fromJson(input, PartialConfiguration::class.java)
+ .also { logger.info { "Successfully read file and parsed json to configuration: $it" } }
+
+ companion object {
+ private val logger = Logger(FileConfigurationReader::class)
+ }
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
index c1a98294..c6730a4c 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParser.kt
@@ -45,7 +45,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
it.stringValue(CONFIGURATION_FILE).map(::File)
}.toEither {
WrongArgumentError(
- message = "Unexpected error when parsing command line arguments",
+ message = "Base configuration filepath missing on command line",
cmdLineOptionsList = cmdLineOptionsList)
}
@@ -53,8 +53,7 @@ internal class HvVesCommandLineParser(private val parser: CommandLineParser = De
parse(args) {
it.intValue(HEALTH_CHECK_API_PORT)
}.getOrElse {
- logger.info { "Healthcheck port missing on command line," +
- " using default: $DEFAULT_HEALTHCHECK_PORT" }
+ logger.info { "Healthcheck port missing on command line, using default: $DEFAULT_HEALTHCHECK_PORT" }
DEFAULT_HEALTHCHECK_PORT
}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index f3c149cd..b4e1bf6b 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -53,7 +53,5 @@ internal data class PartialCbsConfig(
internal data class PartialSecurityConfig(val keys: Option<SecurityKeys> = None)
internal data class PartialCollectorConfig(
- val maxRequestSizeBytes: Option<Int> = None,
- val kafkaServers: Option<List<InetSocketAddress>> = None, // TOD0: remove properties and simplify this part
val routing: Option<Routing> = None
)
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index 4b89488b..55d06cdd 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -64,8 +64,6 @@ internal object ConfigurationValidatorTest : Spek({
Some(mock())
)),
Some(PartialCollectorConfig(
- Some(4),
- Some(emptyList()),
someFromEmptyRouting
)),
None
@@ -103,8 +101,6 @@ internal object ConfigurationValidatorTest : Spek({
securityKeys
)),
Some(PartialCollectorConfig(
- Some(4),
- Some(emptyList()),
someFromEmptyRouting
)),
Some(LogLevel.INFO)
@@ -152,8 +148,6 @@ internal object ConfigurationValidatorTest : Spek({
securityKeys
)),
Some(PartialCollectorConfig(
- Some(4),
- Some(emptyList()),
someFromEmptyRouting
)),
Some(LogLevel.INFO)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
index 433e4d57..618b818f 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/VesHvCollector.kt
@@ -57,8 +57,6 @@ internal class VesHvCollector(
.transform(::filterInvalidWireFrame)
.transform(::decodeProtobufPayload)
.transform(::filterInvalidProtobufMessages)
- // TOD0: try to remove new flux creation in Sink interface to avoid two calls to handleErrors here
- .handleErrors()
.transform(::route)
.handleErrors()
.doFinally { releaseBuffersMemory() }
@@ -106,14 +104,14 @@ internal class VesHvCollector(
}
}
+ private fun releaseBuffersMemory() = wireChunkDecoder.release()
+ .also { logger.debug { "Released buffer memory after handling message stream" } }
+
private fun <T> Flux<T>.handleErrors(): Flux<T> = onErrorResume {
metrics.notifyClientRejected(ClientRejectionCause.fromThrowable(it))
logger.handleReactiveStreamError(clientContext, it)
}
- private fun releaseBuffersMemory() = wireChunkDecoder.release()
- .also { logger.debug { "Released buffer memory after handling message stream" } }
-
private fun <T> Flux<T>.filterFailedWithLog(predicate: (T) -> MessageEither): Flux<T> =
filterFailedWithLog(logger, clientContext::fullMdc, predicate)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index 92719e94..430f7981 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -33,6 +33,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
+import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
@@ -159,7 +160,7 @@ object MetricsSpecification : Spek({
.isEqualTo(1)
}
- it("should gather metrics for sing errors") {
+ it("should gather metrics for sink errors") {
val sut = vesHvWithAlwaysFailingSink(basicRouting)
sut.handleConnection(vesWireFrameMessage(domain = PERF3GPP))
@@ -190,7 +191,7 @@ object MetricsSpecification : Spek({
given("rejection causes") {
mapOf(
ClientRejectionCause.PAYLOAD_SIZE_EXCEEDED_IN_MESSAGE to
- messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1),
+ messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1),
ClientRejectionCause.INVALID_WIRE_FRAME_MARKER to garbageFrame()
).forEach { cause, vesMessage ->
on("cause $cause") {
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 1217c471..f79c2e46 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -77,7 +77,7 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C
override fun close() = collectorProvider.close()
companion object {
- const val MAX_PAYLOAD_SIZE_BYTES = 1024
+ const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
}
}
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index 6a718eea..2430c74f 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -23,7 +23,6 @@ import arrow.core.None
import org.assertj.core.api.Assertions.assertThat
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
-import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.config.api.model.Routing
@@ -31,21 +30,17 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
+import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.alternativeRouting
-import org.onap.dcae.collectors.veshv.tests.fakes.emptyRouting
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.messageWithInvalidWireFrameHeader
import org.onap.dcae.collectors.veshv.tests.utils.messageWithPayloadOfSize
import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
-import reactor.core.publisher.Flux
-import java.time.Duration
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -97,7 +92,7 @@ object VesHvSpecification : Spek({
val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesWireFrameMessage(PERF3GPP)
val msgWithInvalidFrame = messageWithInvalidWireFrameHeader()
- val msgWithTooBigPayload = messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
+ val msgWithTooBigPayload = messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
@@ -208,7 +203,7 @@ object VesHvSpecification : Spek({
val handledMessages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP, "first"),
- messageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
+ messageWithPayloadOfSize(MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
vesWireFrameMessage(PERF3GPP))
assertThat(handledMessages).hasSize(1)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
index e9914ef1..8956e81f 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
@@ -27,7 +27,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
-const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+private const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
.name("PERF3GPP")
diff --git a/sources/hv-collector-main/Dockerfile b/sources/hv-collector-main/Dockerfile
index a1e89a73..cfd4a7bb 100644
--- a/sources/hv-collector-main/Dockerfile
+++ b/sources/hv-collector-main/Dockerfile
@@ -1,6 +1,6 @@
FROM docker.io/openjdk:11-jre-slim
-LABEL copyright="Copyright (C) 2018 NOKIA"
+LABEL copyright="Copyright (C) 2018-2019 NOKIA"
LABEL license.name="The Apache Software License, Version 2.0"
LABEL license.url="http://www.apache.org/licenses/LICENSE-2.0"
LABEL maintainer="Nokia Wroclaw ONAP Team"
@@ -19,4 +19,4 @@ COPY target/libs/internal/* ./
COPY src/main/docker/*.sh ./
COPY src/main/docker/base.json /etc/ves-hv/configuration/base.json
-COPY target/hv-collector-main-*.jar ./
+COPY target/hv-collector-main-*.jar ./ \ No newline at end of file
diff --git a/sources/hv-collector-main/src/main/docker/base.json b/sources/hv-collector-main/src/main/docker/base.json
index 67576c80..e0b9c450 100644
--- a/sources/hv-collector-main/src/main/docker/base.json
+++ b/sources/hv-collector-main/src/main/docker/base.json
@@ -10,17 +10,5 @@
"requestIntervalSec": 5
},
"security": {
- },
- "collector": {
- "maxRequestSizeBytes": 1048576,
- "kafkaServers": [
- "message-router-kafka:9092"
- ],
- "routing": [
- {
- "fromDomain": "perf3gpp",
- "toTopic": "HV_VES_PERF3GPP"
- }
- ]
}
}
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index 22d8000e..dc207ef8 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -29,7 +29,6 @@ import org.onap.dcae.collectors.veshv.main.servers.VesServer
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.ServerHandle
import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.onap.dcae.collectors.veshv.utils.neverComplete
import org.onap.dcae.collectors.veshv.utils.registerShutdownHook
import reactor.core.scheduler.Schedulers
import java.util.concurrent.atomic.AtomicReference
@@ -57,20 +56,20 @@ fun main(args: Array<String>) {
HealthState.INSTANCE.changeState(HealthDescription.HEALTHY)
}
.doOnError {
- logger.error(ServiceContext::mdc) { "Failed to acquire configuration ${it.message}" }
- logger.debug(ServiceContext::mdc) { "Detailed stack trace: $it" }
+ logger.error(ServiceContext::mdc) { "Failed to create configuration: ${it.message}" }
+ logger.withDebug(ServiceContext::mdc) { log("Detailed stack trace: ", it) }
HealthState.INSTANCE.changeState(HealthDescription.DYNAMIC_CONFIGURATION_NOT_FOUND)
}
.doOnNext(::startServer)
.doOnError(::logServerStartFailed)
- .neverComplete() // TODO: remove after merging configuration stream with cbs
+ .then()
.block()
}
private fun startServer(config: HvVesConfiguration) {
stopRunningServer()
Logger.setLogLevel(VES_HV_PACKAGE, config.logLevel)
- logger.info { "Using configuration: $config" }
+ logger.debug(ServiceContext::mdc) { "Configuration: $config" }
VesServer.start(config).let {
registerShutdownHook { shutdownGracefully(it) }
diff --git a/sources/hv-collector-main/src/main/resources/logback.xml b/sources/hv-collector-main/src/main/resources/logback.xml
index 40f3c8a0..21c1fa31 100644
--- a/sources/hv-collector-main/src/main/resources/logback.xml
+++ b/sources/hv-collector-main/src/main/resources/logback.xml
@@ -20,9 +20,9 @@
-->
<configuration>
<property name="COMPONENT_NAME"
- value="dcae-hv-ves-collector"/>
+ value="dcae-hv-ves-collector"/>
<property name="COMPONENT_SHORT_NAME"
- value="hv-ves"/>
+ value="hv-ves"/>
<property name="LOG_FILENAME" value="${COMPONENT_SHORT_NAME}"/>
<property name="LOG_PATH" value="/var/log/ONAP/${COMPONENT_NAME}"/>
@@ -57,7 +57,8 @@
| ${p_mdc}\t
| ${p_thr}%n"/>
- <property name="ONAP_LOG_PATTERN" value="%nopexception${p_log}|${p_tim}|${p_lvl}|${p_msg}|${p_mdc}|${p_exc}|${p_mak}|${p_thr}%n"/>
+ <property name="ONAP_LOG_PATTERN"
+ value="%nopexception${p_log}|${p_tim}|${p_lvl}|${p_msg}|${p_mdc}|${p_exc}|${p_mak}|${p_thr}%n"/>
<property name="ONAP_LOG_PATTERN_FROM_WIKI" value="%nopexception%logger
|%date{yyyy-MM-dd'T'HH:mm:ss.SSSXXX,UTC}
@@ -76,7 +77,7 @@
</appender>
<appender name="ROLLING-FILE"
- class="ch.qos.logback.core.rolling.RollingFileAppender">
+ class="ch.qos.logback.core.rolling.RollingFileAppender">
<encoder>
<pattern>${ONAP_LOG_PATTERN}</pattern>
</encoder>
@@ -93,8 +94,10 @@
<logger name="io.netty" level="INFO"/>
<logger name="io.netty.util" level="WARN"/>
<logger name="org.apache.kafka" level="INFO"/>
+ <logger name="org.onap.dcaegen2.services.sdk" level="INFO"/>
+ <logger name="org.onap.dcaegen2.services.sdk.rest.services.cbs.client.impl.CbsClientImpl" level="WARN"/>
- <root level="INFO">
+ <root level="DEBUG">
<appender-ref ref="CONSOLE"/>
<appender-ref ref="ROLLING-FILE"/>
</root>
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
index d5b33b91..47b3d559 100644
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
+++ b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/arrow/core.kt
@@ -34,6 +34,7 @@ import arrow.syntax.collections.firstOption
import arrow.typeclasses.MonadContinuation
import arrow.typeclasses.binding
import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import java.util.concurrent.atomic.AtomicReference
/**
@@ -57,8 +58,12 @@ fun <B> Either<Throwable, B>.rightOrThrow() = fold({ throw it }, ::identity)
fun <A, B> Either<A, B>.rightOrThrow(mapper: (A) -> Throwable) = fold({ throw mapper(it) }, ::identity)
+fun <A : Exception, B> Flux<Either<A, B>>.throwOnLeft(): Flux<B> = map { it.rightOrThrow() }
+
fun <A, B> Flux<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Flux<B> = map { it.rightOrThrow(f) }
+fun <A, B> Mono<Either<A, B>>.throwOnLeft(f: (A) -> Exception): Mono<B> = map { it.rightOrThrow(f) }
+
fun <A> AtomicReference<A>.getOption() = Option.fromNullable(get())
fun <A> Option.Companion.fromNullablesChain(firstValue: A?, vararg nextValues: () -> A?): Option<A> =
diff --git a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt b/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt
deleted file mode 100644
index aaa598d2..00000000
--- a/sources/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/reactive.kt
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2019 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.utils
-
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-
-fun <T> Flux<T>.neverComplete(): Mono<Void> = then(Mono.never<T>()).then() \ No newline at end of file