diff options
7 files changed, 72 insertions, 20 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt index 28b28203..41993e62 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.boundary import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.domain.logging.ClientContext import org.onap.dcae.collectors.veshv.model.ClientRejectionCause @@ -42,6 +43,7 @@ interface SinkFactory : Closeable { interface Metrics { fun notifyBytesReceived(size: Int) fun notifyMessageReceived(msg: WireFrameMessage) + fun notifyMessageReadyForRouting(msg: VesMessage) fun notifyMessageSent(msg: RoutedMessage) fun notifyMessageDropped(cause: MessageDropCause) fun notifyClientDisconnected() diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt index ac7c3917..f0d1465b 100644 --- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt +++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt @@ -92,6 +92,7 @@ internal class HvVesCollector( } private fun route(flux: Flux<VesMessage>) = flux + .doOnNext(metrics::notifyMessageReadyForRouting) .flatMap(router::route) .doOnNext(this::updateSinkMetrics) diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt index a450b794..3b01d137 100644 --- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt +++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt @@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import java.time.Duration import java.time.Instant import kotlin.test.fail @@ -38,6 +39,7 @@ class FakeMetrics : Metrics { var messageBytesReceived: Int = 0; private set var messagesDroppedCount: Int = 0; private set var lastProcessingTimeMicros: Double = -1.0; private set + var lastProcessingTimeWithoutRoutingMicros: Double = -1.0; private set var messagesSentCount: Int = 0; private set var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set @@ -52,6 +54,10 @@ class FakeMetrics : Metrics { messageBytesReceived += msg.payloadSize } + override fun notifyMessageReadyForRouting(msg: VesMessage) { + lastProcessingTimeWithoutRoutingMicros = Duration.between(msg.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0 + } + override fun notifyMessageSent(msg: RoutedMessage) { messagesSentCount++ messagesSentToTopic.compute(msg.targetTopic) { k, _ -> diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt index fa52ac2c..9d417a28 100644 --- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt +++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt @@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.dcae.collectors.veshv.model.ClientRejectionCause import org.onap.dcae.collectors.veshv.model.MessageDropCause import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesMessage import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant import java.time.Duration import java.time.Instant @@ -46,7 +47,6 @@ import java.time.Instant class MicrometerMetrics internal constructor( private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) ) : Metrics { - private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES)) private val receivedMessages = registry.counter(name(MESSAGES, RECEIVED)) private val receivedMessagesPayloadBytes = registry.counter(name(MESSAGES, RECEIVED, PAYLOAD, BYTES)) @@ -58,6 +58,9 @@ class MicrometerMetrics internal constructor( .maximumExpectedValue(MAX_BUCKET_DURATION) .publishPercentileHistogram(true) .register(registry) + private val processingTimeWithoutRouting = Timer.builder(name(MESSAGES, PROCESSING, TIME, WITHOUT, ROUTING)) + .publishPercentileHistogram(true) + .register(registry) private val totalLatency = Timer.builder(name(MESSAGES, LATENCY)) .maximumExpectedValue(MAX_BUCKET_DURATION) .publishPercentileHistogram(true) @@ -67,12 +70,10 @@ class MicrometerMetrics internal constructor( private val sentMessagesByTopic = { topic: String -> registry.counter(name(MESSAGES, SENT, TOPIC), TOPIC, topic) }.memoize<String, Counter>() - private val droppedMessages = registry.counter(name(MESSAGES, DROPPED)) private val messagesDroppedByCause = { cause: String -> registry.counter(name(MESSAGES, DROPPED, CAUSE), CAUSE, cause) }.memoize<String, Counter>() - private val clientsRejected = registry.counter(name(CLIENTS, REJECTED)) private val clientsRejectedByCause = { cause: String -> registry.counter(name(CLIENTS, REJECTED, CAUSE), CAUSE, cause) @@ -97,6 +98,10 @@ class MicrometerMetrics internal constructor( receivedBytes.increment(size.toDouble()) } + override fun notifyMessageReadyForRouting(msg: VesMessage) { + processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, Instant.now())) + } + override fun notifyMessageReceived(msg: WireFrameMessage) { receivedMessages.increment() receivedMessagesPayloadBytes.increment(msg.payloadSize.toDouble()) @@ -150,6 +155,8 @@ class MicrometerMetrics internal constructor( internal const val LATENCY = "latency" internal const val PAYLOAD = "payload" internal val MAX_BUCKET_DURATION = Duration.ofSeconds(300L) + internal const val WITHOUT = "without" + internal const val ROUTING = "routing" internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}" } } diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt index 66f3a5fc..a3471d46 100644 --- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt +++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt @@ -20,6 +20,7 @@ package org.onap.dcae.collectors.veshv.main import arrow.core.Option +import com.google.protobuf.ByteString import io.micrometer.core.instrument.Counter import io.micrometer.core.instrument.Meter import io.micrometer.core.instrument.Tags @@ -39,7 +40,9 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EX import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND import org.onap.dcae.collectors.veshv.domain.RoutedMessage +import org.onap.dcae.collectors.veshv.domain.VesEventDomain import org.onap.dcae.collectors.veshv.domain.VesMessage +import org.onap.dcae.collectors.veshv.tests.utils.commonHeader import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge @@ -203,6 +206,25 @@ object MicrometerMetricsTest : Spek({ } } + on("$PREFIX.messages.processing.time.without.routing") { + val counterName = "$PREFIX.messages.processing.time.without.routing" + val processingTimeMs = 100L + + it("should update timer") { + + cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs))) + + registry.verifyTimer(counterName) { timer -> + assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble()) + } + verifyCountersAndTimersAreUnchangedBut( + counterName, + "$PREFIX.messages.sent.topic", + "$PREFIX.messages.sent", + "$PREFIX.messages.latency") + } + } + on("$PREFIX.messages.latency") { val counterName = "$PREFIX.messages.latency" val latencyMs = 1666L @@ -362,13 +384,19 @@ object MicrometerMetricsTest : Spek({ } }) -fun routedMessage(topic: String, partition: Int = 0) = +private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage { + val commonHeader = commonHeader(domain) + return VesMessage(commonHeader, + wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt)) +} + +private fun routedMessage(topic: String, partition: Int = 0) = vesEvent().run { toRoutedMessage(topic, partition) } -fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = +private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) = vesEvent().run { toRoutedMessage(topic, partition, receivedAt) } -fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = +private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) = vesEvent().run { val builder = toBuilder() builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000 diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt index ba60d1b0..3013e904 100644 --- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt +++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt @@ -30,6 +30,8 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage import org.onap.ves.VesEventOuterClass import org.onap.ves.VesEventOuterClass.CommonEventHeader import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority +import java.time.Instant +import java.time.temporal.Temporal import java.util.UUID.randomUUID fun vesEvent(domain: VesEventDomain = PERF3GPP, @@ -53,7 +55,7 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP, vesEventListenerVersion: String = "7.0.2", priority: Priority = Priority.NORMAL, lastEpochMicrosec: Long = 100000005 - ): CommonEventHeader = +): CommonEventHeader = CommonEventHeader.newBuilder() .setVersion("sample-version") .setDomain(domain.domainName) @@ -86,14 +88,17 @@ fun wireProtocolFrameWithPayloadSize(size: Int): WireFrameMessage = WireFrameMes payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue ) -fun wireProtocolFrame(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): WireFrameMessage = +fun wireProtocolFrame(commonHeader: CommonEventHeader, + eventFields: ByteString = ByteString.EMPTY, + receivedAt: Temporal = Instant.now()): WireFrameMessage = vesEventBytes(commonHeader, eventFields).let { payload -> WireFrameMessage( payload = payload, versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR, versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR, payloadSize = payload.size(), - payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue + payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue, + receivedAt = receivedAt ) } diff --git a/tools/performance/cloud/cloud-based-performance-test.sh b/tools/performance/cloud/cloud-based-performance-test.sh index 6c27b69d..1b5f1f90 100755 --- a/tools/performance/cloud/cloud-based-performance-test.sh +++ b/tools/performance/cloud/cloud-based-performance-test.sh @@ -19,6 +19,7 @@ SCRIPT_DIRECTORY="$(pwd "$0")" CONTAINERS_COUNT=1 +COMPLETED_PRODUCERS_SUM=0 LOAD_TEST="false" TEST_CONFIG_MAP=performance-test-config PROPERTIES_FILE=${SCRIPT_DIRECTORY}/test.properties @@ -33,7 +34,7 @@ GRAFANA_DASHBOARD_PROVIDERS=grafana-dashboards-providers ONAP_NAMESPACE=onap MAXIMUM_BACK_OFF_CHECK_ITERATIONS=30 CHECK_NUMBER=0 -COMPLETED_PRODUCERS_SUM=0 +PRODUCERS_TO_RECREATE=0 NAME_REASON_PATTERN="custom-columns=NAME:.metadata.name,REASON:.status.containerStatuses[].state.waiting.reason" HVVES_POD_NAME=$(kubectl -n ${ONAP_NAMESPACE} get pods --no-headers=true -o custom-columns=:metadata.name | grep hv-ves-collector) HVVES_CERT_PATH=/etc/ves-hv/ssl/ @@ -114,6 +115,7 @@ function generate_certs() { } function handle_backoffs() { + IMAGE_PULL_BACK_OFFS=$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} -o ${NAME_REASON_PATTERN} | grep -c "ImagePullBackOff \| ErrImagePull") if [[ ${IMAGE_PULL_BACK_OFFS} -gt 0 ]]; then CHECK_NUMBER=$((CHECK_NUMBER + 1)) if [[ ${CHECK_NUMBER} -gt ${MAXIMUM_BACK_OFF_CHECK_ITERATIONS} ]]; then @@ -126,7 +128,9 @@ function handle_backoffs() { function handle_key_interrupt() { trap SIGINT echo "Script interrupted, attempt to delete producers" - kubectl delete pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} + echo "Wait with patience" + COMPLETED_PRODUCERS_SUM=$(($(kubectl delete pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} | grep producer | wc -l) + COMPLETED_PRODUCERS_SUM)) + echo "Total number of completed producers: ${COMPLETED_PRODUCERS_SUM}" exit 0 } @@ -229,21 +233,22 @@ function start_load_tests() { echo "Constant producer number keeper started working" while :; do - COMPLETED_PRODUCERS=$(($(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} | grep -c "Completed")-COMPLETED_PRODUCERS_SUM)) - IMAGE_PULL_BACK_OFFS=$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} -o ${NAME_REASON_PATTERN} | grep -c "ImagePullBackOff \| ErrImagePull") - + PRODUCERS_TO_RECREATE=$((CONTAINERS_COUNT-$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} | grep -c "Running"))) handle_backoffs set -e - for i in $(seq 1 ${COMPLETED_PRODUCERS}); + for i in $(seq 1 ${PRODUCERS_TO_RECREATE}); do - echo "Recreating ${i}/${COMPLETED_PRODUCERS} producer" + echo "Recreating ${i}/${PRODUCERS_TO_RECREATE} producer" kubectl create -f producer-pod.yaml -n ${ONAP_NAMESPACE} - COMPLETED_PRODUCERS_SUM=$((COMPLETED_PRODUCERS_SUM + 1)) done set +e + COMPLETED_PRODUCERS_SUM=$((COMPLETED_PRODUCERS_SUM + PRODUCERS_TO_RECREATE)) + echo "Attempting to clear completed producers" + kubectl delete pod --field-selector=status.phase==Succeeded -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} + [[ ${CHECK_NUMBER} -gt ${MAXIMUM_BACK_OFF_CHECK_ITERATIONS} ]] && break - sleep 2 + sleep 1 done trap SIGINT @@ -258,8 +263,6 @@ function start_performance_test() { echo "Waiting for producers completion" while :; do COMPLETED_PRODUCERS=$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} | grep -c "Completed") - IMAGE_PULL_BACK_OFFS=$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} -o ${NAME_REASON_PATTERN} | grep -c "ImagePullBackOff \| ErrImagePull") - handle_backoffs [[ ${COMPLETED_PRODUCERS} -eq ${CONTAINERS_COUNT} || ${CHECK_NUMBER} -gt ${MAXIMUM_BACK_OFF_CHECK_ITERATIONS} ]] && break |