From 756e7210cf13c6ef9bae8f785d3f46112c136f7d Mon Sep 17 00:00:00 2001 From: Jakub Dudycz Date: Fri, 1 Mar 2019 17:39:09 +0100 Subject: Fix ssl related bug in xnf simulator Fix bug when xnf simnulator was using same SecurityKeys object instance for every new VesClient, which resulted in fault while trying to connect to collector. With new implementation simulator reuses same HvVesProdcuer from SDK for every VesEvent request received and creates new Producer for every WireFrameEvent request. This allows to continue testing cases in which there is need to assert if connection was dropped from malicious client. Change-Id: I5f51a58de85cccf7de6ab2392f86259502be31dd Issue-ID: DCAEGEN2-1291 Signed-off-by: Jakub Dudycz Signed-off-by: Filip Krzywka --- .../veshv/simulators/xnf/impl/XnfSimulator.kt | 48 +++++++++++++++++----- .../simulators/xnf/impl/adapters/HvVesClient.kt | 12 ++---- .../impl/config/ArgXnfSimulatorConfiguration.kt | 26 ++++++------ .../xnf/impl/config/ClientConfiguration.kt | 2 +- .../xnf/impl/config/SimulatorConfiguration.kt | 2 +- .../simulators/xnf/impl/factory/ClientFactory.kt | 28 +++++++------ .../dcae/collectors/veshv/simulators/xnf/main.kt | 2 +- 7 files changed, 73 insertions(+), 47 deletions(-) (limited to 'sources/hv-collector-xnf-simulator/src/main') diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 53a8826c..93c43173 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -26,6 +26,7 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters @@ -34,9 +35,14 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType +import org.onap.ves.VesEventOuterClass +import reactor.core.Disposable +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.toFlux import java.io.InputStream +import java.nio.ByteBuffer import javax.json.Json import javax.json.JsonArray @@ -52,6 +58,8 @@ class XnfSimulator( private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() } private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() } + private val defaultHvVesClient by lazy { clientFactory.create() } + fun startSimulation(messageParameters: InputStream): Either> = Either.monad().binding { val json = parseJsonArray(messageParameters).bind() @@ -64,23 +72,43 @@ class XnfSimulator( .toEither() .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun simulationFrom(parameters: List): IO = parameters - .toFlux() - .flatMap(::simulate) - .then(Mono.just(Unit)) - .asIo() - private fun simulate(parameters: MessageParameters): Mono = + private fun simulationFrom(parameters: List): IO = + parameters + .map(::asClientToMessages) + .groupMessagesByClients() + .flattenValuesToFlux() + .toList() + .toFlux() + .map(::simulate) + .then(Mono.just(Unit)) + .asIo() + + private fun List>.groupMessagesByClients() = + groupBy({ it.first }, { it.second }) + + private fun Map>>.flattenValuesToFlux(): Map> = + mapValues { Flux.concat(it.value) } + + private fun asClientToMessages(parameters: MessageParameters) = when (parameters) { is VesEventParameters -> { - val messages = vesEventGenerator.createMessageFlux(parameters) - val client = clientFactory.create() - client.sendVesEvents(messages) + val messages = vesEventGenerator + .createMessageFlux(parameters) + .map(VesEventOuterClass.VesEvent::toByteBuffer) + Pair(defaultHvVesClient, messages) } is WireFrameParameters -> { val messages = wireFrameGenerator.createMessageFlux(parameters) val client = clientFactory.create(parameters.wireFrameVersion) - client.sendRawPayload(messages) + Pair(client, messages) } } + + private fun simulate(pair: Pair>): Disposable = + pair.first + .sendRawPayload(pair.second, PayloadType.PROTOBUF) + .subscribe() } + +internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer() diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt index afc157c4..19579431 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -23,10 +23,10 @@ import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType -import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong /** * @author Jakub Dudycz @@ -34,15 +34,11 @@ import java.nio.ByteBuffer */ class HvVesClient(private val producer: HvVesProducer) { - fun sendVesEvents(messages: Flux): Mono = - producer.send(messages) - .then { logger.info { "Ves Events have been sent" } } + fun sendRawPayload(messages: Flux, payloadType: PayloadType = PayloadType.UNDEFINED): Mono = + producer.sendRaw(messages, payloadType) + .then { logger.info { "Producer sent raw messages with payload type ${payloadType}" } } - fun sendRawPayload(messages: Flux): Mono = - producer.sendRaw(messages, PayloadType.UNDEFINED) - .then { logger.info { "Raw messages have been sent" } } - companion object { private val logger = Logger(HvVesClient::class) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index b5751a3f..0891e499 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -26,19 +26,19 @@ import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -71,14 +71,14 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration - logger.withError { - log("Could not read security keys", ex) + val security = createSecurityConfigurationProvider(cmdLine) + .doOnFailure { ex -> + logger.withError { + log("Could not read security keys", ex) + } } - } - .toOption() - .bind() + .toOption() + .bind() SimulatorConfiguration( InetSocketAddress(listenPort), diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt index 1db66f11..e9fecd66 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt @@ -28,4 +28,4 @@ import java.net.InetSocketAddress * @since February 2019 */ data class ClientConfiguration(val collectorAddresses: Set, - val security: SecurityConfiguration) + val securityProvider: () -> SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt index 5a0e73c7..0021ed82 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -31,4 +31,4 @@ data class SimulatorConfiguration( val healthCheckApiListenAddress: InetSocketAddress, val hvVesAddress: InetSocketAddress, val maxPayloadSizeBytes: Int, - val security: SecurityConfiguration) + val securityProvider: () -> SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt index a91fccd4..72a1165e 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt @@ -29,23 +29,25 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options * @author Jakub Dudycz * @since February 2019 */ -class ClientFactory(configuration: ClientConfiguration) { +class ClientFactory(private val configuration: ClientConfiguration) { - private val partialConfig = ImmutableProducerOptions + fun create() = hvVesClient(partialConfiguration().build()) + + fun create(wireFrameVersion: WireFrameVersion) = hvVesClient( + partialConfiguration() + .wireFrameVersion(wireFrameVersion) + .build()) + + private fun partialConfiguration() = ImmutableProducerOptions .builder() .collectorAddresses(configuration.collectorAddresses) - .let { producerOptions -> - configuration.security.keys.fold( - { producerOptions }, - { producerOptions.securityKeys(it) }) + .let { options -> + configuration.securityProvider().keys.fold( + { options }, + { options.securityKeys(it) }) } - fun create(wireFrameVersion: WireFrameVersion): HvVesClient = - buildClient(partialConfig.wireFrameVersion(wireFrameVersion)) - - - fun create(): HvVesClient = buildClient(partialConfig) + private fun hvVesClient(producerOptions: ImmutableProducerOptions) = + HvVesClient(HvVesProducerFactory.create(producerOptions)) - private fun buildClient(config: ImmutableProducerOptions.Builder) = - HvVesClient(HvVesProducerFactory.create(config.build())) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 366c7e66..baa231c5 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -67,7 +67,7 @@ private fun startServers(config: SimulatorConfiguration): IO = IO.monad().binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() - val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security) + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) val xnfSimulator = XnfSimulator( ClientFactory(clientConfig), MessageGeneratorFactory(config.maxPayloadSizeBytes) -- cgit 1.2.3-korg