summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt')
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt13
1 files changed, 12 insertions, 1 deletions
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
index 51f724e0..160defdb 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/sink.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.tests.fakes
+import arrow.effects.IO
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.FailedToConsumeMessage
@@ -30,6 +31,7 @@ import reactor.core.publisher.Flux
import java.time.Duration
import java.util.*
import java.util.concurrent.ConcurrentLinkedDeque
+import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong
/**
@@ -38,6 +40,8 @@ import java.util.concurrent.atomic.AtomicLong
*/
class StoringSink : Sink {
private val sent: Deque<RoutedMessage> = ConcurrentLinkedDeque()
+ private val active = AtomicBoolean(true)
+ val closed get() = !active.get()
val sentMessages: List<RoutedMessage>
get() = sent.toList()
@@ -45,6 +49,13 @@ class StoringSink : Sink {
override fun send(messages: Flux<RoutedMessage>): Flux<ConsumedMessage> {
return messages.doOnNext(sent::addLast).map(::SuccessfullyConsumedMessage)
}
+
+ /*
+ * TOD0: if the code would look like:
+ * ```IO { active.set(false) }```
+ * the tests wouldn't pass even though `.unsafeRunSync()` is called (see HvVesSpec)
+ */
+ override fun close() = active.set(false).run { IO.unit }
}
/**