summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-xnf-simulator/src/main/kotlin
diff options
context:
space:
mode:
authorJakub Dudycz <jakub.dudycz@nokia.com>2019-02-19 18:06:33 +0100
committerJakub Dudycz <jakub.dudycz@nokia.com>2019-02-21 12:54:54 +0100
commitbacba429e2dd6b3048da7e75800f5ad200952599 (patch)
treef62d89972415e8e8da51fd042b980048efea0ecd /sources/hv-collector-xnf-simulator/src/main/kotlin
parent82b27ff5bccc925fe03d05f259cf881fafc8a1ce (diff)
Use sdk/hvves-producer in hvves/xnf-simulator
Change-Id: I8f493b0edd2cbaef136a22d914ad24198bb63a7f Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com> Issue-ID: DCAEGEN2-1253
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src/main/kotlin')
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt54
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt49
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt106
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt31
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt51
-rw-r--r--sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt7
6 files changed, 167 insertions, 131 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
index 4dfdb845..812afe19 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt
@@ -26,12 +26,15 @@ import arrow.core.fix
import arrow.effects.IO
import arrow.instances.either.monad.monad
import arrow.typeclasses.binding
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
-import org.onap.dcae.collectors.veshv.ves.message.generator.api.*
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
+import org.onap.dcae.collectors.veshv.utils.arrow.asIo
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters
+import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters
import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory
-import org.onap.ves.VesEventOuterClass.VesEvent
-import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
import reactor.core.publisher.toFlux
import java.io.InputStream
import javax.json.Json
@@ -42,18 +45,18 @@ import javax.json.JsonArray
* @since August 2018
*/
class XnfSimulator(
- private val vesClient: VesHvClient,
+ private val clientFactory: ClientFactory,
private val generatorFactory: MessageGeneratorFactory,
private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) {
+ private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() }
+ private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() }
+
fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> =
Either.monad<ParsingError>().binding {
-
val json = parseJsonArray(messageParameters).bind()
- messageParametersParser.parse(json).bind()
- .toFlux()
- .flatMap(::generateMessages)
- .let { vesClient.sendIo(it) }
+ val parameters = messageParametersParser.parse(json).bind()
+ simulationFrom(parameters)
}.fix()
private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> =
@@ -61,18 +64,23 @@ class XnfSimulator(
.toEither()
.mapLeft { ParsingError("Failed to parse JSON", Some(it)) }
- private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> =
+ private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters
+ .toFlux()
+ .map(::simulate)
+ .then(Mono.just(Unit))
+ .asIo()
+
+ private fun simulate(parameters: MessageParameters): Mono<Unit> =
when (parameters) {
- is VesEventParameters -> generatorFactory
- .createVesEventGenerator()
- .createMessageFlux(parameters)
- .map(::encodeToWireFrame)
- is WireFrameParameters -> generatorFactory
- .createWireFrameGenerator()
- .createMessageFlux(parameters)
- else -> throw IllegalStateException("Invalid parameters type")
+ is VesEventParameters -> {
+ val messages = vesEventGenerator.createMessageFlux(parameters)
+ val client = clientFactory.create()
+ client.sendVesEvents(messages)
+ }
+ is WireFrameParameters -> {
+ val messages = wireFrameGenerator.createMessageFlux(parameters)
+ val client = clientFactory.create(parameters.wireFrameVersion)
+ client.sendRawPayload(messages)
+ }
}
-
- private fun encodeToWireFrame(event: VesEvent): WireFrameMessage =
- WireFrameMessage(event.toByteArray())
}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
new file mode 100644
index 00000000..afc157c4
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt
@@ -0,0 +1,49 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2018-2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
+
+import org.onap.dcae.collectors.veshv.utils.arrow.then
+import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType
+import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Flux
+import reactor.core.publisher.Mono
+import java.nio.ByteBuffer
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since June 2018
+ */
+class HvVesClient(private val producer: HvVesProducer) {
+
+ fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> =
+ producer.send(messages)
+ .then { logger.info { "Ves Events have been sent" } }
+
+
+ fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> =
+ producer.sendRaw(messages, PayloadType.UNDEFINED)
+ .then { logger.info { "Raw messages have been sent" } }
+
+ companion object {
+ private val logger = Logger(HvVesClient::class)
+ }
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
deleted file mode 100644
index eba8ed88..00000000
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * ============LICENSE_START=======================================================
- * dcaegen2-collectors-veshv
- * ================================================================================
- * Copyright (C) 2018 NOKIA
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters
-
-import arrow.core.Option
-import arrow.core.getOrElse
-import io.netty.handler.ssl.SslContext
-import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
-import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder
-import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
-import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory
-import org.onap.dcae.collectors.veshv.utils.arrow.asIo
-import org.onap.dcae.collectors.veshv.utils.logging.Logger
-import org.reactivestreams.Publisher
-import reactor.core.publisher.Flux
-import reactor.core.publisher.Mono
-import reactor.core.publisher.ReplayProcessor
-import reactor.netty.NettyOutbound
-import reactor.netty.tcp.TcpClient
-import reactor.util.concurrent.Queues.XS_BUFFER_SIZE
-
-/**
- * @author Jakub Dudycz <jakub.dudycz@nokia.com>
- * @since June 2018
- */
-class VesHvClient(private val configuration: SimulatorConfiguration) {
-
- private val client: TcpClient = TcpClient.create()
- .addressSupplier { configuration.hvVesAddress }
- .configureSsl()
-
- private fun TcpClient.configureSsl() =
- createSslContext(configuration.security)
- .map { sslContext -> this.secure(sslContext) }
- .getOrElse { this }
-
- fun sendIo(messages: Flux<WireFrameMessage>) =
- sendRx(messages).then(Mono.just(Unit)).asIo()
-
- private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
- val complete = ReplayProcessor.create<Void>(1)
- client
- .handle { _, output -> handler(complete, messages, output) }
- .connect()
- .doOnError {
- logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" }
- }
- .subscribe {
- logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" }
- }
- return complete.then()
- }
-
- private fun handler(complete: ReplayProcessor<Void>,
- messages: Flux<WireFrameMessage>,
- nettyOutbound: NettyOutbound): Publisher<Void> {
-
- val allocator = nettyOutbound.alloc()
- val encoder = WireFrameEncoder(allocator)
- val frames = messages
- .map(encoder::encode)
- .window(XS_BUFFER_SIZE)
-
- return nettyOutbound
- .logConnectionClosed()
- .options { it.flushOnBoundary() }
- .sendGroups(frames)
- .then {
- logger.info { "Messages have been sent" }
- complete.onComplete()
- }
- .then()
- }
-
- private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
- SslContextFactory().createClientContext(config)
-
- private fun NettyOutbound.logConnectionClosed() =
- withConnection { conn ->
- conn.onDispose {
- logger.info { "Connection to ${conn.address()} has been closed" }
- }
- }
-
- companion object {
- private val logger = Logger(VesHvClient::class)
- }
-}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
new file mode 100644
index 00000000..1db66f11
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt
@@ -0,0 +1,31 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config
+
+import io.vavr.collection.Set
+import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration
+import java.net.InetSocketAddress
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>,
+ val security: SecurityConfiguration)
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
new file mode 100644
index 00000000..a91fccd4
--- /dev/null
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt
@@ -0,0 +1,51 @@
+/*
+ * ============LICENSE_START=======================================================
+ * dcaegen2-collectors-veshv
+ * ================================================================================
+ * Copyright (C) 2019 NOKIA
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory
+
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions
+import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion
+
+/**
+ * @author Jakub Dudycz <jakub.dudycz@nokia.com>
+ * @since February 2019
+ */
+class ClientFactory(configuration: ClientConfiguration) {
+
+ private val partialConfig = ImmutableProducerOptions
+ .builder()
+ .collectorAddresses(configuration.collectorAddresses)
+ .let { producerOptions ->
+ configuration.security.keys.fold(
+ { producerOptions },
+ { producerOptions.securityKeys(it) })
+ }
+
+ fun create(wireFrameVersion: WireFrameVersion): HvVesClient =
+ buildClient(partialConfig.wireFrameVersion(wireFrameVersion))
+
+
+ fun create(): HvVesClient = buildClient(partialConfig)
+
+ private fun buildClient(config: ImmutableProducerOptions.Builder) =
+ HvVesClient(HvVesProducerFactory.create(config.build()))
+}
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
index ef627304..366c7e66 100644
--- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
+++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt
@@ -23,15 +23,17 @@ import arrow.effects.IO
import arrow.effects.fix
import arrow.effects.instances.io.monad.monad
import arrow.typeclasses.binding
+import io.vavr.collection.HashSet
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator
-import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration
import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration
+import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory
import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure
import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync
import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried
@@ -65,8 +67,9 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> =
IO.monad().binding {
logger.info { "Using configuration: $config" }
XnfHealthCheckServer().startServer(config).bind()
+ val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security)
val xnfSimulator = XnfSimulator(
- VesHvClient(config),
+ ClientFactory(clientConfig),
MessageGeneratorFactory(config.maxPayloadSizeBytes)
)
XnfApiServer(xnfSimulator, OngoingSimulations())