summaryrefslogtreecommitdiffstats
path: root/sources/hv-collector-main
diff options
context:
space:
mode:
authorFilip Krzywka <filip.krzywka@nokia.com>2019-03-26 14:21:02 +0100
committerFilip Krzywka <filip.krzywka@nokia.com>2019-03-28 14:16:02 +0100
commit2174a045086e16611128b20a6d4357c04d9eac4a (patch)
tree6302837fc6ce5fac26a9da91e7353247c397bc0a /sources/hv-collector-main
parent1b7ac38627977e8ef2209a3a98a8cd0c2da785dd (diff)
Redefine Routing
As all needed information to route messege is contained inside of KafkaSink message, we can simply put this object as part of single Route. Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka <filip.krzywka@nokia.com>
Diffstat (limited to 'sources/hv-collector-main')
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt4
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt6
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt33
3 files changed, 24 insertions, 19 deletions
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index 2fb44768..c04c2c95 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -2,7 +2,7 @@
* ============LICENSE_START=======================================================
* dcaegen2-collectors-veshv
* ================================================================================
- * Copyright (C) 2018 NOKIA
+ * Copyright (C) 2018-2019 NOKIA
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -103,7 +103,7 @@ class MicrometerMetrics internal constructor(
override fun notifyMessageSent(msg: RoutedMessage) {
val now = Instant.now()
sentMessages.increment()
- sentMessagesByTopic(msg.topic).increment()
+ sentMessagesByTopic(msg.targetTopic).increment()
processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now))
totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now))
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
index d15dccef..aed4d928 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt
@@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server
import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration
import org.onap.dcae.collectors.veshv.factory.CollectorFactory
import org.onap.dcae.collectors.veshv.factory.ServerFactory
+import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState
import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory
import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics
import org.onap.dcae.collectors.veshv.model.ServiceContext
@@ -59,9 +60,10 @@ object VesServer {
private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory =
CollectorFactory(
AdapterFactory.configurationProvider(config.cbs),
- AdapterFactory.sinkCreatorFactory(config.collector),
+ AdapterFactory.sinkCreatorFactory(),
MicrometerMetrics.INSTANCE,
- config.server.maxPayloadSizeBytes
+ config.server.maxPayloadSizeBytes,
+ HealthState.INSTANCE
)
private fun logServerStarted(handle: ServerHandle) =
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index e452a5f4..f260f158 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -19,6 +19,7 @@
*/
package org.onap.dcae.collectors.veshv.main
+import arrow.core.Option
import arrow.core.Try
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Gauge
@@ -44,6 +45,7 @@ import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
import org.onap.dcae.collectors.veshv.tests.utils.vesEvent
import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame
+import org.onap.ves.VesEventOuterClass
import java.time.Instant
import java.time.temporal.Temporal
import java.util.concurrent.TimeUnit
@@ -383,23 +385,24 @@ object MicrometerMetricsTest : Spek({
})
fun routedMessage(topic: String, partition: Int = 0) =
- vesEvent().let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
- }
+ vesEvent().run { toRoutedMessage(topic, partition) }
fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
- vesEvent().let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt)))
- }
+ vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
- vesEvent().let { evt ->
- val builder = evt.toBuilder()
+ vesEvent().run {
+ val builder = toBuilder()
builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
- builder.build()
- }.let { evt ->
- RoutedMessage(topic, partition,
- VesMessage(evt.commonEventHeader, wireProtocolFrame(evt)))
- } \ No newline at end of file
+ builder.build().toRoutedMessage(topic, partition)
+ }
+
+private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String,
+ partition: Int,
+ receivedAt: Temporal = Instant.now()) =
+ RoutedMessage(
+ VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)),
+ topic,
+ Option.just(partition)
+ )
+