aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt5
-rw-r--r--hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt1
-rw-r--r--hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt2
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt4
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt12
-rw-r--r--hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt6
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt2
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt4
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt6
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt1
-rw-r--r--hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt7
-rw-r--r--hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt2
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt9
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt5
-rw-r--r--hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt2
-rw-r--r--hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt10
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt6
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt1
-rw-r--r--hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt3
-rw-r--r--hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt5
-rw-r--r--hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt6
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt6
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt33
-rw-r--r--hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt7
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt (renamed from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt)21
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt (renamed from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt)3
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt (renamed from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt)2
-rw-r--r--hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt (renamed from hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt)2
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt4
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt5
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt6
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt1
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt7
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt2
34 files changed, 137 insertions, 61 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index d807a9e7..5c96e1c5 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference
class CollectorFactory(val configuration: ConfigurationProvider,
private val sinkProvider: SinkProvider,
private val metrics: Metrics,
+ private val maximumPayloadSizeBytes: Int,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
@@ -63,7 +64,9 @@ class CollectorFactory(val configuration: ConfigurationProvider,
private fun createVesHvCollector(config: CollectorConfiguration): Collector {
return VesHvCollector(
- wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) },
+ wireChunkDecoderSupplier = { alloc ->
+ WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc)
+ },
protobufDecoder = VesDecoder(),
router = Router(config.routing),
sink = sinkProvider(config),
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
index c14d36e6..7a7d9342 100644
--- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
+++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt
@@ -32,4 +32,5 @@ data class ServerConfiguration(
val securityConfiguration: SecurityConfiguration,
val idleTimeout: Duration,
val healthCheckApiPort: Int,
+ val maximumPayloadSizeBytes: Int,
val dummyMode: Boolean = false)
diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
index d214ffcf..f06a0dc7 100644
--- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
+++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt
@@ -45,7 +45,7 @@ internal object WireChunkDecoderTest : Spek({
fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame))
- fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc)
+ fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc)
fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) {
for (bb in byteBuffers) {
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
index 67291abd..0c78dd5c 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt
@@ -31,12 +31,14 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink
import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
import reactor.core.publisher.Flux
import reactor.math.sum
import java.security.MessageDigest
@@ -173,7 +175,7 @@ fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<Byt
private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> =
WireFrameEncoder(alloc).let { encoder ->
- MessageGenerator.INSTANCE
+ MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES)
.createMessageFlux(listOf(params))
.map(encoder::encode)
.transform { simulateRemoteTcp(alloc, 1000, it) }
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
index 942e6edf..0495ced5 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt
@@ -45,11 +45,21 @@ class Sut(sink: Sink = StoringSink()) {
val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT
private val metrics = FakeMetrics()
- private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider)
+ private val collectorFactory = CollectorFactory(
+ configurationProvider,
+ SinkProvider.just(sink),
+ metrics,
+ MAX_PAYLOAD_SIZE_BYTES,
+ healthStateProvider)
private val collectorProvider = collectorFactory.createVesHvCollectorProvider()
val collector: Collector
get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") }
+
+ companion object {
+ const val MAX_PAYLOAD_SIZE_BYTES = 1024
+ }
+
}
fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> {
diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
index e9c0d67f..2d81c671 100644
--- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
+++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt
@@ -39,7 +39,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting
import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration
import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame
import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame
-import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload
+import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize
import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage
import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload
@@ -72,7 +72,7 @@ object VesHvSpecification : Spek({
val (sut, sink) = vesHvWithStoringSink()
val validMessage = vesWireFrameMessage(PERF3GPP)
val msgWithInvalidFrame = invalidWireFrame()
- val msgWithTooBigPayload = vesMessageWithTooBigPayload(PERF3GPP)
+ val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP)
val expectedRefCnt = 0
val handledEvents = sut.handleConnection(
@@ -329,7 +329,7 @@ object VesHvSpecification : Spek({
val handledMessages = sut.handleConnection(sink,
vesWireFrameMessage(PERF3GPP, "first"),
- vesMessageWithTooBigPayload(PERF3GPP),
+ vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP),
vesWireFrameMessage(PERF3GPP))
assertThat(handledMessages).hasSize(1)
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
index 262e05bf..1a8af874 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt
@@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference
* @since August 2018
*/
class DcaeAppSimulator(private val consumerFactory: ConsumerFactory,
- private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) {
+ private val messageStreamValidation: MessageStreamValidation) {
private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference()
fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString))
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
index 38de5370..c910b53e 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt
@@ -35,8 +35,8 @@ import java.io.InputStream
import javax.json.Json
class MessageStreamValidation(
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
- private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+ private val messageGenerator: MessageGenerator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> =
IO.monadError().bindingCatch {
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
index d0820616..83dceb6a 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt
@@ -26,17 +26,20 @@ import arrow.instances.extensions
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) {
override val cmdLineOptionsList: List<CommandLineOption> = listOf(
LISTEN_PORT,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
KAFKA_SERVERS,
KAFKA_TOPICS
)
@@ -47,6 +50,8 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
val listenPort = cmdLine
.intValue(LISTEN_PORT)
.bind()
+ val maxPayloadSizeBytes = cmdLine
+ .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
val kafkaBootstrapServers = cmdLine
.stringValue(KAFKA_SERVERS)
.bind()
@@ -57,6 +62,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration
DcaeAppSimConfiguration(
listenPort,
+ maxPayloadSizeBytes,
kafkaBootstrapServers,
kafkaTopics)
}.fix()
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
index c114313d..a6fc8053 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt
@@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config
data class DcaeAppSimConfiguration(
val apiPort: Int,
+ val maxPayloadSizeBytes: Int,
val kafkaBootstrapServers: String,
val kafkaTopics: Set<String>
)
diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
index c0f8b340..06ff4d59 100644
--- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
+++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt
@@ -24,12 +24,14 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator
+import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation
import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp"
private val logger = Logger(PACKAGE_NAME)
@@ -51,7 +53,10 @@ fun main(args: Array<String>) =
private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> {
- return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers)))
+ logger.info("Using configuration: $config")
+ val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers)
+ val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation))
.start(config.apiPort, config.kafkaTopics)
.unit()
}
diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
index 05fdd808..5e3090af 100644
--- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
+++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt
@@ -53,7 +53,7 @@ internal class MessageStreamValidationTest : Spek({
beforeEachTest {
messageParametersParser = mock()
messageGenerator = mock()
- cut = MessageStreamValidation(messageParametersParser, messageGenerator)
+ cut = MessageStreamValidation(messageGenerator, messageParametersParser)
}
fun givenParsedMessageParameters(vararg params: MessageParameters) {
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
index 4f867f13..f08ec178 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt
@@ -24,7 +24,6 @@ import arrow.core.Left
import arrow.core.Right
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
/**
@@ -52,7 +51,7 @@ class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocato
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
*/
-class WireFrameDecoder {
+class WireFrameDecoder(private val maxPayloadSizeBytes: Int) {
fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> =
when {
@@ -81,13 +80,13 @@ class WireFrameDecoder {
private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> {
val versionMajor = byteBuf.readUnsignedByte()
val versionMinor = byteBuf.readUnsignedByte()
- byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved
+ byteBuf.skipBytes(RESERVED_BYTE_COUNT)
val payloadTypeRaw = byteBuf.readUnsignedByte()
val payloadSize = byteBuf.readInt()
- if (payloadSize > MAX_PAYLOAD_SIZE) {
+ if (payloadSize > maxPayloadSizeBytes) {
byteBuf.resetReaderIndex()
- return Left(PayloadSizeExceeded)
+ return Left(PayloadSizeExceeded(maxPayloadSizeBytes))
}
if (byteBuf.readableBytes() < payloadSize) {
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
index dfadc5b8..0d55cebb 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt
@@ -19,8 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.domain
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
-
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
* @since June 2018
@@ -38,7 +36,8 @@ class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame(
.format(WireFrameMessage.MARKER_BYTE, actualMarker)
)
-object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)")
+class PayloadSizeExceeded(maxPayloadSizeBytes: Int) :
+ InvalidWireFrame("payload size exceeds the limit ($maxPayloadSizeBytes bytes)")
// Missing bytes errors
diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
index 036888e1..5fdc5afe 100644
--- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
+++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt
@@ -70,6 +70,6 @@ data class WireFrameMessage(val payload: ByteData,
RESERVED_BYTE_COUNT * java.lang.Byte.BYTES + // reserved bytes
1 * java.lang.Integer.BYTES // payload length
- const val MAX_PAYLOAD_SIZE = 1024 * 1024
+ const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024
}
}
diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
index 6756bf82..f17a79ba 100644
--- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
+++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt
@@ -28,7 +28,6 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import java.nio.charset.Charset
import kotlin.test.assertTrue
import kotlin.test.fail
@@ -39,8 +38,9 @@ import kotlin.test.fail
*/
object WireFrameCodecsTest : Spek({
val payloadAsString = "coffeebabe"
+ val maxPayloadSizeBytes = 1024
val encoder = WireFrameEncoder()
- val decoder = WireFrameDecoder()
+ val decoder = WireFrameDecoder(maxPayloadSizeBytes)
fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset()))
@@ -223,7 +223,7 @@ object WireFrameCodecsTest : Spek({
it("should decode successfully when payload size is equal 1 MiB") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE)
+ val payload = ByteArray(maxPayloadSizeBytes)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
@@ -237,7 +237,7 @@ object WireFrameCodecsTest : Spek({
it("should return error when payload exceeds 1 MiB") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE + 1)
+ val payload = ByteArray(maxPayloadSizeBytes + 1)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
@@ -253,7 +253,7 @@ object WireFrameCodecsTest : Spek({
it("should validate only first message") {
- val payload = ByteArray(MAX_PAYLOAD_SIZE)
+ val payload = ByteArray(maxPayloadSizeBytes)
val input = WireFrameMessage(
payload = ByteData(payload),
versionMajor = 1,
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
index d6ff9efa..826982d6 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt
@@ -27,6 +27,7 @@ import arrow.instances.extensions
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams
import org.onap.dcae.collectors.veshv.model.ServerConfiguration
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
@@ -40,6 +41,7 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_T
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
@@ -62,6 +64,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
TRUST_STORE_FILE,
TRUST_STORE_PASSWORD,
IDLE_TIMEOUT_SEC,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
DUMMY_MODE
)
@@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
)
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC)
+ val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, DefaultValues.MAX_PAYLOAD_SIZE_BYTES)
val dummyMode = cmdLine.hasOption(DUMMY_MODE)
val security = createSecurityConfiguration(cmdLine).bind()
val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind()
@@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
configurationProviderParams = configurationProviderParams,
securityConfiguration = security,
idleTimeout = Duration.ofSeconds(idleTimeoutSec),
+ maximumPayloadSizeBytes = maxPayloadSizeBytes,
dummyMode = dummyMode)
}.fix()
@@ -110,5 +115,6 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration
const val CONSUL_FIRST_REQUEST_DELAY = 10L
const val CONSUL_REQUEST_INTERVAL = 5L
const val IDLE_TIMEOUT_SEC = 60L
+ const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES
}
}
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
index a84a39a5..78d42830 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt
@@ -48,6 +48,7 @@ fun main(args: Array<String>) =
private fun startAndAwaitServers(config: ServerConfiguration) =
IO.monad().binding {
+ logger.info("Using configuration: $config")
HealthCheckServer.start(config).bind()
VesServer.start(config).bind()
.await().bind()
diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index fbf8936f..992a4dc8 100644
--- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -40,7 +40,8 @@ object VesServer : ServerStarter() {
val collectorProvider = CollectorFactory(
AdapterFactory.consulConfigurationProvider(config.configurationProviderParams),
sink,
- MicrometerMetrics()
+ MicrometerMetrics(),
+ config.maximumPayloadSizeBytes
).createVesHvCollectorProvider()
return ServerFactory.createNettyTcpServer(config, collectorProvider)
diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
index 035d94ee..3bf615af 100644
--- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
+++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt
@@ -23,7 +23,6 @@ import com.google.protobuf.ByteString
import io.netty.buffer.ByteBuf
import io.netty.buffer.ByteBufAllocator
import io.netty.buffer.PooledByteBufAllocator
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT
import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
@@ -70,13 +69,13 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run {
writeByte(0x01) // version minor
}
-fun vesMessageWithTooBigPayload(domain: VesEventDomain = PERF3GPP): ByteBuf =
+fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf =
allocator.buffer().run {
writeValidWireFrameHeaders()
val gpb = vesEvent(
domain = domain,
- eventFields = ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE))
+ eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes))
).toByteString().asReadOnlyByteBuffer()
writeInt(gpb.limit()) // ves event size in bytes
diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
index 288a578d..9439bff5 100644
--- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
+++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt
@@ -119,6 +119,12 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false)
|connection might be closed.""".trimMargin())
.build()
),
+ MAXIMUM_PAYLOAD_SIZE_BYTES(Option.builder("m")
+ .longOpt("max-payload-size")
+ .hasArg()
+ .desc("Maximum supported payload size in bytes")
+ .build()
+ ),
DUMMY_MODE(Option.builder("u")
.longOpt("dummy")
.desc("If present will start in dummy mode (dummy external services)")
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
index ace7f1cb..076c06be 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt
@@ -20,8 +20,6 @@
package org.onap.dcae.collectors.veshv.ves.message.generator.api
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
import reactor.core.publisher.Flux
/**
@@ -32,10 +30,6 @@ interface MessageGenerator {
fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage>
companion object {
- val INSTANCE: MessageGenerator by lazy {
- MessageGeneratorImpl(PayloadGenerator())
- }
-
const val FIXED_PAYLOAD_SIZE = 100
}
}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
new file mode 100644
index 00000000..e2269c20
--- /dev/null
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt
@@ -0,0 +1,33 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.ves.message.generator.factory
+
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl
+import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator
+
+/**
+ * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
+ * @since October 2018
+ */
+object MessageGeneratorFactory {
+ fun create(maxPayloadSizeBytes: Int): MessageGenerator =
+ MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
+}
diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
index 2bb9f781..cc1d16fe 100644
--- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
+++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt
@@ -42,7 +42,10 @@ import java.nio.charset.Charset
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since June 2018
*/
-class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator {
+class MessageGeneratorImpl internal constructor(
+ private val payloadGenerator: PayloadGenerator,
+ private val maxPayloadSizeBytes: Int
+) : MessageGenerator {
override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux
.fromIterable(messageParameters)
@@ -89,7 +92,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa
.build()
private fun oversizedPayload() =
- payloadGenerator.generateRawPayload(WireFrameMessage.MAX_PAYLOAD_SIZE + 1)
+ payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1)
private fun fixedPayload() =
payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE)
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
index ec09ac9c..ee76b789 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import com.google.protobuf.ByteString
import com.google.protobuf.InvalidProtocolBufferException
@@ -47,7 +47,8 @@ import reactor.test.test
*/
object MessageGeneratorImplTest : Spek({
describe("message factory") {
- val generator = MessageGenerator.INSTANCE
+ val maxPayloadSizeBytes = 1024
+ val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes)
given("single message parameters") {
on("messages amount not specified in parameters") {
it("should create infinite flux") {
@@ -87,7 +88,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
.verifyComplete()
@@ -105,7 +106,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
}
.verifyComplete()
@@ -122,7 +123,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThatExceptionOfType(InvalidProtocolBufferException::class.java)
.isThrownBy { extractCommonEventHeader(it.payload) }
}
@@ -140,7 +141,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isFalse()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR)
}
@@ -158,7 +159,7 @@ object MessageGeneratorImplTest : Spek({
.test()
.assertNext {
assertThat(it.isValid()).isTrue()
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
@@ -177,17 +178,17 @@ object MessageGeneratorImplTest : Spek({
generator.createMessageFlux(messageParameters)
.test()
.assertNext {
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
- assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName)
}
.expectNextCount(singleFluxSize - 1)
.assertNext {
- assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE)
+ assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes)
assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.domainName)
}
.expectNextCount(singleFluxSize - 1)
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
index 3b1a48b3..134ebb2d 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.Assertions.fail
@@ -27,7 +27,6 @@ import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType
-import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl
private const val EXPECTED_MESSAGES_AMOUNT = 25000L
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt
index 2b41e290..5249a8d2 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.given
diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
index dd8bc89f..5d112bae 100644
--- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt
+++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt
@@ -17,7 +17,7 @@
* limitations under the License.
* ============LICENSE_END=========================================================
*/
-package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl
+package org.onap.dcae.collectors.veshv.ves.message.generator.impl
import javax.json.Json
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 3fde2c7e..ec5ef81a 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -39,8 +39,8 @@ import javax.json.Json
*/
class XnfSimulator(
private val vesClient: VesHvClient,
- private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE,
- private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) {
+ private val messageGenerator: MessageGenerator,
+ private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
index 54ead6f7..06f1cffe 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt
@@ -22,12 +22,8 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
import arrow.core.Either
import arrow.effects.IO
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.utils.http.Content
-import org.onap.dcae.collectors.veshv.utils.http.ContentType
import org.onap.dcae.collectors.veshv.utils.http.HttpConstants
-import org.onap.dcae.collectors.veshv.utils.http.HttpStatus
import org.onap.dcae.collectors.veshv.utils.http.Response
import org.onap.dcae.collectors.veshv.utils.http.Responses
import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors
@@ -40,7 +36,6 @@ import ratpack.http.TypedData
import ratpack.server.RatpackServer
import ratpack.server.ServerConfig
import java.util.*
-import javax.json.Json
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
index 3d8dc948..7966a4e9 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -25,11 +25,13 @@ import arrow.core.monad
import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
+import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
@@ -48,6 +50,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
VES_HV_PORT,
VES_HV_HOST,
LISTEN_PORT,
+ MAXIMUM_PAYLOAD_SIZE_BYTES,
SSL_DISABLE,
KEY_STORE_FILE,
KEY_STORE_PASSWORD,
@@ -59,12 +62,15 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
val listenPort = cmdLine.intValue(LISTEN_PORT).bind()
val vesHost = cmdLine.stringValue(VES_HV_HOST).bind()
val vesPort = cmdLine.intValue(VES_HV_PORT).bind()
+ val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
SimulatorConfiguration(
listenPort,
vesHost,
vesPort,
+ maxPayloadSizeBytes,
createSecurityConfiguration(cmdLine).bind())
}.fix()
+
}
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
index 9b6ef209..3395d282 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
@@ -29,4 +29,5 @@ data class SimulatorConfiguration(
val listenPort: Int,
val vesHost: String,
val vesPort: Int,
+ val maxPayloadSizeBytes: Int,
val security: SecurityConfiguration)
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index c9e900ac..4512dfbf 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.arrow.unit
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf"
private val logger = Logger(PACKAGE_NAME)
@@ -41,7 +42,11 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt"
fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args)
.mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME))
.map { config ->
- XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations())
+ logger.info("Using configuration: $config")
+ val xnfSimulator = XnfSimulator(
+ VesHvClient(config),
+ MessageGeneratorFactory.create(config.maxPayloadSizeBytes))
+ XnfApiServer(xnfSimulator, OngoingSimulations())
.start(config.listenPort)
.unit()
}
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 97535887..2a78ed5e 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -54,7 +54,7 @@ internal class XnfSimulatorTest : Spek({
vesClient = mock()
messageParametersParser = mock()
messageGenerator = mock()
- cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator)
+ cut = XnfSimulator(vesClient, messageGenerator, messageParametersParser)
}
describe("startSimulation") {