diff options
author | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-10-05 09:02:06 +0200 |
---|---|---|
committer | Piotr Jaszczyk <piotr.jaszczyk@nokia.com> | 2018-10-05 09:02:06 +0200 |
commit | ffe57b5673af80942925eed5b8e793ce2cf750b1 (patch) | |
tree | 31c7ee54da63a127da20662c29a58147c5c4f2ce /hv-collector-dcae-app-simulator/src | |
parent | 7b178765c00751fd99399eec76f1006270ae41dd (diff) |
Introduce configurable payload size limitation
Maximum payload size will be configurable (from command line parameter
or environment variable). The default value is same as previous
hardcoded value, ie. 1 MiB = 1024 * 1024 bytes.
Change-Id: Iec83d8295252bac353d3794b13454fdbbc80ecc4
Issue-ID: DCAEGEN2-856
Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
Diffstat (limited to 'hv-collector-dcae-app-simulator/src')
6 files changed, 17 insertions, 5 deletions
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 262e05bf..1a8af874 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference * @since August 2018 */ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, - private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) { + private val messageStreamValidation: MessageStreamValidation) { private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference() fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 38de5370..c910b53e 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -35,8 +35,8 @@ import java.io.InputStream import javax.json.Json class MessageStreamValidation( - private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, - private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + private val messageGenerator: MessageGenerator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = IO.monadError().bindingCatch { diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index d0820616..83dceb6a 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -26,17 +26,20 @@ import arrow.instances.extensions import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { override val cmdLineOptionsList: List<CommandLineOption> = listOf( LISTEN_PORT, + MAXIMUM_PAYLOAD_SIZE_BYTES, KAFKA_SERVERS, KAFKA_TOPICS ) @@ -47,6 +50,8 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration val listenPort = cmdLine .intValue(LISTEN_PORT) .bind() + val maxPayloadSizeBytes = cmdLine + .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) val kafkaBootstrapServers = cmdLine .stringValue(KAFKA_SERVERS) .bind() @@ -57,6 +62,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration DcaeAppSimConfiguration( listenPort, + maxPayloadSizeBytes, kafkaBootstrapServers, kafkaTopics) }.fix() diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt index c114313d..a6fc8053 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config data class DcaeAppSimConfiguration( val apiPort: Int, + val maxPayloadSizeBytes: Int, val kafkaBootstrapServers: String, val kafkaTopics: Set<String> ) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index c0f8b340..06ff4d59 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -24,12 +24,14 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp" private val logger = Logger(PACKAGE_NAME) @@ -51,7 +53,10 @@ fun main(args: Array<String>) = private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers))) + logger.info("Using configuration: $config") + val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) .start(config.apiPort, config.kafkaTopics) .unit() } diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index 05fdd808..5e3090af 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -53,7 +53,7 @@ internal class MessageStreamValidationTest : Spek({ beforeEachTest { messageParametersParser = mock() messageGenerator = mock() - cut = MessageStreamValidation(messageParametersParser, messageGenerator) + cut = MessageStreamValidation(messageGenerator, messageParametersParser) } fun givenParsedMessageParameters(vararg params: MessageParameters) { |