diff options
author | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-02-19 18:06:33 +0100 |
---|---|---|
committer | Jakub Dudycz <jakub.dudycz@nokia.com> | 2019-02-21 12:54:54 +0100 |
commit | bacba429e2dd6b3048da7e75800f5ad200952599 (patch) | |
tree | f62d89972415e8e8da51fd042b980048efea0ecd /sources/hv-collector-xnf-simulator/src/main | |
parent | 82b27ff5bccc925fe03d05f259cf881fafc8a1ce (diff) |
Use sdk/hvves-producer in hvves/xnf-simulator
Change-Id: I8f493b0edd2cbaef136a22d914ad24198bb63a7f
Signed-off-by: Jakub Dudycz <jakub.dudycz@nokia.com>
Issue-ID: DCAEGEN2-1253
Diffstat (limited to 'sources/hv-collector-xnf-simulator/src/main')
6 files changed, 167 insertions, 131 deletions
diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt index 4dfdb845..812afe19 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/XnfSimulator.kt @@ -26,12 +26,15 @@ import arrow.core.fix import arrow.effects.IO import arrow.instances.either.monad.monad import arrow.typeclasses.binding -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient -import org.onap.dcae.collectors.veshv.ves.message.generator.api.* +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory +import org.onap.dcae.collectors.veshv.utils.arrow.asIo +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser +import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError +import org.onap.dcae.collectors.veshv.ves.message.generator.api.VesEventParameters +import org.onap.dcae.collectors.veshv.ves.message.generator.api.WireFrameParameters import org.onap.dcae.collectors.veshv.ves.message.generator.factory.MessageGeneratorFactory -import org.onap.ves.VesEventOuterClass.VesEvent -import reactor.core.publisher.Flux +import reactor.core.publisher.Mono import reactor.core.publisher.toFlux import java.io.InputStream import javax.json.Json @@ -42,18 +45,18 @@ import javax.json.JsonArray * @since August 2018 */ class XnfSimulator( - private val vesClient: VesHvClient, + private val clientFactory: ClientFactory, private val generatorFactory: MessageGeneratorFactory, private val messageParametersParser: MessageParametersParser = MessageParametersParser.INSTANCE) { + private val wireFrameGenerator by lazy { generatorFactory.createWireFrameGenerator() } + private val vesEventGenerator by lazy { generatorFactory.createVesEventGenerator() } + fun startSimulation(messageParameters: InputStream): Either<ParsingError, IO<Unit>> = Either.monad<ParsingError>().binding { - val json = parseJsonArray(messageParameters).bind() - messageParametersParser.parse(json).bind() - .toFlux() - .flatMap(::generateMessages) - .let { vesClient.sendIo(it) } + val parameters = messageParametersParser.parse(json).bind() + simulationFrom(parameters) }.fix() private fun parseJsonArray(jsonStream: InputStream): Either<ParsingError, JsonArray> = @@ -61,18 +64,23 @@ class XnfSimulator( .toEither() .mapLeft { ParsingError("Failed to parse JSON", Some(it)) } - private fun generateMessages(parameters: MessageParameters): Flux<WireFrameMessage> = + private fun simulationFrom(parameters: List<MessageParameters>): IO<Unit> = parameters + .toFlux() + .map(::simulate) + .then(Mono.just(Unit)) + .asIo() + + private fun simulate(parameters: MessageParameters): Mono<Unit> = when (parameters) { - is VesEventParameters -> generatorFactory - .createVesEventGenerator() - .createMessageFlux(parameters) - .map(::encodeToWireFrame) - is WireFrameParameters -> generatorFactory - .createWireFrameGenerator() - .createMessageFlux(parameters) - else -> throw IllegalStateException("Invalid parameters type") + is VesEventParameters -> { + val messages = vesEventGenerator.createMessageFlux(parameters) + val client = clientFactory.create() + client.sendVesEvents(messages) + } + is WireFrameParameters -> { + val messages = wireFrameGenerator.createMessageFlux(parameters) + val client = clientFactory.create(parameters.wireFrameVersion) + client.sendRawPayload(messages) + } } - - private fun encodeToWireFrame(event: VesEvent): WireFrameMessage = - WireFrameMessage(event.toByteArray()) } diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt new file mode 100644 index 00000000..afc157c4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/HvVesClient.kt @@ -0,0 +1,49 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2018-2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters + +import org.onap.dcae.collectors.veshv.utils.arrow.then +import org.onap.dcae.collectors.veshv.utils.logging.Logger +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducer +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.PayloadType +import org.onap.ves.VesEventOuterClass.VesEvent +import reactor.core.publisher.Flux +import reactor.core.publisher.Mono +import java.nio.ByteBuffer + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since June 2018 + */ +class HvVesClient(private val producer: HvVesProducer) { + + fun sendVesEvents(messages: Flux<VesEvent>): Mono<Unit> = + producer.send(messages) + .then { logger.info { "Ves Events have been sent" } } + + + fun sendRawPayload(messages: Flux<ByteBuffer>): Mono<Unit> = + producer.sendRaw(messages, PayloadType.UNDEFINED) + .then { logger.info { "Raw messages have been sent" } } + + companion object { + private val logger = Logger(HvVesClient::class) + } +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt deleted file mode 100644 index eba8ed88..00000000 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ /dev/null @@ -1,106 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * dcaegen2-collectors-veshv - * ================================================================================ - * Copyright (C) 2018 NOKIA - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters - -import arrow.core.Option -import arrow.core.getOrElse -import io.netty.handler.ssl.SslContext -import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration -import org.onap.dcae.collectors.veshv.domain.WireFrameEncoder -import org.onap.dcae.collectors.veshv.domain.WireFrameMessage -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration -import org.onap.dcae.collectors.veshv.ssl.boundary.SslContextFactory -import org.onap.dcae.collectors.veshv.utils.arrow.asIo -import org.onap.dcae.collectors.veshv.utils.logging.Logger -import org.reactivestreams.Publisher -import reactor.core.publisher.Flux -import reactor.core.publisher.Mono -import reactor.core.publisher.ReplayProcessor -import reactor.netty.NettyOutbound -import reactor.netty.tcp.TcpClient -import reactor.util.concurrent.Queues.XS_BUFFER_SIZE - -/** - * @author Jakub Dudycz <jakub.dudycz@nokia.com> - * @since June 2018 - */ -class VesHvClient(private val configuration: SimulatorConfiguration) { - - private val client: TcpClient = TcpClient.create() - .addressSupplier { configuration.hvVesAddress } - .configureSsl() - - private fun TcpClient.configureSsl() = - createSslContext(configuration.security) - .map { sslContext -> this.secure(sslContext) } - .getOrElse { this } - - fun sendIo(messages: Flux<WireFrameMessage>) = - sendRx(messages).then(Mono.just(Unit)).asIo() - - private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { - val complete = ReplayProcessor.create<Void>(1) - client - .handle { _, output -> handler(complete, messages, output) } - .connect() - .doOnError { - logger.info { "Failed to connect to VesHvCollector on ${configuration.hvVesAddress}" } - } - .subscribe { - logger.info { "Connected to VesHvCollector on ${configuration.hvVesAddress}" } - } - return complete.then() - } - - private fun handler(complete: ReplayProcessor<Void>, - messages: Flux<WireFrameMessage>, - nettyOutbound: NettyOutbound): Publisher<Void> { - - val allocator = nettyOutbound.alloc() - val encoder = WireFrameEncoder(allocator) - val frames = messages - .map(encoder::encode) - .window(XS_BUFFER_SIZE) - - return nettyOutbound - .logConnectionClosed() - .options { it.flushOnBoundary() } - .sendGroups(frames) - .then { - logger.info { "Messages have been sent" } - complete.onComplete() - } - .then() - } - - private fun createSslContext(config: SecurityConfiguration): Option<SslContext> = - SslContextFactory().createClientContext(config) - - private fun NettyOutbound.logConnectionClosed() = - withConnection { conn -> - conn.onDispose { - logger.info { "Connection to ${conn.address()} has been closed" } - } - } - - companion object { - private val logger = Logger(VesHvClient::class) - } -} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt new file mode 100644 index 00000000..1db66f11 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/config/ClientConfiguration.kt @@ -0,0 +1,31 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.config + +import io.vavr.collection.Set +import org.onap.dcae.collectors.veshv.domain.SecurityConfiguration +import java.net.InetSocketAddress + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since February 2019 + */ +data class ClientConfiguration(val collectorAddresses: Set<InetSocketAddress>, + val security: SecurityConfiguration) diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt new file mode 100644 index 00000000..a91fccd4 --- /dev/null +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/factory/ClientFactory.kt @@ -0,0 +1,51 @@ +/* + * ============LICENSE_START======================================================= + * dcaegen2-collectors-veshv + * ================================================================================ + * Copyright (C) 2019 NOKIA + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory + +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.HvVesClient +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.HvVesProducerFactory +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.ImmutableProducerOptions +import org.onap.dcaegen2.services.sdk.services.hvves.client.producer.api.options.WireFrameVersion + +/** + * @author Jakub Dudycz <jakub.dudycz@nokia.com> + * @since February 2019 + */ +class ClientFactory(configuration: ClientConfiguration) { + + private val partialConfig = ImmutableProducerOptions + .builder() + .collectorAddresses(configuration.collectorAddresses) + .let { producerOptions -> + configuration.security.keys.fold( + { producerOptions }, + { producerOptions.securityKeys(it) }) + } + + fun create(wireFrameVersion: WireFrameVersion): HvVesClient = + buildClient(partialConfig.wireFrameVersion(wireFrameVersion)) + + + fun create(): HvVesClient = buildClient(partialConfig) + + private fun buildClient(config: ImmutableProducerOptions.Builder) = + HvVesClient(HvVesProducerFactory.create(config.build())) +} diff --git a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt index ef627304..366c7e66 100644 --- a/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt +++ b/sources/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/main.kt @@ -23,15 +23,17 @@ import arrow.effects.IO import arrow.effects.fix import arrow.effects.instances.io.monad.monad import arrow.typeclasses.binding +import io.vavr.collection.HashSet import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.simulators.xnf.impl.OngoingSimulations import org.onap.dcae.collectors.veshv.simulators.xnf.impl.XnfSimulator -import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.VesHvClient import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfApiServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.adapters.XnfHealthCheckServer import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ArgXnfSimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.ClientConfiguration import org.onap.dcae.collectors.veshv.simulators.xnf.impl.config.SimulatorConfiguration +import org.onap.dcae.collectors.veshv.simulators.xnf.impl.factory.ClientFactory import org.onap.dcae.collectors.veshv.utils.arrow.ExitFailure import org.onap.dcae.collectors.veshv.utils.arrow.unsafeRunEitherSync import org.onap.dcae.collectors.veshv.utils.commandline.handleWrongArgumentErrorCurried @@ -65,8 +67,9 @@ private fun startServers(config: SimulatorConfiguration): IO<RatpackServer> = IO.monad().binding { logger.info { "Using configuration: $config" } XnfHealthCheckServer().startServer(config).bind() + val clientConfig = ClientConfiguration(HashSet.of(config.hvVesAddress), config.security) val xnfSimulator = XnfSimulator( - VesHvClient(config), + ClientFactory(clientConfig), MessageGeneratorFactory(config.maxPayloadSizeBytes) ) XnfApiServer(xnfSimulator, OngoingSimulations()) |