summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
authorMichal Banka <michal.banka@nokia.com>2021-03-31 13:43:34 +0200
committerMichal Banka <michal.banka@nokia.com>2021-04-01 10:51:50 +0200
commit516014379cdc27d163be223bb8ce9f87d72dea1f (patch)
tree680fa171ca6b9fa2b439f0ea727b96618c8ca158 /sources/hv-collector-core
parentc398d5afd86d1aedab9cbd7b8f8c3d7b48eb2e2b (diff)
Add stndDefined domain to HV-VES1.8.0
- Updated SDK dependency to 1.8.2 so HV-VES recognizes stndDefinedNamespace field during events deserialization - Added new domain stndDefined - Added routings specific for stndDefined events which is controled by stndDefinedNamespace field, not domain - Bumped HV-VES to 1.8.0 Signed-off-by: Michal Banka <michal.banka@nokia.com> Change-Id: Ia27f2de7a4b64df5e8a182888006f7e8a941ca63 Issue-ID: DCAEGEN2-2702
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/pom.xml4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt22
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt53
3 files changed, 57 insertions, 22 deletions
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index 0f1d2c3b..95820d07 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -3,7 +3,7 @@
~ ============LICENSE_START=======================================================
~ dcaegen2-collectors-veshv
~ ================================================================================
- ~ Copyright (C) 2018-2019 NOKIA
+ ~ Copyright (C) 2018-2021 NOKIA
~ ================================================================================
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
@@ -33,7 +33,7 @@
<parent>
<groupId>org.onap.dcaegen2.collectors.hv-ves</groupId>
<artifactId>hv-collector-sources</artifactId>
- <version>1.7.0-SNAPSHOT</version>
+ <version>1.8.0-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index c4e877bf..dfa40006 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2021 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,15 +20,17 @@
package org.onap.dcae.collectors.veshv.impl
import arrow.core.None
+import arrow.core.Option
import arrow.core.toOption
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.boundary.SinkFactory
import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.logging.Logger
@@ -36,9 +38,9 @@ import org.onap.ves.VesEventOuterClass.CommonEventHeader
import reactor.core.publisher.Flux
internal class Router internal constructor(private val routing: Routing,
- private val messageSinks: Map<String, Lazy<Sink>>,
- private val ctx: ClientContext,
- private val metrics: Metrics) {
+ private val messageSinks: Map<String, Lazy<Sink>>,
+ private val ctx: ClientContext,
+ private val metrics: Metrics) {
constructor(routing: Routing,
sinkFactory: SinkFactory,
ctx: ClientContext,
@@ -70,8 +72,14 @@ internal class Router internal constructor(private val routing: Routing,
}
- private fun routeFor(header: CommonEventHeader) =
- routing.find { it.domain == header.domain }.toOption()
+ private fun routeFor(header: CommonEventHeader): Option<Route> =
+ routing.find {
+ if (header.domain == VesEventDomain.STND_DEFINED.domainName)
+ it.domain == header.stndDefinedNamespace
+ else {
+ it.domain == header.domain
+ }
+ }.toOption()
private fun messageSinkFor(sinkTopic: String) = messageSinks
.getOrElse(sinkTopic) {
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
index 533581d5..ad655f68 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/RouterTest.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018-2019 NOKIA
+ * Copyright (C) 2018-2021 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,9 +33,8 @@ import org.onap.dcae.collectors.veshv.config.api.model.Route
import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.boundary.Sink
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.PERF3GPP
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.SYSLOG
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
+import org.onap.dcae.collectors.veshv.domain.VesEventStndDefinedNamespace
import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.model.SuccessfullyConsumedMessage
@@ -55,10 +54,12 @@ object RouterTest : Spek({
describe("Router") {
whenever(perf3gppSinkMock.topicName()).thenReturn(perf3gppTopic)
+ whenever(ves3gppHeartbeatSinkMock.topicName()).thenReturn(ves3gppHeartbeatTopic)
whenever(syslogSinkMock.topicName()).thenReturn(syslogTopic)
val messageSinkMap = mapOf(
Pair(perf3gppTopic, lazyOf(messageSinkMock)),
+ Pair(ves3gppHeartbeatTopic, lazyOf(messageSinkMock)),
Pair(syslogTopic, lazyOf(messageSinkMock))
)
@@ -72,7 +73,7 @@ object RouterTest : Spek({
it("should be properly routed") {
val result = cut.route(perf3gppMessage)
- assertThat(result).isNotNull()
+ assertThat(result).isNotNull
StepVerifier.create(result)
.expectNext(successfullyConsumedPerf3gppMessage)
.verifyComplete()
@@ -97,8 +98,23 @@ object RouterTest : Spek({
}
}
+ on("message with existing stndDefined route (ves3gppHeartbeat)") {
+ whenever(messageSinkMock.send(routedVes3gppHeartbeatMessage))
+ .thenReturn(Flux.just(successfullyConsumedVes3gppHeartbeatMessage))
+ val result = cut.route(ves3gppHeartbeatMessage)
+
+ it("should be properly routed") {
+ StepVerifier.create(result)
+ .expectNext(successfullyConsumedVes3gppHeartbeatMessage)
+ .verifyComplete()
+
+ verify(ves3gppHeartbeatSinkMock).topicName()
+ verify(messageSinkMock).send(routedVes3gppHeartbeatMessage)
+ }
+ }
+
on("message with unknown route") {
- val message = VesMessage(commonHeader(HEARTBEAT), emptyWireProtocolFrame())
+ val message = VesMessage(commonHeader(VesEventDomain.HEARTBEAT), emptyWireProtocolFrame())
val result = cut.route(message)
it("should not have route available") {
@@ -113,23 +129,34 @@ object RouterTest : Spek({
private fun router(routing: Routing, kafkaPublisherMap: Map<String, Lazy<Sink>>) =
Router(routing, kafkaPublisherMap, ClientContext(), mock())
-private val perf3gppTopic = "PERF_PERF"
+private const val perf3gppTopic = "PERF_PERF"
private val perf3gppSinkMock = mock<KafkaSink>()
-private val default3gppRoute = Route(PERF3GPP.domainName, perf3gppSinkMock)
+private val default3gppRoute = Route(VesEventDomain.PERF3GPP.domainName, perf3gppSinkMock)
+
+private const val ves3gppHeartbeatTopic = "SEC_3GPP_HEARTBEAT_OUTPUT"
+private val ves3gppHeartbeatSinkMock = mock<KafkaSink>()
+private val defaultVes3gppHeartbeatRoute =
+ Route(VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT.stndDefinedNamespace, ves3gppHeartbeatSinkMock)
-private val syslogTopic = "SYS_LOG"
+private const val syslogTopic = "SYS_LOG"
private val syslogSinkMock = mock<KafkaSink>()
-private val defaultSyslogRoute = Route(SYSLOG.domainName, syslogSinkMock)
+private val defaultSyslogRoute = Route(VesEventDomain.SYSLOG.domainName, syslogSinkMock)
+
+private val defaultRouting = listOf(default3gppRoute, defaultVes3gppHeartbeatRoute, defaultSyslogRoute)
-private val defaultRouting = listOf(default3gppRoute, defaultSyslogRoute)
private val messageSinkMock = mock<Sink>()
private val default_partition = None
-private val perf3gppMessage = VesMessage(commonHeader(PERF3GPP), emptyWireProtocolFrame())
+private val perf3gppMessage = VesMessage(commonHeader(VesEventDomain.PERF3GPP), emptyWireProtocolFrame())
private val routedPerf3GppMessage = RoutedMessage(perf3gppMessage, perf3gppTopic, default_partition)
private val successfullyConsumedPerf3gppMessage = SuccessfullyConsumedMessage(routedPerf3GppMessage)
-private val syslogMessage = VesMessage(commonHeader(SYSLOG), emptyWireProtocolFrame())
+private val ves3gppHeartbeatMessage = VesMessage(commonHeader(domain = VesEventDomain.STND_DEFINED,
+ stndDefinedNamespace = VesEventStndDefinedNamespace.VES_3GPP_HEARTBEAT), emptyWireProtocolFrame())
+private val routedVes3gppHeartbeatMessage = RoutedMessage(ves3gppHeartbeatMessage, ves3gppHeartbeatTopic, default_partition)
+private val successfullyConsumedVes3gppHeartbeatMessage = SuccessfullyConsumedMessage(routedVes3gppHeartbeatMessage)
+
+private val syslogMessage = VesMessage(commonHeader(VesEventDomain.SYSLOG), emptyWireProtocolFrame())
private val routedSyslogMessage = RoutedMessage(syslogMessage, syslogTopic, default_partition)
private val successfullyConsumedSyslogMessage = SuccessfullyConsumedMessage(routedSyslogMessage) \ No newline at end of file