aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt')
-rw-r--r--sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt70
1 files changed, 29 insertions, 41 deletions
diff --git a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
index e1641cbb..493100fc 100644
--- a/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
+++ b/sources/hv-collector-dcae-app-simulator/src/test/kotlin/org/onap/dcae/collectors/veshv/simulators/dcaeapp/impl/DcaeAppSimulatorTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,10 +19,8 @@
*/
package org.onap.dcae.collectors.veshv.simulators.dcaeapp.impl
-import arrow.core.Left
import arrow.core.None
import arrow.core.Some
-import arrow.effects.IO
import com.google.protobuf.ByteString
import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
@@ -30,14 +28,18 @@ import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.never
import com.nhaarman.mockitokotlin2.verify
import com.nhaarman.mockitokotlin2.whenever
-import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.Assertions.*
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.mockito.ArgumentMatchers.anySet
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.ves.VesEventOuterClass.VesEvent
+import reactor.core.publisher.Mono
+import reactor.test.StepVerifier
+import java.lang.IllegalArgumentException
import java.util.concurrent.ConcurrentLinkedQueue
+import kotlin.test.assertFailsWith
/**
* @author Piotr Jaszczyk <piotr.jaszczyk@nokia.com>
@@ -55,7 +57,7 @@ internal class DcaeAppSimulatorTest : Spek({
consumer = mock()
cut = DcaeAppSimulator(consumerFactory, messageStreamValidation)
- whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(IO.just(consumer))
+ whenever(consumerFactory.createConsumerForTopics(anySet())).thenReturn(consumer)
}
fun consumerState(vararg messages: ByteArray) = ConsumerState(ConcurrentLinkedQueue(messages.toList()))
@@ -64,48 +66,36 @@ internal class DcaeAppSimulatorTest : Spek({
val topics = setOf("perf3gpp", "faults")
it("should fail when topic list is empty") {
- val result = cut.listenToTopics(setOf()).attempt().unsafeRunSync()
- assertThat(result.isLeft()).isTrue()
+ assertFailsWith(IllegalArgumentException::class){
+ cut.listenToTopics(setOf())
+ }
}
it("should fail when topic list contains empty strings") {
- val result = cut.listenToTopics(setOf("perf3gpp", " ", "faults")).attempt().unsafeRunSync()
- assertThat(result.isLeft()).isTrue()
+ assertFailsWith(IllegalArgumentException::class){
+ cut.listenToTopics(setOf("perf3gpp", " ", "faults"))
+ }
}
it("should subscribe to given topics") {
- cut.listenToTopics(topics).unsafeRunSync()
+ cut.listenToTopics(topics)
verify(consumerFactory).createConsumerForTopics(topics)
}
it("should subscribe to given topics when called with comma separated list") {
- cut.listenToTopics("perf3gpp,faults").unsafeRunSync()
+ cut.listenToTopics("perf3gpp,faults")
verify(consumerFactory).createConsumerForTopics(topics)
}
-
- it("should handle errors") {
- // given
- val error = RuntimeException("WTF")
- whenever(consumerFactory.createConsumerForTopics(anySet()))
- .thenReturn(IO.raiseError(error))
-
- // when
- val result = cut.listenToTopics("perf3gpp").attempt().unsafeRunSync()
-
- // then
- assertThat(result).isEqualTo(Left(error))
- }
}
describe("state") {
-
it("should return None when topics hasn't been initialized") {
assertThat(cut.state()).isEqualTo(None)
}
describe("when topics are initialized") {
beforeEachTest {
- cut.listenToTopics("perf3gpp").unsafeRunSync()
+ cut.listenToTopics("perf3gpp")
}
it("should return some state when it has been set") {
@@ -119,21 +109,18 @@ internal class DcaeAppSimulatorTest : Spek({
describe("resetState") {
it("should do nothing when topics hasn't been initialized") {
- cut.resetState().unsafeRunSync()
+ cut.resetState()
verify(consumer, never()).reset()
}
describe("when topics are initialized") {
beforeEachTest {
- cut.listenToTopics("perf3gpp").unsafeRunSync()
+ cut.listenToTopics("perf3gpp")
}
it("should reset the state") {
- // given
- whenever(consumer.reset()).thenReturn(IO.unit)
-
// when
- cut.resetState().unsafeRunSync()
+ cut.resetState()
// then
verify(consumer).reset()
@@ -143,29 +130,30 @@ internal class DcaeAppSimulatorTest : Spek({
describe("validate") {
beforeEachTest {
- whenever(messageStreamValidation.validate(any(), any())).thenReturn(IO.just(true))
+ whenever(messageStreamValidation.validate(any(), any())).thenReturn(Mono.just(true))
}
it("should use empty list when consumer is unavailable") {
- // when
- val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+ StepVerifier
+ .create(cut.validate("['The JSON']".byteInputStream()))
+ .expectNext(true)
+ .verifyComplete()
- // then
verify(messageStreamValidation).validate(any(), eq(emptyList()))
- assertThat(result).isTrue()
}
it("should delegate to MessageStreamValidation") {
// given
- cut.listenToTopics("perf3gpp").unsafeRunSync()
+ cut.listenToTopics("perf3gpp")
whenever(consumer.currentState()).thenReturn(consumerState(vesEvent().toByteArray()))
- // when
- val result = cut.validate("['The JSON']".byteInputStream()).unsafeRunSync()
+ StepVerifier
+ .create(cut.validate("['The JSON']".byteInputStream()))
+ .expectNext(true)
+ .verifyComplete()
// then
verify(messageStreamValidation).validate(any(), any())
- assertThat(result).isTrue()
}
}
})