aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
diff options
context:
space:
mode:
Diffstat (limited to 'sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt')
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt53
1 files changed, 40 insertions, 13 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 533581d5..ad655f68 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2021 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,9 +33,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventStndDefinedNamespace
import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
@@ -55,10 +54,12 @@ object RouterTest : Spek({
describe("Router") {
whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
+ whenever(ves3gppHeartbeatSinkMock.topicName()).thenReturn(ves3gppHeartbeatTopic)
whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
val messageSinkMap = mapOf(
Pair(perf3gppTopic, lazyOf(messageSinkMock)),
+ Pair(ves3gppHeartbeatTopic, lazyOf(messageSinkMock)),
Pair(syslogTopic, lazyOf(messageSinkMock))
)
@@ -72,7 +73,7 @@ object RouterTest : Spek({
it("should be properly routed") {
val result = cut.route(perf3gppMessage)
- assertThat(result).isNotNull()
+ assertThat(result).isNotNull
StepVerifier.create(result)
.expectNext(successfullyConsumedPerf3gppMessage)
.verifyComplete()
@@ -97,8 +98,23 @@ object RouterTest : Spek({
}
}
+ on("message with existing stndDefined route (ves3gppHeartbeat)") {
+ whenever(messageSinkMock.send(routedVes3gppHeartbeatMessage))
+ .thenReturn(Flux.just(successfullyConsumedVes3gppHeartbeatMessage))
+ val result = cut.route(ves3gppHeartbeatMessage)
+
+ it("should be properly routed") {
+ StepVerifier.create(result)
+ .expectNext(successfullyConsumedVes3gppHeartbeatMessage)
+ .verifyComplete()
+
+ verify(ves3gppHeartbeatSinkMock).topicName()
+ verify(messageSinkMock).send(routedVes3gppHeartbeatMessage)
+ }
+ }
+
on("message with unknown route") {
- val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
+ val message = VesMessage(commonHeader(VesEventDomain.HEARTBEAT), emptyWireProtocolFrame())
val result = cut.route(message)
it("should not have route available") {
@@ -113,23 +129,34 @@ object RouterTest : Spek({
private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
Router(routing, kafkaPublisherMap, ClientContext(), mock())
-private val perf3gppTopic = "PERF_PERF"
+private const val perf3gppTopic = "PERF_PERF"
private val perf3gppSinkMock = mock<KafkaSink>()
-private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)
+private val default3gppRoute = Route(VesEventDomain.PERF3GPP.domainName, perf3gppSinkMock)
+
+private const val ves3gppHeartbeatTopic = "SEC_3GPP_HEARTBEAT_OUTPUT"
+private val ves3gppHeartbeatSinkMock = mock<KafkaSink>()
+private val defaultVes3gppHeartbeatRoute =
+ Route(VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT.stndDefinedNamespace, ves3gppHeartbeatSinkMock)
-private val syslogTopic = "SYS_LOG"
+private const val syslogTopic = "SYS_LOG"
private val syslogSinkMock = mock<KafkaSink>()
-private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)
+private val defaultSyslogRoute = Route(VesEventDomain.SYSLOG.domainName, syslogSinkMock)
+
+private val defaultRouting = listOf(default3gppRoute, defaultVes3gppHeartbeatRoute, defaultSyslogRoute)
-private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)
private val messageSinkMock = mock<Sink>()
private val default_partition = None
-private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
+private val perf3gppMessage = VesMessage(commonHeader(VesEventDomain.PERF3GPP), emptyWireProtocolFrame())
private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition)
private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
-private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
+private val ves3gppHeartbeatMessage = VesMessage(commonHeader(domain = VesEventDomain.STND_DEFINED,
+ stndDefinedNamespace = VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT), emptyWireProtocolFrame())
+private val routedVes3gppHeartbeatMessage = RoutedMessage(ves3gppHeartbeatMessage, ves3gppHeartbeatTopic, default_partition)
+private val successfullyConsumedVes3gppHeartbeatMessage = SuccessfullyConsumedMessage(routedVes3gppHeartbeatMessage)
+
+private val syslogMessage = VesMessage(commonHeader(VesEventDomain.SYSLOG), emptyWireProtocolFrame())
private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition)
private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage) \ No newline at end of file