summaryrefslogtreecommitdiffstats
path: root/sources
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-04-10 09:31:15 +0000
committerGerrit Code Review <gerrit@onap.org>2019-04-10 09:31:15 +0000
commitd52444107a3c62c1027e35178b76645ceb4d2c4e (patch)
treed0dbd57bc0f5a6e4387e74f1c00912ab8f2f98ee /sources
parentf864f9ae0cf8cef72b64cbda8964e86398b3f749 (diff)
parent735469d5003388df3622819c3d1db0e89a64197c (diff)
Merge "Move maxPayloadSizeBytes to CollectorConfiguration"
Diffstat (limited to 'sources')
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt2
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt11
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt1
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt2
-rw-r--r--sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt2
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt3
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt83
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt3
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt17
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt4
-rw-r--r--sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt1
-rw-r--r--sources/hv-collector-configuration/src/test/resources/sampleConfig.json1
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt7
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt2
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt2
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt11
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt2
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt2
-rw-r--r--sources/hv-collector-main/src/main/docker/base.json1
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt3
20 files changed, 116 insertions, 44 deletions
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
index 93381572..f0ee3a42 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/ConfigurationModule.kt
@@ -53,7 +53,7 @@ class ConfigurationModule {
.throwOnLeft(::MissingArgumentException)
.doOnNext { logger.info { "Using base configuration file: ${it.absolutePath}" } }
.map { it.reader().use(configParser::parse) }
- .doOnNext { logger.info { "Successfully parsed json file to configuration: $it" } }
+ .doOnNext { logger.info { "Successfully parsed configuration file to: $it" } }
.cache()
.flatMapMany { basePartialConfig ->
cbsConfigurationProvider(basePartialConfig, configStateListener, mdc)
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
index f745d595..8db2f770 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/api/model/configuration.kt
@@ -37,7 +37,6 @@ data class HvVesConfiguration(
data class ServerConfiguration(
val listenPort: Int,
- val maxPayloadSizeBytes: Int,
val idleTimeout: Duration
)
@@ -48,4 +47,12 @@ data class CbsConfiguration(
data class CollectorConfiguration(
val routing: Routing
-)
+) {
+ val maxPayloadSizeBytes by lazy {
+ routing.map { it.sink.maxPayloadSizeBytes() }.max() ?: DEFAULT_MAX_PAYLOAD_SIZE
+ }
+
+ companion object {
+ internal const val DEFAULT_MAX_PAYLOAD_SIZE = 1024 * 1024
+ }
+}
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
index e782a1e7..56e48038 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMerger.kt
@@ -33,7 +33,6 @@ internal class ConfigurationMerger {
PartialConfiguration(
listenPort = base.listenPort.updateToGivenOrNone(update.listenPort),
idleTimeoutSec = base.idleTimeoutSec.updateToGivenOrNone(update.idleTimeoutSec),
- maxPayloadSizeBytes = base.maxPayloadSizeBytes.updateToGivenOrNone(update.maxPayloadSizeBytes),
firstRequestDelaySec = base.firstRequestDelaySec.updateToGivenOrNone(update.firstRequestDelaySec),
requestIntervalSec = base.requestIntervalSec.updateToGivenOrNone(update.requestIntervalSec),
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
index dddf0bed..613ae302 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidator.kt
@@ -94,7 +94,6 @@ internal class ConfigurationValidator {
partial.mapBinding {
ServerConfiguration(
it.listenPort.bind(),
- it.maxPayloadSizeBytes.bind(),
Duration.ofSeconds(it.idleTimeoutSec.bind())
)
}
@@ -139,7 +138,6 @@ internal class ConfigurationValidator {
.trustStorePassword(Passwords.fromString(trustStorePassword))
.build()
-
private fun validatedCollectorConfig(partial: PartialConfiguration) =
partial.mapBinding { config ->
CollectorConfiguration(
diff --git a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
index 30f6c3e3..d09a52e4 100644
--- a/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
+++ b/sources/hv-collector-configuration/src/main/kotlin/org/onap/dcae/collectors/veshv/config/impl/partial_configuration.kt
@@ -34,8 +34,6 @@ internal data class PartialConfiguration(
val listenPort: Option<Int> = None,
@SerializedName("server.idleTimeoutSec")
val idleTimeoutSec: Option<Long> = None,
- @SerializedName("server.maxPayloadSizeBytes")
- val maxPayloadSizeBytes: Option<Int> = None,
@SerializedName("cbs.firstRequestDelaySec")
val firstRequestDelaySec: Option<Long> = None,
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
index 94eb519d..8c3c22aa 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CbsConfigurationProviderTest.kt
@@ -82,8 +82,6 @@ internal object CbsConfigurationProviderTest : Spek({
assertThat(it.listenPort).isEqualTo(Some(6061))
assertThat(it.idleTimeoutSec).isEqualTo(Some(60L))
- assertThat(it.maxPayloadSizeBytes).isEqualTo(Some(1048576))
-
val sinks = it.streamPublishers.orNull()!!
val sink1 = sinks[0]
@@ -148,7 +146,6 @@ private val validConfiguration = JsonParser().parse("""
{
"server.listenPort": 6061,
"server.idleTimeoutSec": 60,
- "server.maxPayloadSizeBytes": 1048576,
"streams_publishes": {
"$PERF3GPP_REGIONAL": {
"type": "kafka",
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt
new file mode 100644
index 00000000..dbdf4ad0
--- /dev/null
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/CollectorConfigurationTest.kt
@@ -0,0 +1,83 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.config.impl
+
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
+import org.assertj.core.api.Assertions.assertThat
+import org.jetbrains.spek.api.Spek
+import org.jetbrains.spek.api.dsl.describe
+import org.jetbrains.spek.api.dsl.it
+import org.jetbrains.spek.api.dsl.on
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
+import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration.Companion.DEFAULT_MAX_PAYLOAD_SIZE
+import org.onap.dcae.collectors.veshv.config.api.model.Route
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.KafkaSink
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since May 2018
+ */
+internal object CollectorConfigurationTest : Spek({
+
+ describe("CollectorConfiguration") {
+ describe("calculating maxPayloadSizeBytes") {
+ on("defined routes") {
+ val sampleRouting = listOf(
+ Route(sink1.name(), sink1),
+ Route(sink2.name(), sink2),
+ Route(sink3.name(), sink3)
+ )
+ val configuration = CollectorConfiguration(sampleRouting)
+
+ it("should use the highest value among all routes") {
+ assertThat(configuration.maxPayloadSizeBytes)
+ .isEqualTo(highestMaxPayloadSize)
+ }
+ }
+
+ on("empty routing") {
+ val configuration = CollectorConfiguration(emptyList())
+
+ it("should use default value") {
+ assertThat(configuration.maxPayloadSizeBytes)
+ .isEqualTo(DEFAULT_MAX_PAYLOAD_SIZE)
+ }
+ }
+ }
+ }
+})
+
+private const val highestMaxPayloadSize = 3
+
+private val sink1 = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(1)
+}
+
+private val sink2 = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(2)
+}
+
+private val sink3 = mock<KafkaSink>().also {
+ whenever(it.name()).thenReturn("")
+ whenever(it.maxPayloadSizeBytes()).thenReturn(highestMaxPayloadSize)
+}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
index 4cd2ba97..cb8d5005 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationMergerTest.kt
@@ -76,8 +76,6 @@ internal object ConfigurationMergerTest : Spek({
assertThat(result.listenPort).isEqualTo(someListenPort)
assertThat(result.idleTimeoutSec.isEmpty()).isFalse()
assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
- assertThat(result.maxPayloadSizeBytes.isEmpty()).isFalse()
- assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
}
it("merges full config into single parameter") {
@@ -89,7 +87,6 @@ internal object ConfigurationMergerTest : Spek({
val result = ConfigurationMerger().merge(actual, diff)
assertThat(result.logLevel).isEqualTo(Some(LogLevel.ERROR))
- assertThat(result.maxPayloadSizeBytes).isEqualTo(Some(1048576))
assertThat(result.idleTimeoutSec).isEqualTo(Some(1200L))
assertThat(result.keyStoreFile.isEmpty()).isFalse()
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
index 5fa1fd62..0806e8ca 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/ConfigurationValidatorTest.kt
@@ -76,7 +76,6 @@ internal object ConfigurationValidatorTest : Spek({
val config = PartialConfiguration(
listenPort = Some(defaultListenPort),
idleTimeoutSec = Some(defaultIdleTimeoutSec),
- maxPayloadSizeBytes = Some(defaultMaxPayloadSizeBytes),
firstRequestDelaySec = Some(defaultFirstReqDelaySec),
requestIntervalSec = Some(defaultRequestIntervalSec),
sslDisable = Some(false),
@@ -97,8 +96,6 @@ internal object ConfigurationValidatorTest : Spek({
{
assertThat(it.server.listenPort)
.isEqualTo(defaultListenPort)
- assertThat(it.server.maxPayloadSizeBytes)
- .isEqualTo(defaultMaxPayloadSizeBytes)
assertThat(it.server.idleTimeout)
.isEqualTo(Duration.ofSeconds(defaultIdleTimeoutSec))
@@ -116,6 +113,8 @@ internal object ConfigurationValidatorTest : Spek({
assertThat(it.collector.routing)
.isEqualTo(sampleRouting)
+ assertThat(it.collector.maxPayloadSizeBytes)
+ .isEqualTo(sampleMaxPayloadSize)
assertThat(it.logLevel).isEqualTo(LogLevel.TRACE)
}
@@ -183,7 +182,6 @@ internal object ConfigurationValidatorTest : Spek({
private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPort),
idleTimeoutSec: Option<Long> = Some(defaultIdleTimeoutSec),
- maxPayloadSizeBytes: Option<Int> = Some(defaultMaxPayloadSizeBytes),
firstReqDelaySec: Option<Long> = Some(defaultFirstReqDelaySec),
requestIntervalSec: Option<Long> = Some(defaultRequestIntervalSec),
sslDisable: Option<Boolean> = Some(false),
@@ -196,7 +194,6 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor
) = PartialConfiguration(
listenPort = listenPort,
idleTimeoutSec = idleTimeoutSec,
- maxPayloadSizeBytes = maxPayloadSizeBytes,
firstRequestDelaySec = firstReqDelaySec,
requestIntervalSec = requestIntervalSec,
sslDisable = sslDisable,
@@ -209,7 +206,6 @@ private fun partialConfiguration(listenPort: Option<Int> = Some(defaultListenPor
)
const val defaultListenPort = 1234
-const val defaultMaxPayloadSizeBytes = 2
const val defaultRequestIntervalSec = 3L
const val defaultIdleTimeoutSec = 10L
const val defaultFirstReqDelaySec = 10L
@@ -220,9 +216,12 @@ const val TRUSTSTORE = "trust.ks.pkcs12"
const val TRUSTSTORE_PASSWORD = "changeMeToo"
const val sampleSinkName = "perf3gpp"
+const val sampleMaxPayloadSize = 1024
-private val sampleSink = mock<KafkaSink>().also {
+private val sink = mock<KafkaSink>().also {
whenever(it.name()).thenReturn(sampleSinkName)
+ whenever(it.maxPayloadSizeBytes()).thenReturn(sampleMaxPayloadSize)
}
-val sampleStreamsDefinition = listOf(sampleSink)
-val sampleRouting = listOf(Route(sampleSink.name(), sampleSink)) \ No newline at end of file
+
+private val sampleStreamsDefinition = listOf(sink)
+private val sampleRouting = listOf(Route(sink.name(), sink)) \ No newline at end of file
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt
index 0fdd41c9..ac43c013 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/HvVesCommandLineParserTest.kt
@@ -36,7 +36,7 @@ import java.io.File
*/
object HvVesCommandLineParserTest : Spek({
lateinit var cut: HvVesCommandLineParser
- val DEFAULT_HEALTHCHECK_PORT = 6060
+ val defaultHealthcheckPort = 6060
val emptyConfig = ""
val configFilePath = javaClass.absoluteResourcePath("sampleConfig.json")
@@ -88,7 +88,7 @@ object HvVesCommandLineParserTest : Spek({
it("should return default port") {
assertThat(
cut.getHealthcheckPort(arrayOf(emptyConfig))
- ).isEqualTo(DEFAULT_HEALTHCHECK_PORT)
+ ).isEqualTo(defaultHealthcheckPort)
}
}
}
diff --git a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt
index ad38fd51..919f22c1 100644
--- a/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt
+++ b/sources/hv-collector-configuration/src/test/kotlin/org/onap/dcae/collectors/veshv/config/impl/JsonConfigurationParserTest.kt
@@ -87,7 +87,6 @@ internal object JsonConfigurationParserTest : Spek({
assertThat(config.listenPort).isEqualTo(Some(6000))
assertThat(config.idleTimeoutSec).isEqualTo(Some(1200L))
- assertThat(config.maxPayloadSizeBytes).isEqualTo(Some(1048576))
assertThat(config.firstRequestDelaySec).isEqualTo(Some(7L))
assertThat(config.requestIntervalSec).isEqualTo(Some(900L))
diff --git a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
index 2c3805ef..a5ad52ae 100644
--- a/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
+++ b/sources/hv-collector-configuration/src/test/resources/sampleConfig.json
@@ -2,7 +2,6 @@
"logLevel": "ERROR",
"server.listenPort": 6000,
"server.idleTimeoutSec": 1200,
- "server.maxPayloadSizeBytes": 1048576,
"cbs.firstRequestDelaySec": 7,
"cbs.requestIntervalSec": 900,
"security.sslDisable": false,
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
index 3524f14c..c3c5d733 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/HvVesCollectorFactory.kt
@@ -37,8 +37,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
*/
class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
private val sinkFactory: SinkFactory,
- private val metrics: Metrics,
- private val maxPayloadSizeBytes: Int): CollectorFactory {
+ private val metrics: Metrics) : CollectorFactory {
override fun invoke(ctx: ClientContext): Collector =
createVesHvCollector(ctx)
@@ -48,7 +47,9 @@ class HvVesCollectorFactory(private val configuration: CollectorConfiguration,
private fun createVesHvCollector(ctx: ClientContext): Collector =
HvVesCollector(
clientContext = ctx,
- wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
+ wireChunkDecoder = WireChunkDecoder(
+ WireFrameDecoder(configuration.maxPayloadSizeBytes), ctx
+ ),
protobufDecoder = VesDecoder(),
router = Router(configuration.routing, sinkFactory, ctx, metrics),
metrics = metrics)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
index 430f7981..40ac4dc6 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/MetricsSpecification.kt
@@ -33,8 +33,8 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.KAFKA_FAILURE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicRouting
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 35dfba8b..3002f334 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -33,8 +33,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.CollectorConfiguration
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 8b2bc13c..88d1567e 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -55,8 +55,7 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C
private val collectorProvider = HvVesCollectorFactory(
configuration,
sinkProvider,
- metrics,
- MAX_PAYLOAD_SIZE_BYTES
+ metrics
)
val collector: Collector
@@ -64,18 +63,18 @@ class Sut(configuration: CollectorConfiguration, sink: Sink = StoringSink()) : C
fun handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+ collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT)
return sink.sentMessages
}
fun handleConnection(vararg packets: ByteBuf) {
- collector.handleConnection(Flux.fromArray(packets)).block(timeout)
+ collector.handleConnection(Flux.fromArray(packets)).block(TIMEOUT)
}
override fun close() = collectorProvider.close()
companion object {
- const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+ private val TIMEOUT = Duration.ofSeconds(10)
}
}
@@ -95,8 +94,6 @@ class DummySinkFactory(private val sink: Sink) : SinkFactory {
}
}
-private val timeout = Duration.ofSeconds(10)
-
fun vesHvWithAlwaysSuccessfulSink(routing: Routing = basicRouting): Sut =
Sut(CollectorConfiguration(routing), AlwaysSuccessfulSink())
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index d845f7c4..f90f4bc9 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -30,8 +30,8 @@ import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.MEASUREMENT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.OTHER
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.tests.component.Sut.Companion.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.ALTERNATE_PERF3GPP_TOPIC
+import org.onap.dcae.collectors.veshv.tests.fakes.MAX_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.tests.fakes.PERF3GPP_TOPIC
import org.onap.dcae.collectors.veshv.tests.fakes.StoringSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicRouting
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
index 8956e81f..53e48121 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/routing.kt
@@ -27,7 +27,7 @@ import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableKafkaSink
const val PERF3GPP_TOPIC = "HV_VES_PERF3GPP"
const val ALTERNATE_PERF3GPP_TOPIC = "HV_VES_PERF3GPP_ALTERNATIVE"
const val KAFKA_BOOTSTRAP_SERVERS = "kafka:9092"
-private const val MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
+const val MAX_PAYLOAD_SIZE_BYTES = 512 * 512
private val perf3gppKafkaSink = ImmutableKafkaSink.builder()
.name("PERF3GPP")
diff --git a/sources/hv-collector-main/src/main/docker/base.json b/sources/hv-collector-main/src/main/docker/base.json
index 0a5cae07..e302da9e 100644
--- a/sources/hv-collector-main/src/main/docker/base.json
+++ b/sources/hv-collector-main/src/main/docker/base.json
@@ -2,7 +2,6 @@
"logLevel": "INFO",
"server.listenPort": 6061,
"server.idleTimeoutSec": 60,
- "server.maxPayloadSizeBytes": 1048576,
"cbs.firstRequestDelaySec": 10,
"cbs.requestIntervalSec": 5,
"security.sslDisable": true
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index a34b7118..98a094b2 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -58,8 +58,7 @@ object VesServer {
HvVesCollectorFactory(
config.collector,
AdapterFactory.sinkCreatorFactory(),
- MicrometerMetrics.INSTANCE,
- config.server.maxPayloadSizeBytes
+ MicrometerMetrics.INSTANCE
)
private fun logServerStarted(handle: ServerHandle) =