diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-03-01 17:39:09 +0100 |
---|---|---|
committer | Filip Krzywka <filip.krzywka@nokia.com> | 2019-03-05 09:08:12 +0100 |
commit | 756e7210cf13c6ef9bae8f785d3f46112c136f7d (patch) | |
tree | cacd684842f901379664fe22eea957386e4dbe96 /sources | |
parent | c50b6606f4af4452d1b107929956775e86e366c1 (diff) |
Fix ssl related bug in xnf simulator
Fix bug when xnf simnulator was using same SecurityKeys object instance for every new VesClient,
which resulted in fault while trying to connect to collector.
With new implementation simulator reuses same HvVesProdcuer from SDK
for every VesEvent request received and creates new Producer for every
WireFrameEvent request. This allows to continue testing cases in which
there is need to assert if connection was dropped from malicious client.
Change-Id: I5f51a58de85cccf7de6ab2392f86259502be31dd
Issue-ID: DCAEGEN2-1291
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources')
10 files changed, 89 insertions, 65 deletions
diff --git a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt index fb142639..478713e2 100644 --- a/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt +++ b/sources/hv-collector-ssl/src/main/kotlin/org/onap/dcae/collectors/veshv/ssl/boundary/utils.kt @@ -42,14 +42,19 @@ const val KEY_STORE_FILE = "/etc/ves-hv/server.p12" const val TRUST_STORE_FILE = "/etc/ves-hv/trust.p12" fun createSecurityConfiguration(cmdLine: CommandLine): Try<SecurityConfiguration> = - if (cmdLine.hasOption(CommandLineOption.SSL_DISABLE)) - Try { disabledSecurityConfiguration() } + createSecurityConfigurationProvider(cmdLine).map { it() } + +fun createSecurityConfigurationProvider(cmdLine: CommandLine): Try<() -> SecurityConfiguration> = + if (shouldDisableSsl(cmdLine)) + Try { { disabledSecurityConfiguration() } } else - enabledSecurityConfiguration(cmdLine) + Try { { enabledSecurityConfiguration(cmdLine) } } + +private fun shouldDisableSsl(cmdLine: CommandLine) = cmdLine.hasOption(CommandLineOption.SSL_DISABLE) private fun disabledSecurityConfiguration() = SecurityConfiguration(keys = None) -private fun enabledSecurityConfiguration(cmdLine: CommandLine) = Try { +private fun enabledSecurityConfiguration(cmdLine: CommandLine): SecurityConfiguration { val ksFile = cmdLine.stringValue(CommandLineOption.KEY_STORE_FILE, KEY_STORE_FILE) val ksPass = cmdLine.stringValue(CommandLineOption.KEY_STORE_PASSWORD).getOrElse { "" } val tsFile = cmdLine.stringValue(CommandLineOption.TRUST_STORE_FILE, TRUST_STORE_FILE) @@ -62,7 +67,8 @@ private fun enabledSecurityConfiguration(cmdLine: CommandLine) = Try { .trustStorePassword(Passwords.fromString(tsPass)) .build() - SecurityConfiguration(keys = Some(keys)) + return SecurityConfiguration(keys = Some(keys)) } + private fun pathFromFile(file: String) = Paths.get(file) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 53a8826c..93c43173 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -26,6 +26,7 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.asIo import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters @@ -34,9 +35,14 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType +import org.onap.ves.VesEventOuterClass +import reactor.core.Disposable +import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.toFlux import java.io.InputStream +import java.nio.ByteBuffer import javax.json.Json import javax.json.JsonArray @@ -52,6 +58,8 @@ class XnfSimulator( private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() } private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() } + private val defaultHvVesClient by lazy { clientFactory.create() } + fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = Either.monad<ParsingError>().binding { val json = parseJsonArray(messageParameters).bind() @@ -64,23 +72,43 @@ class XnfSimulator( .toEither() .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters - .toFlux() - .flatMap(::simulate) - .then(Mono.just(Unit)) - .asIo() - private fun simulate(parameters: MessageParameters): Mono<Unit> = + private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = + parameters + .map(::asClientToMessages) + .groupMessagesByClients() + .flattenValuesToFlux() + .toList() + .toFlux() + .map(::simulate) + .then(Mono.just(Unit)) + .asIo() + + private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() = + groupBy({ it.first }, { it.second }) + + private fun <K> Map<K, List<Flux<ByteBuffer>>>.flattenValuesToFlux(): Map<K, Flux<ByteBuffer>> = + mapValues { Flux.concat(it.value) } + + private fun asClientToMessages(parameters: MessageParameters) = when (parameters) { is VesEventParameters -> { - val messages = vesEventGenerator.createMessageFlux(parameters) - val client = clientFactory.create() - client.sendVesEvents(messages) + val messages = vesEventGenerator + .createMessageFlux(parameters) + .map(VesEventOuterClass.VesEvent::toByteBuffer) + Pair(defaultHvVesClient, messages) } is WireFrameParameters -> { val messages = wireFrameGenerator.createMessageFlux(parameters) val client = clientFactory.create(parameters.wireFrameVersion) - client.sendRawPayload(messages) + Pair(client, messages) } } + + private fun simulate(pair: Pair<HvVesClient, Flux<ByteBuffer>>): Disposable = + pair.first + .sendRawPayload(pair.second, PayloadType.PROTOBUF) + .subscribe() } + +internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer() diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt index afc157c4..19579431 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -23,10 +23,10 @@ import org.onap.dcae.collectors.veshv.utils.arrow.then import org.onap.dcae.collectors.veshv.utils.logging.Logger import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType -import org.onap.ves.VesEventOuterClass.VesEvent import reactor.core.publisher.Flux import reactor.core.publisher.Mono import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicLong /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -34,15 +34,11 @@ import java.nio.ByteBuffer */ class HvVesClient(private val producer: HvVesProducer) { - fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> = - producer.send(messages) - .then { logger.info { "Ves Events have been sent" } } + fun sendRawPayload(messages: Flux<ByteBuffer>, payloadType: PayloadType = PayloadType.UNDEFINED): Mono<Unit> = + producer.sendRaw(messages, payloadType) + .then { logger.info { "Producer sent raw messages with payload type ${payloadType}" } } - fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> = - producer.sendRaw(messages, PayloadType.UNDEFINED) - .then { logger.info { "Raw messages have been sent" } } - companion object { private val logger = Logger(HvVesClient::class) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt index b5751a3f..0891e499 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt @@ -26,19 +26,19 @@ import arrow.typeclasses.binding import org.apache.commons.cli.CommandLine import org.apache.commons.cli.DefaultParser import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration +import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES -import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST +import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT import org.onap.dcae.collectors.veshv.utils.commandline.intValue import org.onap.dcae.collectors.veshv.utils.commandline.stringValue import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -71,14 +71,14 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES, WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES) - val security = createSecurityConfiguration(cmdLine) - .doOnFailure { ex -> - logger.withError { - log("Could not read security keys", ex) + val security = createSecurityConfigurationProvider(cmdLine) + .doOnFailure { ex -> + logger.withError { + log("Could not read security keys", ex) + } } - } - .toOption() - .bind() + .toOption() + .bind() SimulatorConfiguration( InetSocketAddress(listenPort), diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt index 1db66f11..e9fecd66 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt @@ -28,4 +28,4 @@ import java.net.InetSocketAddress * @since February 2019 */ data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>, - val security: SecurityConfiguration) + val securityProvider: () -> SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt index 5a0e73c7..0021ed82 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt @@ -31,4 +31,4 @@ data class SimulatorConfiguration( val healthCheckApiListenAddress: InetSocketAddress, val hvVesAddress: InetSocketAddress, val maxPayloadSizeBytes: Int, - val security: SecurityConfiguration) + val securityProvider: () -> SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt index a91fccd4..72a1165e 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt @@ -29,23 +29,25 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options * @author Jakub Dudycz <jakub.dudycz@nokia.com> * @since February 2019 */ -class ClientFactory(configuration: ClientConfiguration) { +class ClientFactory(private val configuration: ClientConfiguration) { - private val partialConfig = ImmutableProducerOptions + fun create() = hvVesClient(partialConfiguration().build()) + + fun create(wireFrameVersion: WireFrameVersion) = hvVesClient( + partialConfiguration() + .wireFrameVersion(wireFrameVersion) + .build()) + + private fun partialConfiguration() = ImmutableProducerOptions .builder() .collectorAddresses(configuration.collectorAddresses) - .let { producerOptions -> - configuration.security.keys.fold( - { producerOptions }, - { producerOptions.securityKeys(it) }) + .let { options -> + configuration.securityProvider().keys.fold( + { options }, + { options.securityKeys(it) }) } - fun create(wireFrameVersion: WireFrameVersion): HvVesClient = - buildClient(partialConfig.wireFrameVersion(wireFrameVersion)) - - - fun create(): HvVesClient = buildClient(partialConfig) + private fun hvVesClient(producerOptions: ImmutableProducerOptions) = + HvVesClient(HvVesProducerFactory.create(producerOptions)) - private fun buildClient(config: ImmutableProducerOptions.Builder) = - HvVesClient(HvVesProducerFactory.create(config.build())) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index 366c7e66..baa231c5 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -67,7 +67,7 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> = IO.monad().binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() - val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security) + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider) val xnfSimulator = XnfSimulator( ClientFactory(clientConfig), MessageGeneratorFactory(config.maxPayloadSizeBytes) diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt index daf30617..14061532 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt @@ -44,17 +44,6 @@ internal class HvVesClientTest : Spek({ val hvVesProducer: HvVesProducer = mock() val cut = HvVesClient(hvVesProducer) - describe("handling ves events stream") { - - val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>() - whenever(hvVesProducer.send(any())).thenReturn(Mono.empty()) - cut.sendVesEvents(vesEvents) - - it("should perform sending operation") { - verify(hvVesProducer).send(vesEvents) - } - } - describe("handling raw message stream") { val rawMessages = Flux.empty<ByteBuffer>() diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 123f12ae..29281cdc 100644 --- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -23,6 +23,7 @@ import arrow.core.Left import arrow.core.None import arrow.core.Right import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever @@ -39,6 +40,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParamete import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType import org.onap.ves.VesEventOuterClass import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux @@ -120,13 +122,14 @@ internal class XnfSimulatorTest : Spek({ whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator) whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages) whenever(clientFactory.create()).thenReturn(vesClient) - whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit)) + + whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit)) // when cut.startSimulation(json).map { it.unsafeRunSync() } // then - verify(vesClient).sendVesEvents(generatedMessages) + verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF)) } } })
\ No newline at end of file |