diff options
34 files changed, 137 insertions, 61 deletions
diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt index d807a9e7..5c96e1c5 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt @@ -43,6 +43,7 @@ import java.util.concurrent.atomic.AtomicReference class CollectorFactory(val configuration: ConfigurationProvider, private val sinkProvider: SinkProvider, private val metrics: Metrics, + private val maximumPayloadSizeBytes: Int, private val healthState: HealthState = HealthState.INSTANCE) { fun createVesHvCollectorProvider(): CollectorProvider { @@ -63,7 +64,9 @@ class CollectorFactory(val configuration: ConfigurationProvider, private fun createVesHvCollector(config: CollectorConfiguration): Collector { return VesHvCollector( - wireChunkDecoderSupplier = { alloc -> WireChunkDecoder(WireFrameDecoder(), alloc) }, + wireChunkDecoderSupplier = { alloc -> + WireChunkDecoder(WireFrameDecoder(maximumPayloadSizeBytes), alloc) + }, protobufDecoder = VesDecoder(), router = Router(config.routing), sink = sinkProvider(config), diff --git a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt index c14d36e6..7a7d9342 100644 --- a/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt +++ b/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/model/ServerConfiguration.kt @@ -32,4 +32,5 @@ data class ServerConfiguration( val securityConfiguration: SecurityConfiguration, val idleTimeout: Duration, val healthCheckApiPort: Int, + val maximumPayloadSizeBytes: Int, val dummyMode: Boolean = false) diff --git a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt index d214ffcf..f06a0dc7 100644 --- a/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt +++ b/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/wire/WireChunkDecoderTest.kt @@ -45,7 +45,7 @@ internal object WireChunkDecoderTest : Spek({ fun WireChunkDecoder.decode(frame: WireFrameMessage) = decode(encoder.encode(frame)) - fun createInstance() = WireChunkDecoder(WireFrameDecoder(), alloc) + fun createInstance() = WireChunkDecoder(WireFrameDecoder(WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES), alloc) fun verifyMemoryReleased(vararg byteBuffers: ByteBuf) { for (bb in byteBuffers) { diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt index 67291abd..0c78dd5c 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/PerformanceSpecification.kt @@ -31,12 +31,14 @@ import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.tests.fakes.CountingSink import org.onap.dcae.collectors.veshv.tests.fakes.basicConfiguration import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType.VALID +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory import reactor.core.publisher.Flux import reactor.math.sum import java.security.MessageDigest @@ -173,7 +175,7 @@ fun dropWhenIndex(predicate: (Long) -> Boolean, stream: Flux<ByteBuf>): Flux<Byt private fun generateDataStream(alloc: ByteBufAllocator, params: MessageParameters): Flux<ByteBuf> = WireFrameEncoder(alloc).let { encoder -> - MessageGenerator.INSTANCE + MessageGeneratorFactory.create(Sut.MAX_PAYLOAD_SIZE_BYTES) .createMessageFlux(listOf(params)) .map(encoder::encode) .transform { simulateRemoteTcp(alloc, 1000, it) } diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt index 942e6edf..0495ced5 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/Sut.kt @@ -45,11 +45,21 @@ class Sut(sink: Sink = StoringSink()) { val alloc: ByteBufAllocator = UnpooledByteBufAllocator.DEFAULT private val metrics = FakeMetrics() - private val collectorFactory = CollectorFactory(configurationProvider, SinkProvider.just(sink), metrics, healthStateProvider) + private val collectorFactory = CollectorFactory( + configurationProvider, + SinkProvider.just(sink), + metrics, + MAX_PAYLOAD_SIZE_BYTES, + healthStateProvider) private val collectorProvider = collectorFactory.createVesHvCollectorProvider() val collector: Collector get() = collectorProvider().getOrElse{ throw IllegalStateException("Collector not available.") } + + companion object { + const val MAX_PAYLOAD_SIZE_BYTES = 1024 + } + } fun Sut.handleConnection(sink: StoringSink, vararg packets: ByteBuf): List<RoutedMessage> { diff --git a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt index e9c0d67f..2d81c671 100644 --- a/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt +++ b/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/component/VesHvSpecification.kt @@ -39,7 +39,7 @@ import org.onap.dcae.collectors.veshv.tests.fakes.configurationWithoutRouting import org.onap.dcae.collectors.veshv.tests.fakes.twoDomainsToOneTopicConfiguration import org.onap.dcae.collectors.veshv.tests.utils.garbageFrame import org.onap.dcae.collectors.veshv.tests.utils.invalidWireFrame -import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithTooBigPayload +import org.onap.dcae.collectors.veshv.tests.utils.vesMessageWithPayloadOfSize import org.onap.dcae.collectors.veshv.tests.utils.vesWireFrameMessage import org.onap.dcae.collectors.veshv.tests.utils.wireFrameMessageWithInvalidPayload @@ -72,7 +72,7 @@ object VesHvSpecification : Spek({ val (sut, sink) = vesHvWithStoringSink() val validMessage = vesWireFrameMessage(PERF3GPP) val msgWithInvalidFrame = invalidWireFrame() - val msgWithTooBigPayload = vesMessageWithTooBigPayload(PERF3GPP) + val msgWithTooBigPayload = vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP) val expectedRefCnt = 0 val handledEvents = sut.handleConnection( @@ -329,7 +329,7 @@ object VesHvSpecification : Spek({ val handledMessages = sut.handleConnection(sink, vesWireFrameMessage(PERF3GPP, "first"), - vesMessageWithTooBigPayload(PERF3GPP), + vesMessageWithPayloadOfSize(Sut.MAX_PAYLOAD_SIZE_BYTES + 1, PERF3GPP), vesWireFrameMessage(PERF3GPP)) assertThat(handledMessages).hasSize(1) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt index 262e05bf..1a8af874 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulator.kt @@ -34,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference * @since August 2018 */ class DcaeAppSimulator(private val consumerFactory: ConsumerFactory, - private val messageStreamValidation: MessageStreamValidation = MessageStreamValidation()) { + private val messageStreamValidation: MessageStreamValidation) { private val consumerState: AtomicReference<ConsumerStateProvider> = AtomicReference() fun listenToTopics(topicsString: String) = listenToTopics(extractTopics(topicsString)) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt index 38de5370..c910b53e 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidation.kt @@ -35,8 +35,8 @@ import java.io.InputStream import javax.json.Json class MessageStreamValidation( - private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, - private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + private val messageGenerator: MessageGenerator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun validate(jsonDescription: InputStream, consumedMessages: List<ByteArray>): IO<Boolean> = IO.monadError().bindingCatch { diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt index d0820616..83dceb6a 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/ArgDcaeAppSimConfiguration.kt @@ -26,17 +26,20 @@ import arrow.instances.extensions import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_SERVERS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KAFKA_TOPICS import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration>(DefaultParser()) { override val cmdLineOptionsList: List<CommandLineOption> = listOf( LISTEN_PORT, + MAXIMUM_PAYLOAD_SIZE_BYTES, KAFKA_SERVERS, KAFKA_TOPICS ) @@ -47,6 +50,8 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration val listenPort = cmdLine .intValue(LISTEN_PORT) .bind() + val maxPayloadSizeBytes = cmdLine + .intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) val kafkaBootstrapServers = cmdLine .stringValue(KAFKA_SERVERS) .bind() @@ -57,6 +62,7 @@ class ArgDcaeAppSimConfiguration : ArgBasedConfiguration<DcaeAppSimConfiguration DcaeAppSimConfiguration( listenPort, + maxPayloadSizeBytes, kafkaBootstrapServers, kafkaTopics) }.fix() diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt index c114313d..a6fc8053 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/config/DcaeAppSimConfiguration.kt @@ -21,6 +21,7 @@ package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config data class DcaeAppSimConfiguration( val apiPort: Int, + val maxPayloadSizeBytes: Int, val kafkaBootstrapServers: String, val kafkaTopics: Set<String> ) diff --git a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt index c0f8b340..06ff4d59 100644 --- a/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt +++ b/hv-collector-dcae-app-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/main.kt @@ -24,12 +24,14 @@ import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.ArgDcaeAppS import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.config.DcaeAppSimConfiguration import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.ConsumerFactory import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.DcaeAppSimulator +import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.MessageStreamValidation import org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl.adapters.DcaeAppApiServer import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.dcaeapp" private val logger = Logger(PACKAGE_NAME) @@ -51,7 +53,10 @@ fun main(args: Array<String>) = private fun startApp(config: DcaeAppSimConfiguration): IO<Unit> { - return DcaeAppApiServer(DcaeAppSimulator(ConsumerFactory(config.kafkaBootstrapServers))) + logger.info("Using configuration: $config") + val consumerFactory = ConsumerFactory(config.kafkaBootstrapServers) + val messageStreamValidation = MessageStreamValidation(MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + return DcaeAppApiServer(DcaeAppSimulator(consumerFactory, messageStreamValidation)) .start(config.apiPort, config.kafkaTopics) .unit() } diff --git a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt index 05fdd808..5e3090af 100644 --- a/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt +++ b/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/MessageStreamValidationTest.kt @@ -53,7 +53,7 @@ internal class MessageStreamValidationTest : Spek({ beforeEachTest { messageParametersParser = mock() messageGenerator = mock() - cut = MessageStreamValidation(messageParametersParser, messageGenerator) + cut = MessageStreamValidation(messageGenerator, messageParametersParser) } fun givenParsedMessageParameters(vararg params: MessageParameters) { diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt index 4f867f13..f08ec178 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/codec.kt @@ -24,7 +24,6 @@ import arrow.core.Left import arrow.core.Right import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT /** @@ -52,7 +51,7 @@ class WireFrameEncoder(private val allocator: ByteBufAllocator = ByteBufAllocato * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 */ -class WireFrameDecoder { +class WireFrameDecoder(private val maxPayloadSizeBytes: Int) { fun decodeFirst(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> = when { @@ -81,13 +80,13 @@ class WireFrameDecoder { private fun parsePayloadFrame(byteBuf: ByteBuf): Either<WireFrameDecodingError, WireFrameMessage> { val versionMajor = byteBuf.readUnsignedByte() val versionMinor = byteBuf.readUnsignedByte() - byteBuf.skipBytes(RESERVED_BYTE_COUNT) // reserved + byteBuf.skipBytes(RESERVED_BYTE_COUNT) val payloadTypeRaw = byteBuf.readUnsignedByte() val payloadSize = byteBuf.readInt() - if (payloadSize > MAX_PAYLOAD_SIZE) { + if (payloadSize > maxPayloadSizeBytes) { byteBuf.resetReaderIndex() - return Left(PayloadSizeExceeded) + return Left(PayloadSizeExceeded(maxPayloadSizeBytes)) } if (byteBuf.readableBytes() < payloadSize) { diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt index dfadc5b8..0d55cebb 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/errors.kt @@ -19,8 +19,6 @@ */ package org.onap.dcae.collectors.veshv.domain -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE - /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> * @since June 2018 @@ -38,7 +36,8 @@ class InvalidWireFrameMarker(actualMarker: Short) : InvalidWireFrame( .format(WireFrameMessage.MARKER_BYTE, actualMarker) ) -object PayloadSizeExceeded : InvalidWireFrame("payload size exceeds the limit ($MAX_PAYLOAD_SIZE bytes)") +class PayloadSizeExceeded(maxPayloadSizeBytes: Int) : + InvalidWireFrame("payload size exceeds the limit ($maxPayloadSizeBytes bytes)") // Missing bytes errors diff --git a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt index 036888e1..5fdc5afe 100644 --- a/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt +++ b/hv-collector-domain/src/main/kotlin/org/onap/dcae/collectors/veshv/domain/wire_frame.kt @@ -70,6 +70,6 @@ data class WireFrameMessage(val payload: ByteData, RESERVED_BYTE_COUNT * java.lang.Byte.BYTES + // reserved bytes 1 * java.lang.Integer.BYTES // payload length - const val MAX_PAYLOAD_SIZE = 1024 * 1024 + const val DEFAULT_MAX_PAYLOAD_SIZE_BYTES = 1024 * 1024 } } diff --git a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt index 6756bf82..f17a79ba 100644 --- a/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt +++ b/hv-collector-domain/src/test/kotlin/org/onap/dcae/collectors/veshv/domain/WireFrameCodecsTest.kt @@ -28,7 +28,6 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE import java.nio.charset.Charset import kotlin.test.assertTrue import kotlin.test.fail @@ -39,8 +38,9 @@ import kotlin.test.fail */ object WireFrameCodecsTest : Spek({ val payloadAsString = "coffeebabe" + val maxPayloadSizeBytes = 1024 val encoder = WireFrameEncoder() - val decoder = WireFrameDecoder() + val decoder = WireFrameDecoder(maxPayloadSizeBytes) fun createSampleFrame() = WireFrameMessage(payloadAsString.toByteArray(Charset.defaultCharset())) @@ -223,7 +223,7 @@ object WireFrameCodecsTest : Spek({ it("should decode successfully when payload size is equal 1 MiB") { - val payload = ByteArray(MAX_PAYLOAD_SIZE) + val payload = ByteArray(maxPayloadSizeBytes) val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, @@ -237,7 +237,7 @@ object WireFrameCodecsTest : Spek({ it("should return error when payload exceeds 1 MiB") { - val payload = ByteArray(MAX_PAYLOAD_SIZE + 1) + val payload = ByteArray(maxPayloadSizeBytes + 1) val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, @@ -253,7 +253,7 @@ object WireFrameCodecsTest : Spek({ it("should validate only first message") { - val payload = ByteArray(MAX_PAYLOAD_SIZE) + val payload = ByteArray(maxPayloadSizeBytes) val input = WireFrameMessage( payload = ByteData(payload), versionMajor = 1, diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt index d6ff9efa..826982d6 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/ArgVesHvConfiguration.kt @@ -27,6 +27,7 @@ import arrow.instances.extensions import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ConfigurationProviderParams import org.onap.dcae.collectors.veshv.model.ServerConfiguration import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration @@ -40,6 +41,7 @@ import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.IDLE_T import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD @@ -62,6 +64,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration TRUST_STORE_FILE, TRUST_STORE_PASSWORD, IDLE_TIMEOUT_SEC, + MAXIMUM_PAYLOAD_SIZE_BYTES, DUMMY_MODE ) @@ -73,6 +76,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration ) val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val idleTimeoutSec = cmdLine.longValue(IDLE_TIMEOUT_SEC, DefaultValues.IDLE_TIMEOUT_SEC) + val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, DefaultValues.MAX_PAYLOAD_SIZE_BYTES) val dummyMode = cmdLine.hasOption(DUMMY_MODE) val security = createSecurityConfiguration(cmdLine).bind() val configurationProviderParams = createConfigurationProviderParams(cmdLine).bind() @@ -82,6 +86,7 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration configurationProviderParams = configurationProviderParams, securityConfiguration = security, idleTimeout = Duration.ofSeconds(idleTimeoutSec), + maximumPayloadSizeBytes = maxPayloadSizeBytes, dummyMode = dummyMode) }.fix() @@ -110,5 +115,6 @@ internal class ArgVesHvConfiguration : ArgBasedConfiguration<ServerConfiguration const val CONSUL_FIRST_REQUEST_DELAY = 10L const val CONSUL_REQUEST_INTERVAL = 5L const val IDLE_TIMEOUT_SEC = 60L + const val MAX_PAYLOAD_SIZE_BYTES = WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES } } diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt index a84a39a5..78d42830 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/main.kt @@ -48,6 +48,7 @@ fun main(args: Array<String>) = private fun startAndAwaitServers(config: ServerConfiguration) = IO.monad().binding { + logger.info("Using configuration: $config") HealthCheckServer.start(config).bind() VesServer.start(config).bind() .await().bind() diff --git a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index fbf8936f..992a4dc8 100644 --- a/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -40,7 +40,8 @@ object VesServer : ServerStarter() { val collectorProvider = CollectorFactory( AdapterFactory.consulConfigurationProvider(config.configurationProviderParams), sink, - MicrometerMetrics() + MicrometerMetrics(), + config.maximumPayloadSizeBytes ).createVesHvCollectorProvider() return ServerFactory.createNettyTcpServer(config, collectorProvider) diff --git a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt index 035d94ee..3bf615af 100644 --- a/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt +++ b/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/messages.kt @@ -23,7 +23,6 @@ import com.google.protobuf.ByteString import io.netty.buffer.ByteBuf import io.netty.buffer.ByteBufAllocator import io.netty.buffer.PooledByteBufAllocator -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.MAX_PAYLOAD_SIZE import org.onap.dcae.collectors.veshv.domain.WireFrameMessage.Companion.RESERVED_BYTE_COUNT import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP @@ -70,13 +69,13 @@ fun invalidWireFrame(): ByteBuf = allocator.buffer().run { writeByte(0x01) // version minor } -fun vesMessageWithTooBigPayload(domain: VesEventDomain = PERF3GPP): ByteBuf = +fun vesMessageWithPayloadOfSize(payloadSizeBytes: Int, domain: VesEventDomain = PERF3GPP): ByteBuf = allocator.buffer().run { writeValidWireFrameHeaders() val gpb = vesEvent( domain = domain, - eventFields = ByteString.copyFrom(ByteArray(MAX_PAYLOAD_SIZE)) + eventFields = ByteString.copyFrom(ByteArray(payloadSizeBytes)) ).toByteString().asReadOnlyByteBuffer() writeInt(gpb.limit()) // ves event size in bytes diff --git a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt index 288a578d..9439bff5 100644 --- a/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt +++ b/hv-collector-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/utils/commandline/CommandLineOption.kt @@ -119,6 +119,12 @@ enum class CommandLineOption(val option: Option, val required: Boolean = false) |connection might be closed.""".trimMargin()) .build() ), + MAXIMUM_PAYLOAD_SIZE_BYTES(Option.builder("m") + .longOpt("max-payload-size") + .hasArg() + .desc("Maximum supported payload size in bytes") + .build() + ), DUMMY_MODE(Option.builder("u") .longOpt("dummy") .desc("If present will start in dummy mode (dummy external services)") diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt index ace7f1cb..076c06be 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/api/MessageGenerator.kt @@ -20,8 +20,6 @@ package org.onap.dcae.collectors.veshv.ves.message.generator.api import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator import reactor.core.publisher.Flux /** @@ -32,10 +30,6 @@ interface MessageGenerator { fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> companion object { - val INSTANCE: MessageGenerator by lazy { - MessageGeneratorImpl(PayloadGenerator()) - } - const val FIXED_PAYLOAD_SIZE = 100 } } diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt new file mode 100644 index 00000000..e2269c20 --- /dev/null +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/factory/MessageGeneratorFactory.kt @@ -0,0 +1,33 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.ves.message.generator.factory + +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageGenerator +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageGeneratorImpl +import org.onap.dcae.collectors.veshv.ves.message.generator.impl.PayloadGenerator + +/** + * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> + * @since October 2018 + */ +object MessageGeneratorFactory { + fun create(maxPayloadSizeBytes: Int): MessageGenerator = + MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes) +} diff --git a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt index 2bb9f781..cc1d16fe 100644 --- a/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt +++ b/hv-collector-ves-message-generator/src/main/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImpl.kt @@ -42,7 +42,10 @@ import java.nio.charset.Charset * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since June 2018 */ -class MessageGeneratorImpl internal constructor(private val payloadGenerator: PayloadGenerator) : MessageGenerator { +class MessageGeneratorImpl internal constructor( + private val payloadGenerator: PayloadGenerator, + private val maxPayloadSizeBytes: Int +) : MessageGenerator { override fun createMessageFlux(messageParameters: List<MessageParameters>): Flux<WireFrameMessage> = Flux .fromIterable(messageParameters) @@ -89,7 +92,7 @@ class MessageGeneratorImpl internal constructor(private val payloadGenerator: Pa .build() private fun oversizedPayload() = - payloadGenerator.generateRawPayload(WireFrameMessage.MAX_PAYLOAD_SIZE + 1) + payloadGenerator.generateRawPayload(maxPayloadSizeBytes + 1) private fun fixedPayload() = payloadGenerator.generateRawPayload(MessageGenerator.FIXED_PAYLOAD_SIZE) diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt index ec09ac9c..ee76b789 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageGeneratorImplTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageGeneratorImplTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl +package org.onap.dcae.collectors.veshv.ves.message.generator.impl import com.google.protobuf.ByteString import com.google.protobuf.InvalidProtocolBufferException @@ -47,7 +47,8 @@ import reactor.test.test */ object MessageGeneratorImplTest : Spek({ describe("message factory") { - val generator = MessageGenerator.INSTANCE + val maxPayloadSizeBytes = 1024 + val generator = MessageGeneratorImpl(PayloadGenerator(), maxPayloadSizeBytes) given("single message parameters") { on("messages amount not specified in parameters") { it("should create infinite flux") { @@ -87,7 +88,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName) } .verifyComplete() @@ -105,7 +106,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName) } .verifyComplete() @@ -122,7 +123,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThatExceptionOfType(InvalidProtocolBufferException::class.java) .isThrownBy { extractCommonEventHeader(it.payload) } } @@ -140,7 +141,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isFalse() - assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName) assertThat(it.versionMajor).isNotEqualTo(WireFrameMessage.SUPPORTED_VERSION_MINOR) } @@ -158,7 +159,7 @@ object MessageGeneratorImplTest : Spek({ .test() .assertNext { assertThat(it.isValid()).isTrue() - assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThat(extractEventFields(it.payload).size()).isEqualTo(MessageGenerator.FIXED_PAYLOAD_SIZE) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName) } @@ -177,17 +178,17 @@ object MessageGeneratorImplTest : Spek({ generator.createMessageFlux(messageParameters) .test() .assertNext { - assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(PERF3GPP.domainName) } .expectNextCount(singleFluxSize - 1) .assertNext { - assertThat(it.payloadSize).isGreaterThan(WireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isGreaterThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(FAULT.domainName) } .expectNextCount(singleFluxSize - 1) .assertNext { - assertThat(it.payloadSize).isLessThan(WireFrameMessage.MAX_PAYLOAD_SIZE) + assertThat(it.payloadSize).isLessThan(maxPayloadSizeBytes) assertThat(extractCommonEventHeader(it.payload).domain).isEqualTo(HEARTBEAT.domainName) } .expectNextCount(singleFluxSize - 1) diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt index 3b1a48b3..134ebb2d 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/MessageParametersParserTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/MessageParametersParserTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl +package org.onap.dcae.collectors.veshv.ves.message.generator.impl import org.assertj.core.api.Assertions.assertThat import org.assertj.core.api.Assertions.fail @@ -27,7 +27,6 @@ import org.jetbrains.spek.api.dsl.given import org.jetbrains.spek.api.dsl.it import org.jetbrains.spek.api.dsl.on import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageType -import org.onap.dcae.collectors.veshv.ves.message.generator.impl.MessageParametersParserImpl private const val EXPECTED_MESSAGES_AMOUNT = 25000L diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt index 2b41e290..5249a8d2 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/PayloadGeneratorTest.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/PayloadGeneratorTest.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl +package org.onap.dcae.collectors.veshv.ves.message.generator.impl import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.given diff --git a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt index dd8bc89f..5d112bae 100644 --- a/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/impl/parameters.kt +++ b/hv-collector-ves-message-generator/src/test/kotlin/org/onap/dcae/collectors/veshv/ves/message/generator/impl/parameters.kt @@ -17,7 +17,7 @@ * limitations under the License. * ============LICENSE_END========================================================= */ -package org.onap.dcae.collectors.veshv.ves.message.generator.impl.impl +package org.onap.dcae.collectors.veshv.ves.message.generator.impl import javax.json.Json diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 3fde2c7e..ec5ef81a 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -39,8 +39,8 @@ import javax.json.Json */ class XnfSimulator( private val vesClient: VesHvClient, - private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE, - private val messageGenerator: MessageGenerator = MessageGenerator.INSTANCE) { + private val messageGenerator: MessageGenerator, + private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = Either.monad<ParsingError>().binding { diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt index 54ead6f7..06f1cffe 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/XnfApiServer.kt @@ -22,12 +22,8 @@ package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters import arrow.core.Either import arrow.effects.IO import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.Status import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.utils.http.Content -import org.onap.dcae.collectors.veshv.utils.http.ContentType import org.onap.dcae.collectors.veshv.utils.http.HttpConstants -import org.onap.dcae.collectors.veshv.utils.http.HttpStatus import org.onap.dcae.collectors.veshv.utils.http.Response import org.onap.dcae.collectors.veshv.utils.http.Responses import org.onap.dcae.collectors.veshv.utils.http.sendAndHandleErrors @@ -40,7 +36,6 @@ import ratpack.http.TypedData import ratpack.server.RatpackServer import ratpack.server.ServerConfig import java.util.* -import javax.json.Json /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index 3d8dc948..7966a4e9 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -25,11 +25,13 @@ import arrow.core.monad import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser +import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD @@ -48,6 +50,7 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon VES_HV_PORT, VES_HV_HOST, LISTEN_PORT, + MAXIMUM_PAYLOAD_SIZE_BYTES, SSL_DISABLE, KEY_STORE_FILE, KEY_STORE_PASSWORD, @@ -59,12 +62,15 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon val listenPort = cmdLine.intValue(LISTEN_PORT).bind() val vesHost = cmdLine.stringValue(VES_HV_HOST).bind() val vesPort = cmdLine.intValue(VES_HV_PORT).bind() + val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) SimulatorConfiguration( listenPort, vesHost, vesPort, + maxPayloadSizeBytes, createSecurityConfiguration(cmdLine).bind()) }.fix() + } diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt index 9b6ef209..3395d282 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -29,4 +29,5 @@ data class SimulatorConfiguration( val listenPort: Int, val vesHost: String, val vesPort: Int, + val maxPayloadSizeBytes: Int, val security: SecurityConfiguration) diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index c9e900ac..4512dfbf 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -29,6 +29,7 @@ import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.arrow.unit import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory private const val PACKAGE_NAME = "org.onap.dcae.collectors.veshv.simulators.xnf" private val logger = Logger(PACKAGE_NAME) @@ -41,7 +42,11 @@ const val PROGRAM_NAME = "java $PACKAGE_NAME.MainKt" fun main(args: Array<String>) = ArgXnfSimulatorConfiguration().parse(args) .mapLeft(handleWrongArgumentErrorCurried(PROGRAM_NAME)) .map { config -> - XnfApiServer(XnfSimulator(VesHvClient(config)), OngoingSimulations()) + logger.info("Using configuration: $config") + val xnfSimulator = XnfSimulator( + VesHvClient(config), + MessageGeneratorFactory.create(config.maxPayloadSizeBytes)) + XnfApiServer(xnfSimulator, OngoingSimulations()) .start(config.listenPort) .unit() } diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 97535887..2a78ed5e 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -54,7 +54,7 @@ internal class XnfSimulatorTest : Spek({ vesClient = mock() messageParametersParser = mock() messageGenerator = mock() - cut = XnfSimulator(vesClient, messageParametersParser, messageGenerator) + cut = XnfSimulator(vesClient, messageGenerator, messageParametersParser) } describe("startSimulation") { |