aboutsummaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-core
diff options
context:
space:
mode:
authorPiotr Jaszczyk <piotr.jaszczyk@nokia.com>2019-03-26 12:40:13 +0000
committerGerrit Code Review <gerrit@onap.org>2019-03-26 12:40:13 +0000
commitb1fbc2cef72a07c7da742d612b8e9650e7407877 (patch)
tree6603c70af9029701294fd3783c9df1a1016b7c8f /sources/hv-collector-core
parentcb08e0822cc44fc7b382e9b48682ab93607f7c43 (diff)
parent7e77162022371860d13939be1848982a735cdab9 (diff)
Merge "Use DataStream API from CBS client"
Diffstat (limited to 'sources/hv-collector-core')
-rw-r--r--sources/hv-collector-core/pom.xml4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt4
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt8
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt16
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt43
-rw-r--r--sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt94
6 files changed, 102 insertions, 67 deletions
diff --git a/sources/hv-collector-core/pom.xml b/sources/hv-collector-core/pom.xml
index 823f671a..e7134e18 100644
--- a/sources/hv-collector-core/pom.xml
+++ b/sources/hv-collector-core/pom.xml
@@ -119,6 +119,10 @@
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+ <artifactId>cbs-client</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 782d2324..f475a0eb 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -19,7 +19,6 @@
*/
package org.onap.dcae.collectors.veshv.boundary
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientContext
@@ -27,6 +26,7 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.ConsumedMessage
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.utils.Closeable
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import reactor.core.publisher.Flux
interface Sink {
@@ -48,5 +48,5 @@ interface SinkProvider : Closeable {
}
interface ConfigurationProvider {
- operator fun invoke(): Flux<Routing>
+ operator fun invoke(): Flux<Sequence<KafkaSink>>
}
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
index c08df748..c674ef36 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/factory/CollectorFactory.kt
@@ -25,7 +25,6 @@ import org.onap.dcae.collectors.veshv.boundary.CollectorProvider
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.boundary.Metrics
import org.onap.dcae.collectors.veshv.boundary.SinkProvider
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
import org.onap.dcae.collectors.veshv.domain.WireFrameDecoder
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
@@ -37,6 +36,7 @@ import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.model.ServiceContext
import org.onap.dcae.collectors.veshv.utils.arrow.getOption
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import java.util.concurrent.atomic.AtomicReference
/**
@@ -50,7 +50,7 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
private val healthState: HealthState = HealthState.INSTANCE) {
fun createVesHvCollectorProvider(): CollectorProvider {
- val config = AtomicReference<Routing>()
+ val config = AtomicReference<Sequence<KafkaSink>>()
configuration()
.doOnNext {
logger.info(ServiceContext::mdc) { "Using updated configuration for new connections" }
@@ -71,12 +71,12 @@ class CollectorFactory(private val configuration: ConfigurationProvider,
}
}
- private fun createVesHvCollector(routing: Routing, ctx: ClientContext): Collector =
+ private fun createVesHvCollector(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext): Collector =
VesHvCollector(
clientContext = ctx,
wireChunkDecoder = WireChunkDecoder(WireFrameDecoder(maxPayloadSizeBytes), ctx),
protobufDecoder = VesDecoder(),
- router = Router(routing, ctx),
+ router = Router(kafkaSinks, ctx),
sink = sinkProvider(ctx),
metrics = metrics)
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
index bd92c6d3..723ba39a 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/Router.kt
@@ -21,13 +21,29 @@ package org.onap.dcae.collectors.veshv.impl
import arrow.core.Option
import org.onap.dcae.collectors.veshv.config.api.model.Routing
+import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.model.ClientContext
import org.onap.dcae.collectors.veshv.impl.adapters.ClientContextLogging.debug
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.logging.Logger
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
class Router(private val routing: Routing, private val ctx: ClientContext) {
+
+ constructor(kafkaSinks: Sequence<KafkaSink>, ctx: ClientContext) : this(
+ routing {
+ kafkaSinks.forEach {
+ defineRoute {
+ fromDomain(it.name())
+ toTopic(it.topicName())
+ withFixedPartitioning()
+ }
+ }
+ }.build(),
+ ctx
+ )
+
fun findDestination(message: VesMessage): Option<RoutedMessage> =
routing.routeFor(message.header).map { it(message) }.also {
if (it.isEmpty()) {
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
index f96350ac..5b0dca2d 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderImpl.kt
@@ -22,8 +22,6 @@ package org.onap.dcae.collectors.veshv.impl.adapters
import com.google.gson.JsonObject
import org.onap.dcae.collectors.veshv.boundary.ConfigurationProvider
import org.onap.dcae.collectors.veshv.config.api.model.CbsConfiguration
-import org.onap.dcae.collectors.veshv.config.api.model.Routing
-import org.onap.dcae.collectors.veshv.config.api.model.routing
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -31,6 +29,10 @@ import org.onap.dcae.collectors.veshv.utils.logging.Logger
import org.onap.dcae.collectors.veshv.utils.logging.onErrorLog
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsRequests
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.DataStreams
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParser
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.dmaap.KafkaSink
import org.onap.dcaegen2.services.sdk.rest.services.model.logging.RequestDiagnosticContext
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
@@ -46,6 +48,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
private val firstRequestDelay: Duration,
private val requestInterval: Duration,
private val healthState: HealthState,
+ private val streamParser: StreamFromGsonParser<KafkaSink>,
retrySpec: Retry<Any>
) : ConfigurationProvider {
@@ -54,6 +57,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
params.firstRequestDelay,
params.requestInterval,
HealthState.INSTANCE,
+ StreamFromGsonParsers.kafkaSinkParser(),
Retry.any<Any>()
.retryMax(MAX_RETRIES)
.fixedBackoff(params.requestInterval)
@@ -67,7 +71,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
healthState.changeState(HealthDescription.RETRYING_FOR_DYNAMIC_CONFIGURATION)
}
- override fun invoke(): Flux<Routing> =
+ override fun invoke(): Flux<Sequence<KafkaSink>> =
cbsClientMono
.doOnNext { logger.info(ServiceContext::mdc) { "CBS client successfully created" } }
.onErrorLog(logger, ServiceContext::mdc) { "Failed to retrieve CBS client" }
@@ -75,7 +79,7 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.doFinally { logger.trace(ServiceContext::mdc) { "CBS client subscription finished" } }
.flatMapMany(::handleUpdates)
- private fun handleUpdates(cbsClient: CbsClient): Flux<Routing> = cbsClient
+ private fun handleUpdates(cbsClient: CbsClient): Flux<Sequence<KafkaSink>> = cbsClient
.updates(CbsRequests.getConfiguration(RequestDiagnosticContext.create()),
firstRequestDelay,
requestInterval)
@@ -85,31 +89,18 @@ internal class ConfigurationProviderImpl(private val cbsClientMono: Mono<CbsClie
.retryWhen(retry)
- private fun createCollectorConfiguration(configuration: JsonObject): Routing =
- try {
- val routingArray = configuration.getAsJsonArray(ROUTING_CONFIGURATION_KEY)
- routing {
- for (route in routingArray) {
- val routeObj = route.asJsonObject
- defineRoute {
- fromDomain(routeObj.getPrimitiveAsString(DOMAIN_CONFIGURATION_KEY))
- toTopic(routeObj.getPrimitiveAsString(TOPIC_CONFIGURATION_KEY))
- withFixedPartitioning()
- }
- }
- }.build()
- } catch (e: NullPointerException) {
- throw ParsingException("Failed to parse configuration", e)
- }
-
- private fun JsonObject.getPrimitiveAsString(memberName: String) = getAsJsonPrimitive(memberName).asString
+ private fun createCollectorConfiguration(configuration: JsonObject): Sequence<KafkaSink> {
+ try {
+ val sinks = DataStreams.namedSinks(configuration)
+ .filter { it.type() == "kafka" }
+ return sinks.map(streamParser::unsafeParse).asSequence()
+ } catch (e: NullPointerException) {
+ throw ParsingException("Failed to parse configuration", e)
+ }
+ }
companion object {
- private const val ROUTING_CONFIGURATION_KEY = "collector.routing"
- private const val DOMAIN_CONFIGURATION_KEY = "fromDomain"
- private const val TOPIC_CONFIGURATION_KEY = "toTopic"
-
private const val MAX_RETRIES = 5L
private val logger = Logger(ConfigurationProviderImpl::class)
}
diff --git a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
index f830f2c9..e71250ca 100644
--- a/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
+++ b/sources/hv-collector-core/src/test/kotlin/org/onap/dcae/collectors/veshv/impl/adapters/ConfigurationProviderTest.kt
@@ -30,13 +30,12 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.dsl.on
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.HEARTBEAT
-import org.onap.dcae.collectors.veshv.domain.VesEventDomain.FAULT
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthDescription
import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.CbsClient
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.api.streams.StreamFromGsonParsers
+import org.onap.dcaegen2.services.sdk.rest.services.cbs.client.model.streams.ImmutableAafCredentials
import reactor.core.publisher.Flux
-
import reactor.core.publisher.Mono
import reactor.retry.Retry
import reactor.test.StepVerifier
@@ -77,23 +76,18 @@ internal object ConfigurationProviderImplTest : Spek({
StepVerifier.create(configProvider().take(1))
.consumeNextWith {
-
- val route1 = it.routes[0]
- assertThat(FAULT.domainName)
- .describedAs("routed domain 1")
- .isEqualTo(route1.domain)
- assertThat("test-topic-1")
- .describedAs("target topic 1")
- .isEqualTo(route1.targetTopic)
-
- val route2 = it.routes[1]
- assertThat(HEARTBEAT.domainName)
- .describedAs("routed domain 2")
- .isEqualTo(route2.domain)
- assertThat("test-topic-2")
- .describedAs("target topic 2")
- .isEqualTo(route2.targetTopic)
-
+ val receivedSink1 = it.elementAt(0)
+ val receivedSink2 = it.elementAt(1)
+
+ assertThat(receivedSink1.aafCredentials()).isEqualTo(aafCredentials1)
+ assertThat(receivedSink1.bootstrapServers())
+ .isEqualTo("dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060")
+ assertThat(receivedSink1.topicName()).isEqualTo("REG_HVVES_PERF3GPP")
+
+ assertThat(receivedSink2.aafCredentials()).isEqualTo(aafCredentials2)
+ assertThat(receivedSink2.bootstrapServers())
+ .isEqualTo("dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060")
+ assertThat(receivedSink2.topicName()).isEqualTo("CEN_HVVES_PERF3GPP")
}.verifyComplete()
}
}
@@ -126,35 +120,64 @@ internal object ConfigurationProviderImplTest : Spek({
})
+private val aafCredentials1 = ImmutableAafCredentials.builder()
+ .username("client")
+ .password("very secure password")
+ .build()
+
+private val aafCredentials2 = ImmutableAafCredentials.builder()
+ .username("other_client")
+ .password("another very secure password")
+ .build()
private val validConfiguration = JsonParser().parse("""
{
- "whatever": "garbage",
- "collector.routing": [
- {
- "fromDomain": "fault",
- "toTopic": "test-topic-1"
+ "streams_publishes": {
+ "perf3gpp_regional": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "client",
+ "password": "very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "topic_name": "REG_HVVES_PERF3GPP"
+ }
+ },
+ "perf3gpp_central": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "other_client",
+ "password": "another very secure password"
},
- {
- "fromDomain": "heartbeat",
- "toTopic": "test-topic-2"
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.central:6060,dmaap-mr-kafka-1.central:6060",
+ "topic_name": "CEN_HVVES_PERF3GPP"
}
- ]
+ }
+ }
}""").asJsonObject
private val invalidConfiguration = JsonParser().parse("""
{
- "whatever": "garbage",
- "collector.routing": [
- {
- "fromDomain": "garbage",
- "meaningful": "garbage"
+ "streams_publishes": {
+ "perf3gpp_regional": {
+ "type": "kafka",
+ "aaf_credentials": {
+ "username": "client",
+ "password": "very secure password"
+ },
+ "kafka_info": {
+ "bootstrap_servers": "dmaap-mr-kafka-0.regional:6060,dmaap-mr-kafka-1.regional:6060",
+ "popic_name": "REG_HVVES_PERF3GPP"
}
- ]
+ }
+ }
}""").asJsonObject
private val firstRequestDelay = Duration.ofMillis(1)
private val requestInterval = Duration.ofMillis(1)
+private val streamParser = StreamFromGsonParsers.kafkaSinkParser()
private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
healthState: HealthState,
@@ -168,6 +191,7 @@ private fun constructConfigurationProvider(cbsClientMono: Mono<CbsClient>,
firstRequestDelay,
requestInterval,
healthState,
+ streamParser,
retry
)
}