diff options
Diffstat (limited to 'hv-collector-xnf-simulator')
3 files changed, 21 insertions, 49 deletions
diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml index 85ef0907..6526915a 100644 --- a/hv-collector-xnf-simulator/pom.xml +++ b/hv-collector-xnf-simulator/pom.xml @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>ves-hv-collector</artifactId> - <version>4.0.0-SNAPSHOT</version> + <version>1.1.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> @@ -139,34 +139,6 @@ </dependency> --> <dependency> - <groupId>com.nhaarman</groupId> - <artifactId>mockito-kotlin</artifactId> - </dependency> - <dependency> - <groupId>org.mockito</groupId> - <artifactId>mockito-core</artifactId> - </dependency> - <dependency> - <groupId>org.assertj</groupId> - <artifactId>assertj-core</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.kotlin</groupId> - <artifactId>kotlin-test</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-api</artifactId> - </dependency> - <dependency> - <groupId>org.jetbrains.spek</groupId> - <artifactId>spek-junit-platform-engine</artifactId> - </dependency> - <dependency> - <groupId>io.projectreactor</groupId> - <artifactId>reactor-test</artifactId> - </dependency> - <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <scope>runtime</scope> diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt index 7a280c10..8df416c9 100644 --- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt +++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt @@ -35,8 +35,8 @@ import org.reactivestreams.Publisher import reactor.core.publisher.Flux import reactor.core.publisher.Mono import reactor.core.publisher.ReplayProcessor -import reactor.ipc.netty.NettyOutbound -import reactor.ipc.netty.tcp.TcpClient +import reactor.netty.NettyOutbound +import reactor.netty.tcp.TcpClient /** * @author Jakub Dudycz <jakub.dudycz@nokia.com> @@ -44,13 +44,12 @@ import reactor.ipc.netty.tcp.TcpClient */ class VesHvClient(private val configuration: SimulatorConfiguration) { - private val client: TcpClient = TcpClient.builder() - .options { opts -> - opts.host(configuration.vesHost) - .port(configuration.vesPort) - .sslContext(createSslContext(configuration.security).orNull()) + private val client: TcpClient = TcpClient.create() + .host(configuration.vesHost) + .port(configuration.vesPort) + .secure { sslSpec -> + createSslContext(configuration.security).fold({}, sslSpec::sslContext) } - .build() fun sendIo(messages: Flux<WireFrameMessage>) = sendRx(messages).then(Mono.just(Unit)).asIo() @@ -58,7 +57,8 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> { val complete = ReplayProcessor.create<Void>(1) client - .newHandler { _, output -> handler(complete, messages, output) } + .handle { _, output -> handler(complete, messages, output) } + .connect() .doOnError { logger.info("Failed to connect to VesHvCollector on " + "${configuration.vesHost}:${configuration.vesPort}") @@ -94,12 +94,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) { private fun createSslContext(config: SecurityConfiguration): Option<SslContext> = ClientSslContextFactory().createSslContext(config) - private fun NettyOutbound.logConnectionClosed(): NettyOutbound { - context().onClose { - logger.info { "Connection to ${context().address()} has been closed" } - } - return this - } + private fun NettyOutbound.logConnectionClosed() = + withConnection { conn -> + conn.onTerminate().subscribe { + logger.info { "Connection to ${conn.address()} has been closed" } + } + } companion object { private val logger = Logger(VesHvClient::class) diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt index 2a78ed5e..95510e77 100644 --- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt +++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt @@ -23,10 +23,9 @@ import arrow.core.Left import arrow.core.None import arrow.core.Right import arrow.effects.IO -import com.nhaarman.mockito_kotlin.any -import com.nhaarman.mockito_kotlin.mock -import com.nhaarman.mockito_kotlin.whenever -import com.sun.xml.internal.messaging.saaj.util.ByteInputStream +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it @@ -39,6 +38,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError import reactor.core.publisher.Flux +import java.io.ByteArrayInputStream /** * @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com> @@ -60,7 +60,7 @@ internal class XnfSimulatorTest : Spek({ describe("startSimulation") { it("should fail when empty input stream") { // given - val emptyInputStream = ByteInputStream() + val emptyInputStream = ByteArrayInputStream(byteArrayOf()) // when val result = cut.startSimulation(emptyInputStream) |