summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-xnf-simulator')
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt48
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt12
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt26
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt28
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt2
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt11
-rw-r--r--sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt7
9 files changed, 78 insertions, 60 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 53a8826c..93c43173 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -26,6 +26,7 @@ import arrow.core.fix
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.utils.arrow.asIo
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
@@ -34,9 +35,14 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass
+import reactor.core.Disposable
+import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import java.io.InputStream
+import java.nio.ByteBuffer
import javax.json.Json
import javax.json.JsonArray
@@ -52,6 +58,8 @@ class XnfSimulator(
private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+ private val defaultHvVesClient by lazy { clientFactory.create() }
+
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
val json = parseJsonArray(messageParameters).bind()
@@ -64,23 +72,43 @@ class XnfSimulator(
.toEither()
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
- .toFlux()
- .flatMap(::simulate)
- .then(Mono.just(Unit))
- .asIo()
- private fun simulate(parameters: MessageParameters): Mono<Unit> =
+ private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> =
+ parameters
+ .map(::asClientToMessages)
+ .groupMessagesByClients()
+ .flattenValuesToFlux()
+ .toList()
+ .toFlux()
+ .map(::simulate)
+ .then(Mono.just(Unit))
+ .asIo()
+
+ private fun <M> List<Pair<HvVesClient, M>>.groupMessagesByClients() =
+ groupBy({ it.first }, { it.second })
+
+ private fun <K> Map<K, List<Flux<ByteBuffer>>>.flattenValuesToFlux(): Map<K, Flux<ByteBuffer>> =
+ mapValues { Flux.concat(it.value) }
+
+ private fun asClientToMessages(parameters: MessageParameters) =
when (parameters) {
is VesEventParameters -> {
- val messages = vesEventGenerator.createMessageFlux(parameters)
- val client = clientFactory.create()
- client.sendVesEvents(messages)
+ val messages = vesEventGenerator
+ .createMessageFlux(parameters)
+ .map(VesEventOuterClass.VesEvent::toByteBuffer)
+ Pair(defaultHvVesClient, messages)
}
is WireFrameParameters -> {
val messages = wireFrameGenerator.createMessageFlux(parameters)
val client = clientFactory.create(parameters.wireFrameVersion)
- client.sendRawPayload(messages)
+ Pair(client, messages)
}
}
+
+ private fun simulate(pair: Pair<HvVesClient, Flux<ByteBuffer>>): Disposable =
+ pair.first
+ .sendRawPayload(pair.second, PayloadType.PROTOBUF)
+ .subscribe()
}
+
+internal fun VesEventOuterClass.VesEvent.toByteBuffer() = toByteString().asReadOnlyByteBuffer()
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
index afc157c4..19579431 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
@@ -23,10 +23,10 @@ import org.onap.dcae.collectors.veshv.utils.arrow.then
import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
-import org.onap.ves.VesEventOuterClass.VesEvent
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -34,15 +34,11 @@ import java.nio.ByteBuffer
*/
class HvVesClient(private val producer: HvVesProducer) {
- fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> =
- producer.send(messages)
- .then { logger.info { "Ves Events have been sent" } }
+ fun sendRawPayload(messages: Flux<ByteBuffer>, payloadType: PayloadType = PayloadType.UNDEFINED): Mono<Unit> =
+ producer.sendRaw(messages, payloadType)
+ .then { logger.info { "Producer sent raw messages with payload type ${payloadType}" } }
- fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> =
- producer.sendRaw(messages, PayloadType.UNDEFINED)
- .then { logger.info { "Raw messages have been sent" } }
-
companion object {
private val logger = Logger(HvVesClient::class)
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
index b5751a3f..0891e499 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ArgXnfSimulatorConfiguration.kt
@@ -26,19 +26,19 @@ import arrow.typeclasses.binding
import org.apache.commons.cli.CommandLine
import org.apache.commons.cli.DefaultParser
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfiguration
+import org.onap.dcae.collectors.veshv.ssl.boundary.createSecurityConfigurationProvider
import org.onap.dcae.collectors.veshv.utils.arrow.doOnFailure
import org.onap.dcae.collectors.veshv.utils.commandline.ArgBasedConfiguration
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.HEALTH_CHECK_API_PORT
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
-import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.KEY_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.LISTEN_PORT
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.MAXIMUM_PAYLOAD_SIZE_BYTES
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.SSL_DISABLE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_FILE
import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.TRUST_STORE_PASSWORD
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_HOST
+import org.onap.dcae.collectors.veshv.utils.commandline.CommandLineOption.VES_HV_PORT
import org.onap.dcae.collectors.veshv.utils.commandline.intValue
import org.onap.dcae.collectors.veshv.utils.commandline.stringValue
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -71,14 +71,14 @@ internal class ArgXnfSimulatorConfiguration : ArgBasedConfiguration<SimulatorCon
val maxPayloadSizeBytes = cmdLine.intValue(MAXIMUM_PAYLOAD_SIZE_BYTES,
WireFrameMessage.DEFAULT_MAX_PAYLOAD_SIZE_BYTES)
- val security = createSecurityConfiguration(cmdLine)
- .doOnFailure { ex ->
- logger.withError {
- log("Could not read security keys", ex)
+ val security = createSecurityConfigurationProvider(cmdLine)
+ .doOnFailure { ex ->
+ logger.withError {
+ log("Could not read security keys", ex)
+ }
}
- }
- .toOption()
- .bind()
+ .toOption()
+ .bind()
SimulatorConfiguration(
InetSocketAddress(listenPort),
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
index 1db66f11..e9fecd66 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
@@ -28,4 +28,4 @@ import java.net.InetSocketAddress
* @since February 2019
*/
data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>,
- val security: SecurityConfiguration)
+ val securityProvider: () -> SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
index 5a0e73c7..0021ed82 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/SimulatorConfiguration.kt
@@ -31,4 +31,4 @@ data class SimulatorConfiguration(
val healthCheckApiListenAddress: InetSocketAddress,
val hvVesAddress: InetSocketAddress,
val maxPayloadSizeBytes: Int,
- val security: SecurityConfiguration)
+ val securityProvider: () -> SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
index a91fccd4..72a1165e 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
@@ -29,23 +29,25 @@ import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
* @since February 2019
*/
-class ClientFactory(configuration: ClientConfiguration) {
+class ClientFactory(private val configuration: ClientConfiguration) {
- private val partialConfig = ImmutableProducerOptions
+ fun create() = hvVesClient(partialConfiguration().build())
+
+ fun create(wireFrameVersion: WireFrameVersion) = hvVesClient(
+ partialConfiguration()
+ .wireFrameVersion(wireFrameVersion)
+ .build())
+
+ private fun partialConfiguration() = ImmutableProducerOptions
.builder()
.collectorAddresses(configuration.collectorAddresses)
- .let { producerOptions ->
- configuration.security.keys.fold(
- { producerOptions },
- { producerOptions.securityKeys(it) })
+ .let { options ->
+ configuration.securityProvider().keys.fold(
+ { options },
+ { options.securityKeys(it) })
}
- fun create(wireFrameVersion: WireFrameVersion): HvVesClient =
- buildClient(partialConfig.wireFrameVersion(wireFrameVersion))
-
-
- fun create(): HvVesClient = buildClient(partialConfig)
+ private fun hvVesClient(producerOptions: ImmutableProducerOptions) =
+ HvVesClient(HvVesProducerFactory.create(producerOptions))
- private fun buildClient(config: ImmutableProducerOptions.Builder) =
- HvVesClient(HvVesProducerFactory.create(config.build()))
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index 366c7e66..baa231c5 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -67,7 +67,7 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
IO.monad().binding {
logger.info { "Using configuration: $config" }
XnfHealthCheckServer().startServer(config).bind()
- val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security)
+ val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.securityProvider)
val xnfSimulator = XnfSimulator(
ClientFactory(clientConfig),
MessageGeneratorFactory(config.maxPayloadSizeBytes)
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
index daf30617..14061532 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/HvVesClientTest.kt
@@ -44,17 +44,6 @@ internal class HvVesClientTest : Spek({
val hvVesProducer: HvVesProducer = mock()
val cut = HvVesClient(hvVesProducer)
- describe("handling ves events stream") {
-
- val vesEvents = Flux.empty<VesEventOuterClass.VesEvent>()
- whenever(hvVesProducer.send(any())).thenReturn(Mono.empty())
- cut.sendVesEvents(vesEvents)
-
- it("should perform sending operation") {
- verify(hvVesProducer).send(vesEvents)
- }
- }
-
describe("handling raw message stream") {
val rawMessages = Flux.empty<ByteBuffer>()
diff --git a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 123f12ae..29281cdc 100644
--- a/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/sources/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -23,6 +23,7 @@ import arrow.core.Left
import arrow.core.None
import arrow.core.Right
import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
@@ -39,6 +40,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParamete
import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventType
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
import org.onap.dcae.collectors.veshv.ves.message.generator.generators.VesEventGenerator
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
import org.onap.ves.VesEventOuterClass
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
@@ -120,13 +122,14 @@ internal class XnfSimulatorTest : Spek({
whenever(generatorFactory.createVesEventGenerator()).thenReturn(vesEventGenerator)
whenever(vesEventGenerator.createMessageFlux(vesEventParams)).thenReturn(generatedMessages)
whenever(clientFactory.create()).thenReturn(vesClient)
- whenever(vesClient.sendVesEvents(generatedMessages)).thenReturn(Mono.just(Unit))
+
+ whenever(vesClient.sendRawPayload(any(), eq(PayloadType.PROTOBUF))).thenReturn(Mono.just(Unit))
// when
cut.startSimulation(json).map { it.unsafeRunSync() }
// then
- verify(vesClient).sendVesEvents(generatedMessages)
+ verify(vesClient).sendRawPayload(any(), eq(PayloadType.PROTOBUF))
}
}
}) \ No newline at end of file