diff options
author | 2021-03-31 13:43:34 +0200 | |
---|---|---|
committer | 2021-04-01 10:51:50 +0200 | |
commit | 516014379cdc27d163be223bb8ce9f87d72dea1f (patch) | |
tree | 680fa171ca6b9fa2b439f0ea727b96618c8ca158 /sources/hv-collector-core | |
parent | c398d5afd86d1aedab9cbd7b8f8c3d7b48eb2e2b (diff) |
Add stndDefined domain to HV-VES1.8.0
- Updated SDK dependency to 1.8.2 so HV-VES recognizes
stndDefinedNamespace field during events deserialization
- Added new domain stndDefined
- Added routings specific for stndDefined events which is controled
by stndDefinedNamespace field, not domain
- Bumped HV-VES to 1.8.0
Signed-off-by: Michal Banka <michal.banka@nokia.com>
Change-Id: Ia27f2de7a4b64df5e8a182888006f7e8a941ca63
Issue-ID: DCAEGEN2-2702
Diffstat (limited to 'sources/hv-collector-core')
3 files changed, 57 insertions, 22 deletions
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml index 0f1d2c3b..95820d07 100644 --- a/sources/hv-collector-core/pom.xml +++ b/sources/hv-collector-core/pom.xml @@ -3,7 +3,7 @@ ~ ============LICENSE_START======================================================= ~ dcaegen2-collectors-veshv ~ ================================================================================ - ~ Copyright (C) 2018-2019 NOKIA + ~ Copyright (C) 2018-2021 NOKIA ~ ================================================================================ ~ Licensed under the Apache License, Version 2.0 (the "License"); ~ you may not use this file except in compliance with the License. @@ -33,7 +33,7 @@ <parent> <groupId>org.onap.dcaegen2.collectors.hv-ves</groupId> <artifactId>hv-collector-sources</artifactId> - <version>1.7.0-SNAPSHOT</version> + <version>1.8.0-SNAPSHOT</version> <relativePath>..</relativePath> </parent> diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt index c4e877bf..dfa40006 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2018-2021 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,15 +20,17 @@ package org.onap.dcae.collectors.veshv.impl import arrow.core.None +import arrow.core.Option import arrow.core.toOption import org.onap.dcae.collectors.veshv.boundary.Metrics import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.boundary.SinkFactory import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing -import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ConsumedMessage import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.utils.logging.Logger @@ -36,9 +38,9 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader import reactor.core.publisher.Flux internal class Router internal constructor(private val routing: Routing, - private val messageSinks: Map<String, Lazy<Sink>>, - private val ctx: ClientContext, - private val metrics: Metrics) { + private val messageSinks: Map<String, Lazy<Sink>>, + private val ctx: ClientContext, + private val metrics: Metrics) { constructor(routing: Routing, sinkFactory: SinkFactory, ctx: ClientContext, @@ -70,8 +72,14 @@ internal class Router internal constructor(private val routing: Routing, } - private fun routeFor(header: CommonEventHeader) = - routing.find { it.domain == header.domain }.toOption() + private fun routeFor(header: CommonEventHeader): Option<Route> = + routing.find { + if (header.domain == VesEventDomain.STND_DEFINED.domainName) + it.domain == header.stndDefinedNamespace + else { + it.domain == header.domain + } + }.toOption() private fun messageSinkFor(sinkTopic: String) = messageSinks .getOrElse(sinkTopic) { diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt index 533581d5..ad655f68 100644 --- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt +++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018-2019 NOKIA + * Copyright (C) 2018-2021 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,9 +33,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.Route import org.onap.dcae.collectors.veshv.config.api.model.Routing import org.onap.dcae.collectors.veshv.boundary.Sink import org.onap.dcae.collectors.veshv.domain.RoutedMessage -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT -import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG +import org.onap.dcae.collectors.veshv.domain.VesEventDomain +import org.onap.dcae.collectors.veshv.domain.VesEventStndDefinedNamespace import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage @@ -55,10 +54,12 @@ object RouterTest : Spek({ describe("Router") { whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic) + whenever(ves3gppHeartbeatSinkMock.topicName()).thenReturn(ves3gppHeartbeatTopic) whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic) val messageSinkMap = mapOf( Pair(perf3gppTopic, lazyOf(messageSinkMock)), + Pair(ves3gppHeartbeatTopic, lazyOf(messageSinkMock)), Pair(syslogTopic, lazyOf(messageSinkMock)) ) @@ -72,7 +73,7 @@ object RouterTest : Spek({ it("should be properly routed") { val result = cut.route(perf3gppMessage) - assertThat(result).isNotNull() + assertThat(result).isNotNull StepVerifier.create(result) .expectNext(successfullyConsumedPerf3gppMessage) .verifyComplete() @@ -97,8 +98,23 @@ object RouterTest : Spek({ } } + on("message with existing stndDefined route (ves3gppHeartbeat)") { + whenever(messageSinkMock.send(routedVes3gppHeartbeatMessage)) + .thenReturn(Flux.just(successfullyConsumedVes3gppHeartbeatMessage)) + val result = cut.route(ves3gppHeartbeatMessage) + + it("should be properly routed") { + StepVerifier.create(result) + .expectNext(successfullyConsumedVes3gppHeartbeatMessage) + .verifyComplete() + + verify(ves3gppHeartbeatSinkMock).topicName() + verify(messageSinkMock).send(routedVes3gppHeartbeatMessage) + } + } + on("message with unknown route") { - val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame()) + val message = VesMessage(commonHeader(VesEventDomain.HEARTBEAT), emptyWireProtocolFrame()) val result = cut.route(message) it("should not have route available") { @@ -113,23 +129,34 @@ object RouterTest : Spek({ private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) = Router(routing, kafkaPublisherMap, ClientContext(), mock()) -private val perf3gppTopic = "PERF_PERF" +private const val perf3gppTopic = "PERF_PERF" private val perf3gppSinkMock = mock<KafkaSink>() -private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock) +private val default3gppRoute = Route(VesEventDomain.PERF3GPP.domainName, perf3gppSinkMock) + +private const val ves3gppHeartbeatTopic = "SEC_3GPP_HEARTBEAT_OUTPUT" +private val ves3gppHeartbeatSinkMock = mock<KafkaSink>() +private val defaultVes3gppHeartbeatRoute = + Route(VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT.stndDefinedNamespace, ves3gppHeartbeatSinkMock) -private val syslogTopic = "SYS_LOG" +private const val syslogTopic = "SYS_LOG" private val syslogSinkMock = mock<KafkaSink>() -private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock) +private val defaultSyslogRoute = Route(VesEventDomain.SYSLOG.domainName, syslogSinkMock) + +private val defaultRouting = listOf(default3gppRoute, defaultVes3gppHeartbeatRoute, defaultSyslogRoute) -private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute) private val messageSinkMock = mock<Sink>() private val default_partition = None -private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame()) +private val perf3gppMessage = VesMessage(commonHeader(VesEventDomain.PERF3GPP), emptyWireProtocolFrame()) private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition) private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage) -private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame()) +private val ves3gppHeartbeatMessage = VesMessage(commonHeader(domain = VesEventDomain.STND_DEFINED, + stndDefinedNamespace = VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT), emptyWireProtocolFrame()) +private val routedVes3gppHeartbeatMessage = RoutedMessage(ves3gppHeartbeatMessage, ves3gppHeartbeatTopic, default_partition) +private val successfullyConsumedVes3gppHeartbeatMessage = SuccessfullyConsumedMessage(routedVes3gppHeartbeatMessage) + +private val syslogMessage = VesMessage(commonHeader(VesEventDomain.SYSLOG), emptyWireProtocolFrame()) private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition) private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage)
\ No newline at end of file |