From 2174a045086e16611128b20a6d4357c04d9eac4a Mon Sep 17 00:00:00 2001 From: Filip Krzywka Date: Tue, 26 Mar 2019 14:21:02 +0100 Subject: Redefine Routing As all needed information to route messege is contained inside of KafkaSink message, we can simply put this object as part of single Route. Change-Id: I2e7df2e0193eb2af5283980d4d5c8df03ac94df9 Issue-ID: DCAEGEN2-1347 Signed-off-by: Filip Krzywka --- .../veshv/main/metrics/MicrometerMetrics.kt | 4 +-- .../collectors/veshv/main/servers/VesServer.kt | 6 ++-- .../collectors/veshv/main/MicrometerMetricsTest.kt | 33 ++++++++++++---------- 3 files changed, 24 insertions(+), 19 deletions(-) (limited to 'sources/hv-collector-main/src') diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index 2fb44768..c04c2c95 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * dcaegen2-collectors-veshv * ================================================================================ - * Copyright (C) 2018 NOKIA + * Copyright (C) 2018-2019 NOKIA * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -103,7 +103,7 @@ class MicrometerMetrics internal constructor( override fun notifyMessageSent(msg: RoutedMessage) { val now = Instant.now() sentMessages.increment() - sentMessagesByTopic(msg.topic).increment() + sentMessagesByTopic(msg.targetTopic).increment() processingTime.record(Duration.between(msg.message.wtpFrame.receivedAt, now)) totalLatency.record(Duration.between(epochMicroToInstant(msg.message.header.lastEpochMicrosec), now)) diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt index d15dccef..aed4d928 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/servers/VesServer.kt @@ -23,6 +23,7 @@ import org.onap.dcae.collectors.veshv.boundary.Server import org.onap.dcae.collectors.veshv.config.api.model.HvVesConfiguration import org.onap.dcae.collectors.veshv.factory.CollectorFactory import org.onap.dcae.collectors.veshv.factory.ServerFactory +import org.onap.dcae.collectors.veshv.healthcheck.api.HealthState import org.onap.dcae.collectors.veshv.impl.adapters.AdapterFactory import org.onap.dcae.collectors.veshv.main.metrics.MicrometerMetrics import org.onap.dcae.collectors.veshv.model.ServiceContext @@ -59,9 +60,10 @@ object VesServer { private fun initializeCollectorFactory(config: HvVesConfiguration): CollectorFactory = CollectorFactory( AdapterFactory.configurationProvider(config.cbs), - AdapterFactory.sinkCreatorFactory(config.collector), + AdapterFactory.sinkCreatorFactory(), MicrometerMetrics.INSTANCE, - config.server.maxPayloadSizeBytes + config.server.maxPayloadSizeBytes, + HealthState.INSTANCE ) private fun logServerStarted(handle: ServerHandle) = diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index e452a5f4..f260f158 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -19,6 +19,7 @@ */ package org.onap.dcae.collectors.veshv.main +import arrow.core.Option import arrow.core.Try import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Gauge @@ -44,6 +45,7 @@ import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.vesEvent import org.onap.dcae.collectors.veshv.tests.utils.wireProtocolFrame +import org.onap.ves.VesEventOuterClass import java.time.Instant import java.time.temporal.Temporal import java.util.concurrent.TimeUnit @@ -383,23 +385,24 @@ object MicrometerMetricsTest : Spek({ }) fun routedMessage(topic: String, partition: Int = 0) = - vesEvent().let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - } + vesEvent().run { toRoutedMessage(topic, partition) } fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = - vesEvent().let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt).copy(receivedAt = receivedAt))) - } + vesEvent().run { toRoutedMessage(topic, partition, receivedAt) } fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = - vesEvent().let { evt -> - val builder = evt.toBuilder() + vesEvent().run { + val builder = toBuilder() builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 - builder.build() - }.let { evt -> - RoutedMessage(topic, partition, - VesMessage(evt.commonEventHeader, wireProtocolFrame(evt))) - } \ No newline at end of file + builder.build().toRoutedMessage(topic, partition) + } + +private fun VesEventOuterClass.VesEvent.toRoutedMessage(topic: String, + partition: Int, + receivedAt: Temporal = Instant.now()) = + RoutedMessage( + VesMessage(this.commonEventHeader, wireProtocolFrame(this).copy(receivedAt = receivedAt)), + topic, + Option.just(partition) + ) + -- cgit 1.2.3-korg