aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-03-01 17:39:09 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-03-05 09:08:12 +0100
commit756e7210cf13c6ef9bae8f785d3f46112c136f7d (patch)
treecacd684842f901379664fe22eea957386e4dbe96 /sources/hv-collector-xnf-simulator
parentc50b6606f4af4452d1b107929956775e86e366c1 (diff)
Fix ssl related bug in xnf simulator
Fix bug when xnf simnulator was using same SecurityKeys object instance for every new VesClient, which resulted in fault while trying to connect to collector. With new implementation simulator reuses same HvVesProdcuer from SDK for every VesEvent request received and creates new Producer for every WireFrameEvent request. This allows to continue testing cases in which there is need to assert if connection was dropped from malicious client. Change-Id: I5f51a58de85cccf7de6ab2392f86259502be31dd Issue-ID: DCAEGEN2-1291 Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-xnf-simulator')
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt48
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt12
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt26
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt28
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt11
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt7
9 files changed, 78 insertions, 60 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 53a8826c..93c43173 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -26,6 +26,7 @@ import arrow.core.fix
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
@@ -34,9 +35,14 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass
+import reactor.core.Disposable
+import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import java.io.InputStream
+import java.nio.ByteBuffer
import javax.json.Json
import javax.json.JsonArray
@@ -52,6 +58,8 @@ class XnfSimulator(
private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+ private val defaultHvVesClient by lazy { clientFactory.create() }
+
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
val json = parseJsonArray(messageParameters).bind()
@@ -64,23 +72,43 @@ class XnfSimulator(
.toEither()
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
- .toFlux()
- .flatMap(::simulate)
- .then(Mono.just(Unit))
- .asIo()
- private fun simulate(parameters: MessageParameters): Mono<Unit> =
+ private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> =
+ parameters
+ .map(::asClientToMessages)
+ .groupMessagesByClients()
+ .flattenValuesToFlux()
+ .toList()
+ .toFlux()
+ .map(::simulate)
+ .then(Mono.just(Unit))
+ .asIo()
+
+ private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() =
+ groupBy({ it.first }, { it.second })
+
+ private fun <K> Map<K, List<Flux<ByteBuffer>>>.flattenValuesToFlux(): Map<K, Flux<ByteBuffer>> =
+ mapValues { Flux.concat(it.value) }
+
+ private fun asClientToMessages(parameters: MessageParameters) =
when (parameters) {
is VesEventParameters -> {
- val messages = vesEventGenerator.createMessageFlux(parameters)
- val client = clientFactory.create()
- client.sendVesEvents(messages)
+ val messages = vesEventGenerator
+ .createMessageFlux(parameters)
+ .map(VesEventOuterClass.VesEvent::toByteBuffer)
+ Pair(defaultHvVesClient, messages)
}
is WireFrameParameters -> {
val messages = wireFrameGenerator.createMessageFlux(parameters)
val client = clientFactory.create(parameters.wireFrameVersion)
- client.sendRawPayload(messages)
+ Pair(client, messages)
}
}
+
+ private fun simulate(pair: Pair<HvVesClient, Flux<ByteBuffer>>): Disposable =
+ pair.first
+ .sendRawPayload(pair.second, PayloadType.PROTOBUF)
+ .subscribe()
}
+
+internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer()
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
index afc157c4..19579431 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
@@ -23,10 +23,10 @@ import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
-import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -34,15 +34,11 @@ import java.nio.ByteBuffer
*/
class HvVesClient(private val producer: HvVesProducer) {
- fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> =
- producer.send(messages)
- .then { logger.info { "Ves Events have been sent" } }
+ fun sendRawPayload(messages: Flux<ByteBuffer>, payloadType: PayloadType = PayloadType.UNDEFINED): Mono<Unit> =
+ producer.sendRaw(messages, payloadType)
+ .then { logger.info { "Producer sent raw messages with payload type ${payloadType}" } }
- fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> =
- producer.sendRaw(messages, PayloadType.UNDEFINED)
- .then { logger.info { "Raw messages have been sent" } }
-
companion object {
private val logger = Logger(HvVesClient::class)
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
index b5751a3f..0891e499 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -26,19 +26,19 @@ import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -71,14 +71,14 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
- val security = createSecurityConfiguration(cmdLine)
- .doOnFailure { ex ->
- logger.withError {
- log("Could not read security keys", ex)
+ val security = createSecurityConfigurationProvider(cmdLine)
+ .doOnFailure { ex ->
+ logger.withError {
+ log("Could not read security keys", ex)
+ }
}
- }
- .toOption()
- .bind()
+ .toOption()
+ .bind()
SimulatorConfiguration(
InetSocketAddress(listenPort),
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
index 1db66f11..e9fecd66 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
@@ -28,4 +28,4 @@ import java.net.InetSocketAddress
* @since February 2019
*/
data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>,
- val security: SecurityConfiguration)
+ val securityProvider: () -> SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
index 5a0e73c7..0021ed82 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
@@ -31,4 +31,4 @@ data class SimulatorConfiguration(
val healthCheckApiListenAddress: InetSocketAddress,
val hvVesAddress: InetSocketAddress,
val maxPayloadSizeBytes: Int,
- val security: SecurityConfiguration)
+ val securityProvider: () -> SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
index a91fccd4..72a1165e 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
@@ -29,23 +29,25 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since February 2019
*/
-class ClientFactory(configuration: ClientConfiguration) {
+class ClientFactory(private val configuration: ClientConfiguration) {
- private val partialConfig = ImmutableProducerOptions
+ fun create() = hvVesClient(partialConfiguration().build())
+
+ fun create(wireFrameVersion: WireFrameVersion) = hvVesClient(
+ partialConfiguration()
+ .wireFrameVersion(wireFrameVersion)
+ .build())
+
+ private fun partialConfiguration() = ImmutableProducerOptions
.builder()
.collectorAddresses(configuration.collectorAddresses)
- .let { producerOptions ->
- configuration.security.keys.fold(
- { producerOptions },
- { producerOptions.securityKeys(it) })
+ .let { options ->
+ configuration.securityProvider().keys.fold(
+ { options },
+ { options.securityKeys(it) })
}
- fun create(wireFrameVersion: WireFrameVersion): HvVesClient =
- buildClient(partialConfig.wireFrameVersion(wireFrameVersion))
-
-
- fun create(): HvVesClient = buildClient(partialConfig)
+ private fun hvVesClient(producerOptions: ImmutableProducerOptions) =
+ HvVesClient(HvVesProducerFactory.create(producerOptions))
- private fun buildClient(config: ImmutableProducerOptions.Builder) =
- HvVesClient(HvVesProducerFactory.create(config.build()))
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 366c7e66..baa231c5 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -67,7 +67,7 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
IO.monad().binding {
logger.info { "Using configuration: $config" }
XnfHealthCheckServer().startServer(config).bind()
- val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security)
+ val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
val xnfSimulator = XnfSimulator(
ClientFactory(clientConfig),
MessageGeneratorFactory(config.maxPayloadSizeBytes)
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
index daf30617..14061532 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
@@ -44,17 +44,6 @@ internal class HvVesClientTest : Spek({
val hvVesProducer: HvVesProducer = mock()
val cut = HvVesClient(hvVesProducer)
- describe("handling ves events stream") {
-
- val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>()
- whenever(hvVesProducer.send(any())).thenReturn(Mono.empty())
- cut.sendVesEvents(vesEvents)
-
- it("should perform sending operation") {
- verify(hvVesProducer).send(vesEvents)
- }
- }
-
describe("handling raw message stream") {
val rawMessages = Flux.empty<ByteBuffer>()
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 123f12ae..29281cdc 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -23,6 +23,7 @@ import arrow.core.Left
import arrow.core.None
import arrow.core.Right
import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
@@ -39,6 +40,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParamete
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
import org.onap.ves.VesEventOuterClass
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
@@ -120,13 +122,14 @@ internal class XnfSimulatorTest : Spek({
whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator)
whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages)
whenever(clientFactory.create()).thenReturn(vesClient)
- whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit))
+
+ whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit))
// when
cut.startSimulation(json).map { it.unsafeRunSync() }
// then
- verify(vesClient).sendVesEvents(generatedMessages)
+ verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF))
}
}
}) \ No newline at end of file