diff options
22 files changed, 116 insertions, 46 deletions
diff --git a/development/configuration/base.json b/development/configuration/base.json index 13c4ea19..1b723b72 100644 --- a/development/configuration/base.json +++ b/development/configuration/base.json @@ -2,7 +2,6 @@ "logLevel": "DEBUG", "server.listenPort": 6061, "server.idleTimeoutSec": 60, - "server.maxPayloadSizeBytes": 1048576, "cbs.firstRequestDelaySec": 10, "cbs.requestIntervalSec": 5, "security.keys.keyStoreFile": "/etc/ves-hv/ssl/server.p12", diff --git a/development/configuration/local.json b/development/configuration/local.json index 79abe03b..ebf2f82e 100644 --- a/development/configuration/local.json +++ b/development/configuration/local.json @@ -2,7 +2,6 @@ "logLevel": "DEBUG", "server.listenPort": 8061, "server.idleTimeoutSec": 60, - "server.maxPayloadSizeBytes": 1048576, "cbs.firstRequestDelaySec": 10, "cbs.requestIntervalSec": 5, "security.keys.keyStoreFile": "development/ssl/server.p12", diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt index 93381572..f0ee3a42 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt @@ -53,7 +53,7 @@ class ConfigurationModule { .throwOnLeft(::MissingArgumentException) .doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } } .map { it.reader().use(configParser::parse) } - .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } } + .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } } .cache() .flatMapMany { basePartialConfig -> cbsConfigurationProvider(basePartialConfig, configStateListener, mdc) diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt index f745d595..8db2f770 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt @@ -37,7 +37,6 @@ data class HvVesConfiguration( data class ServerConfiguration( val listenPort: Int, - val maxPayloadSizeBytes: Int, val idleTimeout: Duration ) @@ -48,4 +47,12 @@ data class CbsConfiguration( data class CollectorConfiguration( val routing: Routing -) +) { + val maxPayloadSizeBytes by lazy { + routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: DEFAULT_MAX_PAYLOAD_SIZE + } + + companion object { + internal const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024 + } +} diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt index e782a1e7..56e48038 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt @@ -33,7 +33,6 @@ internal class ConfigurationMerger { PartialConfiguration( listenPort = base.listenPort.updateToGivenOrNone(update.listenPort), idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec), - maxPayloadSizeBytes = base.maxPayloadSizeBytes.updateToGivenOrNone(update.maxPayloadSizeBytes), firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec), requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec), diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt index dddf0bed..613ae302 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt @@ -94,7 +94,6 @@ internal class ConfigurationValidator { partial.mapBinding { ServerConfiguration( it.listenPort.bind(), - it.maxPayloadSizeBytes.bind(), Duration.ofSeconds(it.idleTimeoutSec.bind()) ) } @@ -139,7 +138,6 @@ internal class ConfigurationValidator { .trustStorePassword(Passwords.fromString(trustStorePassword)) .build() - private fun validatedCollectorConfig(partial: PartialConfiguration) = partial.mapBinding { config -> CollectorConfiguration( diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt index 30f6c3e3..d09a52e4 100644 --- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt +++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt @@ -34,8 +34,6 @@ internal data class PartialConfiguration( val listenPort: Option<Int> = None, @SerializedName("server.idleTimeoutSec") val idleTimeoutSec: Option<Long> = None, - @SerializedName("server.maxPayloadSizeBytes") - val maxPayloadSizeBytes: Option<Int> = None, @SerializedName("cbs.firstRequestDelaySec") val firstRequestDelaySec: Option<Long> = None, diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt index 94eb519d..8c3c22aa 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt @@ -82,8 +82,6 @@ internal object CbsConfigurationProviderTest : Spek({ assertThat(it.listenPort).isEqualTo(Some(6061)) assertThat(it.idleTimeoutSec).isEqualTo(Some(60L)) - assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576)) - val sinks = it.streamPublishers.orNull()!! val sink1 = sinks[0] @@ -148,7 +146,6 @@ private val validConfiguration = JsonParser().parse(""" { "server.listenPort": 6061, "server.idleTimeoutSec": 60, - "server.maxPayloadSizeBytes": 1048576, "streams_publishes": { "$PERF3GPP_REGIONAL": { "type": "kafka", diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt new file mode 100644 index 00000000..dbdf4ad0 --- /dev/null +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt @@ -0,0 +1,83 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.config.impl + +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import org.assertj.core.api.Assertions.assertThat +import org.jetbrains.spek.api.Spek +import org.jetbrains.spek.api.dsl.describe +import org.jetbrains.spek.api.dsl.it +import org.jetbrains.spek.api.dsl.on +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration +import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration.Companion.DEFAULT_MAX_PAYLOAD_SIZE +import org.onap.dcae.collectors.veshv.config.api.model.Route +import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since May 2018 + */ +internal object CollectorConfigurationTest : Spek({ + + describe("CollectorConfiguration") { + describe("calculating maxPayloadSizeBytes") { + on("defined routes") { + val sampleRouting = listOf( + Route(sink1.name(), sink1), + Route(sink2.name(), sink2), + Route(sink3.name(), sink3) + ) + val configuration = CollectorConfiguration(sampleRouting) + + it("should use the highest value among all routes") { + assertThat(configuration.maxPayloadSizeBytes) + .isEqualTo(highestMaxPayloadSize) + } + } + + on("empty routing") { + val configuration = CollectorConfiguration(emptyList()) + + it("should use default value") { + assertThat(configuration.maxPayloadSizeBytes) + .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE) + } + } + } + } +}) + +private const val highestMaxPayloadSize = 3 + +private val sink1 = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("") + whenever(it.maxPayloadSizeBytes()).thenReturn(1) +} + +private val sink2 = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("") + whenever(it.maxPayloadSizeBytes()).thenReturn(2) +} + +private val sink3 = mock<KafkaSink>().also { + whenever(it.name()).thenReturn("") + whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize) +} diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt index 4cd2ba97..cb8d5005 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt @@ -76,8 +76,6 @@ internal object ConfigurationMergerTest : Spek({ assertThat(result.listenPort).isEqualTo(someListenPort) assertThat(result.idleTimeoutSec.isEmpty()).isFalse() assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L)) - assertThat(result.maxPayloadSizeBytes.isEmpty()).isFalse() - assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576)) } it("merges full config into single parameter") { @@ -89,7 +87,6 @@ internal object ConfigurationMergerTest : Spek({ val result = ConfigurationMerger().merge(actual, diff) assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR)) - assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576)) assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L)) assertThat(result.keyStoreFile.isEmpty()).isFalse() diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt index 5fa1fd62..0806e8ca 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt @@ -76,7 +76,6 @@ internal object ConfigurationValidatorTest : Spek({ val config = PartialConfiguration( listenPort = Some(defaultListenPort), idleTimeoutSec = Some(defaultIdleTimeoutSec), - maxPayloadSizeBytes = Some(defaultMaxPayloadSizeBytes), firstRequestDelaySec = Some(defaultFirstReqDelaySec), requestIntervalSec = Some(defaultRequestIntervalSec), sslDisable = Some(false), @@ -97,8 +96,6 @@ internal object ConfigurationValidatorTest : Spek({ { assertThat(it.server.listenPort) .isEqualTo(defaultListenPort) - assertThat(it.server.maxPayloadSizeBytes) - .isEqualTo(defaultMaxPayloadSizeBytes) assertThat(it.server.idleTimeout) .isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec)) @@ -116,6 +113,8 @@ internal object ConfigurationValidatorTest : Spek({ assertThat(it.collector.routing) .isEqualTo(sampleRouting) + assertThat(it.collector.maxPayloadSizeBytes) + .isEqualTo(sampleMaxPayloadSize) assertThat(it.logLevel).isEqualTo(LogLevel.TRACE) } @@ -183,7 +182,6 @@ internal object ConfigurationValidatorTest : Spek({ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPort), idleTimeoutSec: Option<Long> = Some(defaultIdleTimeoutSec), - maxPayloadSizeBytes: Option<Int> = Some(defaultMaxPayloadSizeBytes), firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec), requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec), sslDisable: Option<Boolean> = Some(false), @@ -196,7 +194,6 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor ) = PartialConfiguration( listenPort = listenPort, idleTimeoutSec = idleTimeoutSec, - maxPayloadSizeBytes = maxPayloadSizeBytes, firstRequestDelaySec = firstReqDelaySec, requestIntervalSec = requestIntervalSec, sslDisable = sslDisable, @@ -209,7 +206,6 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor ) const val defaultListenPort = 1234 -const val defaultMaxPayloadSizeBytes = 2 const val defaultRequestIntervalSec = 3L const val defaultIdleTimeoutSec = 10L const val defaultFirstReqDelaySec = 10L @@ -220,9 +216,12 @@ const val TRUSTSTORE = "trust.ks.pkcs12" const val TRUSTSTORE_PASSWORD = "changeMeToo" const val sampleSinkName = "perf3gpp" +const val sampleMaxPayloadSize = 1024 -private val sampleSink = mock<KafkaSink>().also { +private val sink = mock<KafkaSink>().also { whenever(it.name()).thenReturn(sampleSinkName) + whenever(it.maxPayloadSizeBytes()).thenReturn(sampleMaxPayloadSize) } -val sampleStreamsDefinition = listOf(sampleSink) -val sampleRouting = listOf(Route(sampleSink.name(), sampleSink))
\ No newline at end of file + +private val sampleStreamsDefinition = listOf(sink) +private val sampleRouting = listOf(Route(sink.name(), sink))
\ No newline at end of file diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt index 0fdd41c9..ac43c013 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt @@ -36,7 +36,7 @@ import java.io.File */ object HvVesCommandLineParserTest : Spek({ lateinit var cut: HvVesCommandLineParser - val DEFAULT_HEALTHCHECK_PORT = 6060 + val defaultHealthcheckPort = 6060 val emptyConfig = "" val configFilePath = javaClass.absoluteResourcePath("sampleConfig.json") @@ -88,7 +88,7 @@ object HvVesCommandLineParserTest : Spek({ it("should return default port") { assertThat( cut.getHealthcheckPort(arrayOf(emptyConfig)) - ).isEqualTo(DEFAULT_HEALTHCHECK_PORT) + ).isEqualTo(defaultHealthcheckPort) } } } diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt index ad38fd51..919f22c1 100644 --- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt +++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt @@ -87,7 +87,6 @@ internal object JsonConfigurationParserTest : Spek({ assertThat(config.listenPort).isEqualTo(Some(6000)) assertThat(config.idleTimeoutSec).isEqualTo(Some(1200L)) - assertThat(config.maxPayloadSizeBytes).isEqualTo(Some(1048576)) assertThat(config.firstRequestDelaySec).isEqualTo(Some(7L)) assertThat(config.requestIntervalSec).isEqualTo(Some(900L)) diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json index 2c3805ef..a5ad52ae 100644 --- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json +++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json @@ -2,7 +2,6 @@ "logLevel": "ERROR", "server.listenPort": 6000, "server.idleTimeoutSec": 1200, - "server.maxPayloadSizeBytes": 1048576, "cbs.firstRequestDelaySec": 7, "cbs.requestIntervalSec": 900, "security.sslDisable": false, diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt index 3524f14c..c3c5d733 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt @@ -37,8 +37,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext */ class HvVesCollectorFactory(private val configuration: CollectorConfiguration, private val sinkFactory: SinkFactory, - private val metrics: Metrics, - private val maxPayloadSizeBytes: Int): CollectorFactory { + private val metrics: Metrics) : CollectorFactory { override fun invoke(ctx: ClientContext): Collector = createVesHvCollector(ctx) @@ -48,7 +47,9 @@ class HvVesCollectorFactory(private val configuration: CollectorConfiguration, private fun createVesHvCollector(ctx: ClientContext): Collector = HvVesCollector( clientContext = ctx, - wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx), + wireChunkDecoder = WireChunkDecoder( + WireFrameDecoder(configuration.maxPayloadSizeBytes), ctx + ), protobufDecoder = VesDecoder(), router = Router(configuration.routing, sinkFactory, ctx, metrics), metrics = metrics) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt index 430f7981..40ac4dc6 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt @@ -33,8 +33,8 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND -import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 35dfba8b..3002f334 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -33,8 +33,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink +import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 8b2bc13c..88d1567e 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -55,8 +55,7 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C private val collectorProvider = HvVesCollectorFactory( configuration, sinkProvider, - metrics, - MAX_PAYLOAD_SIZE_BYTES + metrics ) val collector: Collector @@ -64,18 +63,18 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) + collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT) return sink.sentMessages } fun handleConnection(vararg packets: ByteBuf) { - collector.handleConnection(Flux.fromArray(packets)).block(timeout) + collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT) } override fun close() = collectorProvider.close() companion object { - const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 + private val TIMEOUT = Duration.ofSeconds(10) } } @@ -95,8 +94,6 @@ class DummySinkFactory(private val sink: Sink) : SinkFactory { } } -private val timeout = Duration.ofSeconds(10) - fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut = Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink()) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index d845f7c4..f90f4bc9 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -30,8 +30,8 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC +import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt index 8956e81f..53e48121 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt @@ -27,7 +27,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP" const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE" const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092" -private const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 +const val MAX_PAYLOAD_SIZE_BYTES = 512 * 512 private val perf3gppKafkaSink = ImmutableKafkaSink.builder() .name("PERF3GPP") diff --git a/sources/hv-collector-main/src/main/docker/base.json b/sources/hv-collector-main/src/main/docker/base.json index 0a5cae07..e302da9e 100644 --- a/sources/hv-collector-main/src/main/docker/base.json +++ b/sources/hv-collector-main/src/main/docker/base.json @@ -2,7 +2,6 @@ "logLevel": "INFO", "server.listenPort": 6061, "server.idleTimeoutSec": 60, - "server.maxPayloadSizeBytes": 1048576, "cbs.firstRequestDelaySec": 10, "cbs.requestIntervalSec": 5, "security.sslDisable": true diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index a34b7118..98a094b2 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -58,8 +58,7 @@ object VesServer { HvVesCollectorFactory( config.collector, AdapterFactory.sinkCreatorFactory(), - MicrometerMetrics.INSTANCE, - config.server.maxPayloadSizeBytes + MicrometerMetrics.INSTANCE ) private fun logServerStarted(handle: ServerHandle) = |