summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-10-05 09:02:06 +0200
committerPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2018-10-05 09:02:06 +0200
commitffe57b5673af80942925eed5b8e793ce2cf750b1 (patch)
tree31c7ee54da63a127da20662c29a58147c5c4f2ce
parent7b178765c00751fd99399eec76f1006270ae41dd (diff)
Introduce configurable payload size limitation
Maximum payload size will be configurable (from command line parameter or environment variable). The default value is same as previous hardcoded value, ie. 1 MiB = 1024 * 1024 bytes. Change-Id: Iec83d8295252bac353d3794b13454fdbbc80ecc4 Issue-ID: DCAEGEN2-856 Signed-off-by: Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt1
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt2
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt12
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt6
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt2
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt4
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt6
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt1
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt7
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt2
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt9
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt5
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt2
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt10
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt6
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt1
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt3
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt5
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt6
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt6
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt33
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt7
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt (renamed from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt)21
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt (renamed from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt)3
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt (renamed from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt)2
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt (renamed from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt)2
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt5
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt6
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt1
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt7
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt2
34 files changed, 137 insertions, 61 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index d807a9e7..5c96e1c5 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
+ private val maximumPayloadSizeBytes: Int,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
@@ -63,7 +64,9 @@ class CollectorFactory(val configuration: ConfigurationProvider,
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
- wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
+ wireChunkDecoderSupplier = { alloc ->
+ WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
+ },
protobufDecoder = VesDecoder(),
router = Router(config.routing),
sink = sinkProvider(config),
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index c14d36e6..7a7d9342 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -32,4 +32,5 @@ data class ServerConfiguration(
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
val healthCheckApiPort: Int,
+ val maximumPayloadSizeBytes: Int,
val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index d214ffcf..f06a0dc7 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -45,7 +45,7 @@ internal object WireChunkDecoderTest : Spek({
fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
- fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
for (bb in byteBuffers) {
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 67291abd..0c78dd5c 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -31,12 +31,14 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
@@ -173,7 +175,7 @@ fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<Byt
private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
WireFrameEncoder(alloc).let { encoder ->
- MessageGenerator.INSTANCE
+ MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
.createMessageFlux(listOf(params))
.map(encoder::encode)
.transform { simulateRemoteTcp(alloc, 1000, it) }
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 942e6edf..0495ced5 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -45,11 +45,21 @@ class Sut(sink: Sink = StoringSink()) {
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
private val metrics = FakeMetrics()
- private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider)
+ private val collectorFactory = CollectorFactory(
+ configurationProvider,
+ SinkProvider.just(sink),
+ metrics,
+ MAX_PAYLOAD_SIZE_BYTES,
+ healthStateProvider)
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+
+ companion object {
+ const val MAX_PAYLOAD_SIZE_BYTES = 1024
+ }
+
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index e9c0d67f..2d81c671 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -39,7 +39,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
@@ -72,7 +72,7 @@ object VesHvSpecification : Spek({
val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesWireFrameMessage(PERF3GPP)
val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithTooBigPayload(PERF3GPP)
+ val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
@@ -329,7 +329,7 @@ object VesHvSpecification : Spek({
val handledMessages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP, "first"),
- vesMessageWithTooBigPayload(PERF3GPP),
+ vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
vesWireFrameMessage(PERF3GPP))
assertThat(handledMessages).hasSize(1)
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
index 262e05bf..1a8af874 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference
* @since August 2018
*/
class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
- private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) {
+ private val messageStreamValidation: MessageStreamValidation) {
private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 38de5370..c910b53e 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -35,8 +35,8 @@ import java.io.InputStream
import javax.json.Json
class MessageStreamValidation(
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
- private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+ private val messageGenerator: MessageGenerator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
IO.monadError().bindingCatch {
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
index d0820616..83dceb6a 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -26,17 +26,20 @@ import arrow.instances.extensions
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
override val cmdLineOptionsList: List<CommandLineOption> = listOf(
LISTEN_PORT,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
KAFKA_SERVERS,
KAFKA_TOPICS
)
@@ -47,6 +50,8 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
val listenPort = cmdLine
.intValue(LISTEN_PORT)
.bind()
+ val maxPayloadSizeBytes = cmdLine
+ .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
val kafkaBootstrapServers = cmdLine
.stringValue(KAFKA_SERVERS)
.bind()
@@ -57,6 +62,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
DcaeAppSimConfiguration(
listenPort,
+ maxPayloadSizeBytes,
kafkaBootstrapServers,
kafkaTopics)
}.fix()
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
index c114313d..a6fc8053 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
data class DcaeAppSimConfiguration(
val apiPort: Int,
+ val maxPayloadSizeBytes: Int,
val kafkaBootstrapServers: String,
val kafkaTopics: Set<String>
)
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index c0f8b340..06ff4d59 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -24,12 +24,14 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
private val logger = Logger(PACKAGE_NAME)
@@ -51,7 +53,10 @@ fun main(args: Array<String>) =
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
+ logger.info("Using configuration: $config")
+ val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
+ val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
.start(config.apiPort, config.kafkaTopics)
.unit()
}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index 05fdd808..5e3090af 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -53,7 +53,7 @@ internal class MessageStreamValidationTest : Spek({
beforeEachTest {
messageParametersParser = mock()
messageGenerator = mock()
- cut = MessageStreamValidation(messageParametersParser, messageGenerator)
+ cut = MessageStreamValidation(messageGenerator, messageParametersParser)
}
fun givenParsedMessageParameters(vararg params: MessageParameters) {
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index 4f867f13..f08ec178 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -24,7 +24,6 @@ import arrow.core.Left
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
/**
@@ -52,7 +51,7 @@ class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocato
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class WireFrameDecoder {
+class WireFrameDecoder(private val maxPayloadSizeBytes: Int) {
fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
@@ -81,13 +80,13 @@ class WireFrameDecoder {
private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
val versionMajor = byteBuf.readUnsignedByte()
val versionMinor = byteBuf.readUnsignedByte()
- byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
+ byteBuf.skipBytes(RESERVED_BYTE_COUNT)
val payloadTypeRaw = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
- if (payloadSize > MAX_PAYLOAD_SIZE) {
+ if (payloadSize > maxPayloadSizeBytes) {
byteBuf.resetReaderIndex()
- return Left(PayloadSizeExceeded)
+ return Left(PayloadSizeExceeded(maxPayloadSizeBytes))
}
if (byteBuf.readableBytes() < payloadSize) {
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index dfadc5b8..0d55cebb 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,8 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
@@ -38,7 +36,8 @@ class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
.format(WireFrameMessage.MARKER_BYTE, actualMarker)
)
-object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
+class PayloadSizeExceeded(maxPayloadSizeBytes: Int) :
+ InvalidWireFrame("payload size exceeds the limit ($maxPayloadSizeBytes bytes)")
// Missing bytes errors
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
index 036888e1..5fdc5afe 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -70,6 +70,6 @@ data class WireFrameMessage(val payload: ByteData,
RESERVED_BYTE_COUNT * java.lang.Byte.BYTES + // reserved bytes
1 * java.lang.Integer.BYTES // payload length
- const val MAX_PAYLOAD_SIZE = 1024 * 1024
+ const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
}
}
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 6756bf82..f17a79ba 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -28,7 +28,6 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -39,8 +38,9 @@ import kotlin.test.fail
*/
object WireFrameCodecsTest : Spek({
val payloadAsString = "coffeebabe"
+ val maxPayloadSizeBytes = 1024
val encoder = WireFrameEncoder()
- val decoder = WireFrameDecoder()
+ val decoder = WireFrameDecoder(maxPayloadSizeBytes)
fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
@@ -223,7 +223,7 @@ object WireFrameCodecsTest : Spek({
it("should decode successfully when payload size is equal 1 MiB") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE)
+ val payload = ByteArray(maxPayloadSizeBytes)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
@@ -237,7 +237,7 @@ object WireFrameCodecsTest : Spek({
it("should return error when payload exceeds 1 MiB") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
+ val payload = ByteArray(maxPayloadSizeBytes + 1)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
@@ -253,7 +253,7 @@ object WireFrameCodecsTest : Spek({
it("should validate only first message") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE)
+ val payload = ByteArray(maxPayloadSizeBytes)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index d6ff9efa..826982d6 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -27,6 +27,7 @@ import arrow.instances.extensions
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
@@ -40,6 +41,7 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_T
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
@@ -62,6 +64,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
TRUST_STORE_FILE,
TRUST_STORE_PASSWORD,
IDLE_TIMEOUT_SEC,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
DUMMY_MODE
)
@@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
)
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+ val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, DefaultValues.MAX_PAYLOAD_SIZE_BYTES)
val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine).bind()
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
@@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+ maximumPayloadSizeBytes = maxPayloadSizeBytes,
dummyMode = dummyMode)
}.fix()
@@ -110,5 +115,6 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
const val CONSUL_FIRST_REQUEST_DELAY = 10L
const val CONSUL_REQUEST_INTERVAL = 5L
const val IDLE_TIMEOUT_SEC = 60L
+ const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
}
}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index a84a39a5..78d42830 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -48,6 +48,7 @@ fun main(args: Array<String>) =
private fun startAndAwaitServers(config: ServerConfiguration) =
IO.monad().binding {
+ logger.info("Using configuration: $config")
HealthCheckServer.start(config).bind()
VesServer.start(config).bind()
.await().bind()
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index fbf8936f..992a4dc8 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -40,7 +40,8 @@ object VesServer : ServerStarter() {
val collectorProvider = CollectorFactory(
AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
sink,
- MicrometerMetrics()
+ MicrometerMetrics(),
+ config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
return ServerFactory.createNettyTcpServer(config, collectorProvider)
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
index 035d94ee..3bf615af 100644
--- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
@@ -23,7 +23,6 @@ import com.google.protobuf.ByteString
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
@@ -70,13 +69,13 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
writeByte(0x01) // version minor
}
-fun vesMessageWithTooBigPayload(domain: VesEventDomain = PERF3GPP): ByteBuf =
+fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
allocator.buffer().run {
writeValidWireFrameHeaders()
val gpb = vesEvent(
domain = domain,
- eventFields = ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes))
).toByteString().asReadOnlyByteBuffer()
writeInt(gpb.limit()) // ves event size in bytes
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
index 288a578d..9439bff5 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
@@ -119,6 +119,12 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false)
|connection might be closed.""".trimMargin())
.build()
),
+ MAXIMUM_PAYLOAD_SIZE_BYTES(Option.builder("m")
+ .longOpt("max-payload-size")
+ .hasArg()
+ .desc("Maximum supported payload size in bytes")
+ .build()
+ ),
DUMMY_MODE(Option.builder("u")
.longOpt("dummy")
.desc("If present will start in dummy mode (dummy external services)")
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
index ace7f1cb..076c06be 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
@@ -20,8 +20,6 @@
package org.onap.dcae.collectors.veshv.ves.message.generator.api
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
import reactor.core.publisher.Flux
/**
@@ -32,10 +30,6 @@ interface MessageGenerator {
fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
companion object {
- val INSTANCE: MessageGenerator by lazy {
- MessageGeneratorImpl(PayloadGenerator())
- }
-
const val FIXED_PAYLOAD_SIZE = 100
}
}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
new file mode 100644
index 00000000..e2269c20
--- /dev/null
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.factory
+
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since October 2018
+ */
+object MessageGeneratorFactory {
+ fun create(maxPayloadSizeBytes: Int): MessageGenerator =
+ MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
+}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index 2bb9f781..cc1d16fe 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -42,7 +42,10 @@ import java.nio.charset.Charset
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
+class MessageGeneratorImpl internal constructor(
+ private val payloadGenerator: PayloadGenerator,
+ private val maxPayloadSizeBytes: Int
+) : MessageGenerator {
override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux
.fromIterable(messageParameters)
@@ -89,7 +92,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
.build()
private fun oversizedPayload() =
- payloadGenerator.generateRawPayload(WireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+ payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1)
private fun fixedPayload() =
payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
index ec09ac9c..ee76b789 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import com.google.protobuf.ByteString
import com.google.protobuf.InvalidProtocolBufferException
@@ -47,7 +47,8 @@ import reactor.test.test
*/
object MessageGeneratorImplTest : Spek({
describe("message factory") {
- val generator = MessageGenerator.INSTANCE
+ val maxPayloadSizeBytes = 1024
+ val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
given("single message parameters") {
on("messages amount not specified in parameters") {
it("should create infinite flux") {
@@ -87,7 +88,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
.verifyComplete()
@@ -105,7 +106,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
}
.verifyComplete()
@@ -122,7 +123,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
.isThrownBy { extractCommonEventHeader(it.payload) }
}
@@ -140,7 +141,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isFalse()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
}
@@ -158,7 +159,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
@@ -177,17 +178,17 @@ object MessageGeneratorImplTest : Spek({
generator.createMessageFlux(messageParameters)
.test()
.assertNext {
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
- assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.domainName)
}
.expectNextCount(singleFluxSize - 1)
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
index 3b1a48b3..134ebb2d 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.fail
@@ -27,7 +27,6 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
private const val EXPECTED_MESSAGES_AMOUNT = 25000L
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt
index 2b41e290..5249a8d2 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
index dd8bc89f..5d112bae 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import javax.json.Json
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 3fde2c7e..ec5ef81a 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -39,8 +39,8 @@ import javax.json.Json
*/
class XnfSimulator(
private val vesClient: VesHvClient,
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
- private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+ private val messageGenerator: MessageGenerator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
index 54ead6f7..06f1cffe 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -22,12 +22,8 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
import arrow.core.Either
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.utils.http.Content
-import org.onap.dcae.collectors.veshv.utils.http.ContentType
import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
import org.onap.dcae.collectors.veshv.utils.http.Response
import org.onap.dcae.collectors.veshv.utils.http.Responses
import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
@@ -40,7 +36,6 @@ import ratpack.http.TypedData
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
import java.util.*
-import javax.json.Json
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
index 3d8dc948..7966a4e9 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -25,11 +25,13 @@ import arrow.core.monad
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
@@ -48,6 +50,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
VES_HV_PORT,
VES_HV_HOST,
LISTEN_PORT,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
SSL_DISABLE,
KEY_STORE_FILE,
KEY_STORE_PASSWORD,
@@ -59,12 +62,15 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
+ val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
SimulatorConfiguration(
listenPort,
vesHost,
vesPort,
+ maxPayloadSizeBytes,
createSecurityConfiguration(cmdLine).bind())
}.fix()
+
}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
index 9b6ef209..3395d282 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
@@ -29,4 +29,5 @@ data class SimulatorConfiguration(
val listenPort: Int,
val vesHost: String,
val vesPort: Int,
+ val maxPayloadSizeBytes: Int,
val security: SecurityConfiguration)
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index c9e900ac..4512dfbf 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf"
private val logger = Logger(PACKAGE_NAME)
@@ -41,7 +42,11 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
.map { config ->
- XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations())
+ logger.info("Using configuration: $config")
+ val xnfSimulator = XnfSimulator(
+ VesHvClient(config),
+ MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ XnfApiServer(xnfSimulator, OngoingSimulations())
.start(config.listenPort)
.unit()
}
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 97535887..2a78ed5e 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -54,7 +54,7 @@ internal class XnfSimulatorTest : Spek({
vesClient = mock()
messageParametersParser = mock()
messageGenerator = mock()
- cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator)
+ cut = XnfSimulator(vesClient, messageGenerator, messageParametersParser)
}
describe("startSimulation") {