From ffe57b5673af80942925eed5b8e793ce2cf750b1 Mon Sep 17 00:00:00 2001 From: Piotr Jaszczyk Date: Fri, 5 Oct 2018 09:02:06 +0200 Subject: Introduce configurable payload size limitation Maximum payload size will be configurable (from command line parameter or environment variable). The default value is same as previous hardcoded value, ie. 1 MiB = 1024 * 1024 bytes. Change-Id: Iec83d8295252bac353d3794b13454fdbbc80ecc4 Issue-ID: DCAEGEN2-856 Signed-off-by: Piotr Jaszczyk --- .../collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt | 2 +- .../veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt | 4 ++-- .../simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt | 6 ++++++ .../simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt | 1 + .../org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt | 7 ++++++- 5 files changed, 16 insertions(+), 4 deletions(-) (limited to 'hv-collector-dcae-app-simulator/src/main') diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 262e05bf..1a8af874 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference * @since August 2018 */ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, - private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) { + private val messageStreamValidation: MessageStreamValidation) { private val consumerState: AtomicReference = AtomicReference() fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 38de5370..c910b53e 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -35,8 +35,8 @@ import java.io.InputStream import javax.json.Json class MessageStreamValidation( - private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, - private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + private val messageGenerator: MessageGenerator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun validate(jsonDescription: InputStream, consumedMessages: List): IO = IO.monadError().bindingCatch { diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index d0820616..83dceb6a 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -26,17 +26,20 @@ import arrow.instances.extensions import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue class ArgDcaeAppSimConfiguration : ArgBasedConfiguration(DefaultParser()) { override val cmdLineOptionsList: List = listOf( LISTEN_PORT, + MAXIMUM_PAYLOAD_SIZE_BYTES, KAFKA_SERVERS, KAFKA_TOPICS ) @@ -47,6 +50,8 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration ) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index c0f8b340..06ff4d59 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -24,12 +24,14 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp" private val logger = Logger(PACKAGE_NAME) @@ -51,7 +53,10 @@ fun main(args: Array) = private fun startApp(config: DcaeAppSimConfiguration): IO { - return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers))) + logger.info("Using configuration: $config") + val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) .start(config.apiPort, config.kafkaTopics) .unit() } -- cgit 1.2.3-korg