aboutsummaryrefslogtreecommitdiffstats
path: root/hv-collector-xnf-simulator
diff options
context:
space:
mode:
Diffstat (limited to 'hv-collector-xnf-simulator')
-rw-r--r--hv-collector-xnf-simulator/pom.xml30
-rw-r--r--hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt30
-rw-r--r--hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt10
3 files changed, 21 insertions, 49 deletions
diff --git a/hv-collector-xnf-simulator/pom.xml b/hv-collector-xnf-simulator/pom.xml
index 85ef0907..6526915a 100644
--- a/hv-collector-xnf-simulator/pom.xml
+++ b/hv-collector-xnf-simulator/pom.xml
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>ves-hv-collector</artifactId>
- <version>4.0.0-SNAPSHOT</version>
+ <version>1.1.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
@@ -139,34 +139,6 @@
</dependency>
-->
<dependency>
- <groupId>com.nhaarman</groupId>
- <artifactId>mockito-kotlin</artifactId>
- </dependency>
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.assertj</groupId>
- <artifactId>assertj-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.kotlin</groupId>
- <artifactId>kotlin-test</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-api</artifactId>
- </dependency>
- <dependency>
- <groupId>org.jetbrains.spek</groupId>
- <artifactId>spek-junit-platform-engine</artifactId>
- </dependency>
- <dependency>
- <groupId>io.projectreactor</groupId>
- <artifactId>reactor-test</artifactId>
- </dependency>
- <dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>runtime</scope>
diff --git a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
index 7a280c10..8df416c9 100644
--- a/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
+++ b/hv-collector-xnf-simulator/src/main/kotlin/org/onap/dcae/collectors/veshv/simulators/xnf/impl/adapters/VesHvClient.kt
@@ -35,8 +35,8 @@ import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.publisher.ReplayProcessor
-import reactor.ipc.netty.NettyOutbound
-import reactor.ipc.netty.tcp.TcpClient
+import reactor.netty.NettyOutbound
+import reactor.netty.tcp.TcpClient
/**
* @author Jakub Dudycz <jakub.dudycz@nokia.com>
@@ -44,13 +44,12 @@ import reactor.ipc.netty.tcp.TcpClient
*/
class VesHvClient(private val configuration: SimulatorConfiguration) {
- private val client: TcpClient = TcpClient.builder()
- .options { opts ->
- opts.host(configuration.vesHost)
- .port(configuration.vesPort)
- .sslContext(createSslContext(configuration.security).orNull())
+ private val client: TcpClient = TcpClient.create()
+ .host(configuration.vesHost)
+ .port(configuration.vesPort)
+ .secure { sslSpec ->
+ createSslContext(configuration.security).fold({}, sslSpec::sslContext)
}
- .build()
fun sendIo(messages: Flux<WireFrameMessage>) =
sendRx(messages).then(Mono.just(Unit)).asIo()
@@ -58,7 +57,8 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
private fun sendRx(messages: Flux<WireFrameMessage>): Mono<Void> {
val complete = ReplayProcessor.create<Void>(1)
client
- .newHandler { _, output -> handler(complete, messages, output) }
+ .handle { _, output -> handler(complete, messages, output) }
+ .connect()
.doOnError {
logger.info("Failed to connect to VesHvCollector on " +
"${configuration.vesHost}:${configuration.vesPort}")
@@ -94,12 +94,12 @@ class VesHvClient(private val configuration: SimulatorConfiguration) {
private fun createSslContext(config: SecurityConfiguration): Option<SslContext> =
ClientSslContextFactory().createSslContext(config)
- private fun NettyOutbound.logConnectionClosed(): NettyOutbound {
- context().onClose {
- logger.info { "Connection to ${context().address()} has been closed" }
- }
- return this
- }
+ private fun NettyOutbound.logConnectionClosed() =
+ withConnection { conn ->
+ conn.onTerminate().subscribe {
+ logger.info { "Connection to ${conn.address()} has been closed" }
+ }
+ }
companion object {
private val logger = Logger(VesHvClient::class)
diff --git a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
index 2a78ed5e..95510e77 100644
--- a/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
+++ b/hv-collector-xnf-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/main/XnfSimulatorTest.kt
@@ -23,10 +23,9 @@ import arrow.core.Left
import arrow.core.None
import arrow.core.Right
import arrow.effects.IO
-import com.nhaarman.mockito_kotlin.any
-import com.nhaarman.mockito_kotlin.mock
-import com.nhaarman.mockito_kotlin.whenever
-import com.sun.xml.internal.messaging.saaj.util.ByteInputStream
+import com.nhaarman.mockitokotlin2.any
+import com.nhaarman.mockitokotlin2.mock
+import com.nhaarman.mockitokotlin2.whenever
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
@@ -39,6 +38,7 @@ import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParameter
import org.onap.dcae.collectors.veshv.ves.message.generator.api.MessageParametersParser
import org.onap.dcae.collectors.veshv.ves.message.generator.api.ParsingError
import reactor.core.publisher.Flux
+import java.io.ByteArrayInputStream
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -60,7 +60,7 @@ internal class XnfSimulatorTest : Spek({
describe("startSimulation") {
it("should fail when empty input stream") {
// given
- val emptyInputStream = ByteInputStream()
+ val emptyInputStream = ByteArrayInputStream(byteArrayOf())
// when
val result = cut.startSimulation(emptyInputStream)