diff options
Diffstat (limited to 'sources/hv-collector-core/src/test')
-rw-r--r-- | sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt | 53 |
1 files changed, 40 insertions, 13 deletions
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 533581d5..ad655f68 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2018-2021 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,9 +33,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.domain.VesEventStndDefinedNamespace import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage @@ -55,10 +54,12 @@ object RouterTest : Spek({ describe("Router") { whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic) + whenever(ves3gppHeartbeatSinkMock.topicName()).thenReturn(ves3gppHeartbeatTopic) whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic) val messageSinkMap = mapOf( Pair(perf3gppTopic, lazyOf(messageSinkMock)), + Pair(ves3gppHeartbeatTopic, lazyOf(messageSinkMock)), Pair(syslogTopic, lazyOf(messageSinkMock)) ) @@ -72,7 +73,7 @@ object RouterTest : Spek({ it("should be properly routed") { val result = cut.route(perf3gppMessage) - assertThat(result).isNotNull() + assertThat(result).isNotNull StepVerifier.create(result) .expectNext(successfullyConsumedPerf3gppMessage) .verifyComplete() @@ -97,8 +98,23 @@ object RouterTest : Spek({ } } + on("message with existing stndDefined route (ves3gppHeartbeat)") { + whenever(messageSinkMock.send(routedVes3gppHeartbeatMessage)) + .thenReturn(Flux.just(successfullyConsumedVes3gppHeartbeatMessage)) + val result = cut.route(ves3gppHeartbeatMessage) + + it("should be properly routed") { + StepVerifier.create(result) + .expectNext(successfullyConsumedVes3gppHeartbeatMessage) + .verifyComplete() + + verify(ves3gppHeartbeatSinkMock).topicName() + verify(messageSinkMock).send(routedVes3gppHeartbeatMessage) + } + } + on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) + val message = VesMessage(commonHeader(VesEventDomain.HEARTBEAT), emptyWireProtocolFrame()) val result = cut.route(message) it("should not have route available") { @@ -113,23 +129,34 @@ object RouterTest : Spek({ private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) = Router(routing, kafkaPublisherMap, ClientContext(), mock()) -private val perf3gppTopic = "PERF_PERF" +private const val perf3gppTopic = "PERF_PERF" private val perf3gppSinkMock = mock<KafkaSink>() -private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock) +private val default3gppRoute = Route(VesEventDomain.PERF3GPP.domainName, perf3gppSinkMock) + +private const val ves3gppHeartbeatTopic = "SEC_3GPP_HEARTBEAT_OUTPUT" +private val ves3gppHeartbeatSinkMock = mock<KafkaSink>() +private val defaultVes3gppHeartbeatRoute = + Route(VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT.stndDefinedNamespace, ves3gppHeartbeatSinkMock) -private val syslogTopic = "SYS_LOG" +private const val syslogTopic = "SYS_LOG" private val syslogSinkMock = mock<KafkaSink>() -private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock) +private val defaultSyslogRoute = Route(VesEventDomain.SYSLOG.domainName, syslogSinkMock) + +private val defaultRouting = listOf(default3gppRoute, defaultVes3gppHeartbeatRoute, defaultSyslogRoute) -private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute) private val messageSinkMock = mock<Sink>() private val default_partition = None -private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) +private val perf3gppMessage = VesMessage(commonHeader(VesEventDomain.PERF3GPP), emptyWireProtocolFrame()) private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition) private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage) -private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) +private val ves3gppHeartbeatMessage = VesMessage(commonHeader(domain = VesEventDomain.STND_DEFINED, + stndDefinedNamespace = VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT), emptyWireProtocolFrame()) +private val routedVes3gppHeartbeatMessage = RoutedMessage(ves3gppHeartbeatMessage, ves3gppHeartbeatTopic, default_partition) +private val successfullyConsumedVes3gppHeartbeatMessage = SuccessfullyConsumedMessage(routedVes3gppHeartbeatMessage) + +private val syslogMessage = VesMessage(commonHeader(VesEventDomain.SYSLOG), emptyWireProtocolFrame()) private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition) private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file |