aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt2
-rw-r--r--sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt1
-rw-r--r--sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt6
-rw-r--r--sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt13
-rw-r--r--sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt34
-rw-r--r--sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt11
-rwxr-xr-xtools/performance/cloud/cloud-based-performance-test.sh25
7 files changed, 72 insertions, 20 deletions
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
index 28b28203..41993e62 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/boundary/adapters.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.boundary
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.domain.logging.ClientContext
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
@@ -42,6 +43,7 @@ interface SinkFactory : Closeable {
interface Metrics {
fun notifyBytesReceived(size: Int)
fun notifyMessageReceived(msg: WireFrameMessage)
+ fun notifyMessageReadyForRouting(msg: VesMessage)
fun notifyMessageSent(msg: RoutedMessage)
fun notifyMessageDropped(cause: MessageDropCause)
fun notifyClientDisconnected()
diff --git a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
index ac7c3917..f0d1465b 100644
--- a/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
+++ b/sources/hv-collector-core/src/main/kotlin/org/onap/dcae/collectors/veshv/impl/HvVesCollector.kt
@@ -92,6 +92,7 @@ internal class HvVesCollector(
}
private fun route(flux: Flux<VesMessage>) = flux
+ .doOnNext(metrics::notifyMessageReadyForRouting)
.flatMap(router::route)
.doOnNext(this::updateSinkMetrics)
diff --git a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
index a450b794..3b01d137 100644
--- a/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
+++ b/sources/hv-collector-ct/src/test/kotlin/org/onap/dcae/collectors/veshv/tests/fakes/metrics.kt
@@ -24,6 +24,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import java.time.Duration
import java.time.Instant
import kotlin.test.fail
@@ -38,6 +39,7 @@ class FakeMetrics : Metrics {
var messageBytesReceived: Int = 0; private set
var messagesDroppedCount: Int = 0; private set
var lastProcessingTimeMicros: Double = -1.0; private set
+ var lastProcessingTimeWithoutRoutingMicros: Double = -1.0; private set
var messagesSentCount: Int = 0; private set
var clientRejectionCause = mutableMapOf<ClientRejectionCause, Int>(); private set
@@ -52,6 +54,10 @@ class FakeMetrics : Metrics {
messageBytesReceived += msg.payloadSize
}
+ override fun notifyMessageReadyForRouting(msg: VesMessage) {
+ lastProcessingTimeWithoutRoutingMicros = Duration.between(msg.wtpFrame.receivedAt, Instant.now()).toNanos() / 1000.0
+ }
+
override fun notifyMessageSent(msg: RoutedMessage) {
messagesSentCount++
messagesSentToTopic.compute(msg.targetTopic) { k, _ ->
diff --git a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
index fa52ac2c..9d417a28 100644
--- a/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
+++ b/sources/hv-collector-main/src/main/kotlin/org/onap/dcae/collectors/veshv/main/metrics/MicrometerMetrics.kt
@@ -34,6 +34,7 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.dcae.collectors.veshv.model.ClientRejectionCause
import org.onap.dcae.collectors.veshv.model.MessageDropCause
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesMessage
import org.onap.dcae.collectors.veshv.utils.TimeUtils.epochMicroToInstant
import java.time.Duration
import java.time.Instant
@@ -46,7 +47,6 @@ import java.time.Instant
class MicrometerMetrics internal constructor(
private val registry: PrometheusMeterRegistry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
) : Metrics {
-
private val receivedBytes = registry.counter(name(DATA, RECEIVED, BYTES))
private val receivedMessages = registry.counter(name(MESSAGES, RECEIVED))
private val receivedMessagesPayloadBytes = registry.counter(name(MESSAGES, RECEIVED, PAYLOAD, BYTES))
@@ -58,6 +58,9 @@ class MicrometerMetrics internal constructor(
.maximumExpectedValue(MAX_BUCKET_DURATION)
.publishPercentileHistogram(true)
.register(registry)
+ private val processingTimeWithoutRouting = Timer.builder(name(MESSAGES, PROCESSING, TIME, WITHOUT, ROUTING))
+ .publishPercentileHistogram(true)
+ .register(registry)
private val totalLatency = Timer.builder(name(MESSAGES, LATENCY))
.maximumExpectedValue(MAX_BUCKET_DURATION)
.publishPercentileHistogram(true)
@@ -67,12 +70,10 @@ class MicrometerMetrics internal constructor(
private val sentMessagesByTopic = { topic: String ->
registry.counter(name(MESSAGES, SENT, TOPIC), TOPIC, topic)
}.memoize<String, Counter>()
-
private val droppedMessages = registry.counter(name(MESSAGES, DROPPED))
private val messagesDroppedByCause = { cause: String ->
registry.counter(name(MESSAGES, DROPPED, CAUSE), CAUSE, cause)
}.memoize<String, Counter>()
-
private val clientsRejected = registry.counter(name(CLIENTS, REJECTED))
private val clientsRejectedByCause = { cause: String ->
registry.counter(name(CLIENTS, REJECTED, CAUSE), CAUSE, cause)
@@ -97,6 +98,10 @@ class MicrometerMetrics internal constructor(
receivedBytes.increment(size.toDouble())
}
+ override fun notifyMessageReadyForRouting(msg: VesMessage) {
+ processingTimeWithoutRouting.record(Duration.between(msg.wtpFrame.receivedAt, Instant.now()))
+ }
+
override fun notifyMessageReceived(msg: WireFrameMessage) {
receivedMessages.increment()
receivedMessagesPayloadBytes.increment(msg.payloadSize.toDouble())
@@ -150,6 +155,8 @@ class MicrometerMetrics internal constructor(
internal const val LATENCY = "latency"
internal const val PAYLOAD = "payload"
internal val MAX_BUCKET_DURATION = Duration.ofSeconds(300L)
+ internal const val WITHOUT = "without"
+ internal const val ROUTING = "routing"
internal fun name(vararg name: String) = "$PREFIX.${name.joinToString(".")}"
}
}
diff --git a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
index 66f3a5fc..a3471d46 100644
--- a/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
+++ b/sources/hv-collector-main/src/test/kotlin/org/onap/dcae/collectors/veshv/main/MicrometerMetricsTest.kt
@@ -20,6 +20,7 @@
package org.onap.dcae.collectors.veshv.main
import arrow.core.Option
+import com.google.protobuf.ByteString
import io.micrometer.core.instrument.Counter
import io.micrometer.core.instrument.Meter
import io.micrometer.core.instrument.Tags
@@ -39,7 +40,9 @@ import org.onap.dcae.collectors.veshv.model.ClientRejectionCause.PAYLOAD_SIZE_EX
import org.onap.dcae.collectors.veshv.model.MessageDropCause.INVALID_MESSAGE
import org.onap.dcae.collectors.veshv.model.MessageDropCause.ROUTE_NOT_FOUND
import org.onap.dcae.collectors.veshv.domain.RoutedMessage
+import org.onap.dcae.collectors.veshv.domain.VesEventDomain
import org.onap.dcae.collectors.veshv.domain.VesMessage
+import org.onap.dcae.collectors.veshv.tests.utils.commonHeader
import org.onap.dcae.collectors.veshv.tests.utils.emptyWireProtocolFrame
import org.onap.dcae.collectors.veshv.tests.utils.verifyCounter
import org.onap.dcae.collectors.veshv.tests.utils.verifyGauge
@@ -203,6 +206,25 @@ object MicrometerMetricsTest : Spek({
}
}
+ on("$PREFIX.messages.processing.time.without.routing") {
+ val counterName = "$PREFIX.messages.processing.time.without.routing"
+ val processingTimeMs = 100L
+
+ it("should update timer") {
+
+ cut.notifyMessageReadyForRouting(vesMessageReceivedAt(Instant.now().minusMillis(processingTimeMs)))
+
+ registry.verifyTimer(counterName) { timer ->
+ assertThat(timer.mean(TimeUnit.MILLISECONDS)).isGreaterThanOrEqualTo(processingTimeMs.toDouble())
+ }
+ verifyCountersAndTimersAreUnchangedBut(
+ counterName,
+ "$PREFIX.messages.sent.topic",
+ "$PREFIX.messages.sent",
+ "$PREFIX.messages.latency")
+ }
+ }
+
on("$PREFIX.messages.latency") {
val counterName = "$PREFIX.messages.latency"
val latencyMs = 1666L
@@ -362,13 +384,19 @@ object MicrometerMetricsTest : Spek({
}
})
-fun routedMessage(topic: String, partition: Int = 0) =
+private fun vesMessageReceivedAt(receivedAt: Temporal, domain: VesEventDomain = VesEventDomain.PERF3GPP): VesMessage {
+ val commonHeader = commonHeader(domain)
+ return VesMessage(commonHeader,
+ wireProtocolFrame(commonHeader, ByteString.copyFromUtf8("highvolume measurements"), receivedAt))
+}
+
+private fun routedMessage(topic: String, partition: Int = 0) =
vesEvent().run { toRoutedMessage(topic, partition) }
-fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
+private fun routedMessageReceivedAt(topic: String, receivedAt: Temporal, partition: Int = 0) =
vesEvent().run { toRoutedMessage(topic, partition, receivedAt) }
-fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
+private fun routedMessageSentAt(topic: String, sentAt: Instant, partition: Int = 0) =
vesEvent().run {
val builder = toBuilder()
builder.commonEventHeaderBuilder.lastEpochMicrosec = sentAt.epochSecond * 1000000 + sentAt.nano / 1000
diff --git a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
index ba60d1b0..3013e904 100644
--- a/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
+++ b/sources/hv-collector-test-utils/src/main/kotlin/org/onap/dcae/collectors/veshv/tests/utils/vesEvents.kt
@@ -30,6 +30,8 @@ import org.onap.dcae.collectors.veshv.domain.WireFrameMessage
import org.onap.ves.VesEventOuterClass
import org.onap.ves.VesEventOuterClass.CommonEventHeader
import org.onap.ves.VesEventOuterClass.CommonEventHeader.Priority
+import java.time.Instant
+import java.time.temporal.Temporal
import java.util.UUID.randomUUID
fun vesEvent(domain: VesEventDomain = PERF3GPP,
@@ -53,7 +55,7 @@ fun commonHeader(domain: VesEventDomain = PERF3GPP,
vesEventListenerVersion: String = "7.0.2",
priority: Priority = Priority.NORMAL,
lastEpochMicrosec: Long = 100000005
- ): CommonEventHeader =
+): CommonEventHeader =
CommonEventHeader.newBuilder()
.setVersion("sample-version")
.setDomain(domain.domainName)
@@ -86,14 +88,17 @@ fun wireProtocolFrameWithPayloadSize(size: Int): WireFrameMessage = WireFrameMes
payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue
)
-fun wireProtocolFrame(commonHeader: CommonEventHeader, eventFields: ByteString = ByteString.EMPTY): WireFrameMessage =
+fun wireProtocolFrame(commonHeader: CommonEventHeader,
+ eventFields: ByteString = ByteString.EMPTY,
+ receivedAt: Temporal = Instant.now()): WireFrameMessage =
vesEventBytes(commonHeader, eventFields).let { payload ->
WireFrameMessage(
payload = payload,
versionMajor = WireFrameMessage.SUPPORTED_VERSION_MAJOR,
versionMinor = WireFrameMessage.SUPPORTED_VERSION_MINOR,
payloadSize = payload.size(),
- payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue
+ payloadType = PayloadContentType.GOOGLE_PROTOCOL_BUFFER.hexValue,
+ receivedAt = receivedAt
)
}
diff --git a/tools/performance/cloud/cloud-based-performance-test.sh b/tools/performance/cloud/cloud-based-performance-test.sh
index 6c27b69d..1b5f1f90 100755
--- a/tools/performance/cloud/cloud-based-performance-test.sh
+++ b/tools/performance/cloud/cloud-based-performance-test.sh
@@ -19,6 +19,7 @@
SCRIPT_DIRECTORY="$(pwd "$0")"
CONTAINERS_COUNT=1
+COMPLETED_PRODUCERS_SUM=0
LOAD_TEST="false"
TEST_CONFIG_MAP=performance-test-config
PROPERTIES_FILE=${SCRIPT_DIRECTORY}/test.properties
@@ -33,7 +34,7 @@ GRAFANA_DASHBOARD_PROVIDERS=grafana-dashboards-providers
ONAP_NAMESPACE=onap
MAXIMUM_BACK_OFF_CHECK_ITERATIONS=30
CHECK_NUMBER=0
-COMPLETED_PRODUCERS_SUM=0
+PRODUCERS_TO_RECREATE=0
NAME_REASON_PATTERN="custom-columns=NAME:.metadata.name,REASON:.status.containerStatuses[].state.waiting.reason"
HVVES_POD_NAME=$(kubectl -n ${ONAP_NAMESPACE} get pods --no-headers=true -o custom-columns=:metadata.name | grep hv-ves-collector)
HVVES_CERT_PATH=/etc/ves-hv/ssl/
@@ -114,6 +115,7 @@ function generate_certs() {
}
function handle_backoffs() {
+ IMAGE_PULL_BACK_OFFS=$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} -o ${NAME_REASON_PATTERN} | grep -c "ImagePullBackOff \| ErrImagePull")
if [[ ${IMAGE_PULL_BACK_OFFS} -gt 0 ]]; then
CHECK_NUMBER=$((CHECK_NUMBER + 1))
if [[ ${CHECK_NUMBER} -gt ${MAXIMUM_BACK_OFF_CHECK_ITERATIONS} ]]; then
@@ -126,7 +128,9 @@ function handle_backoffs() {
function handle_key_interrupt() {
trap SIGINT
echo "Script interrupted, attempt to delete producers"
- kubectl delete pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE}
+ echo "Wait with patience"
+ COMPLETED_PRODUCERS_SUM=$(($(kubectl delete pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} | grep producer | wc -l) + COMPLETED_PRODUCERS_SUM))
+ echo "Total number of completed producers: ${COMPLETED_PRODUCERS_SUM}"
exit 0
}
@@ -229,21 +233,22 @@ function start_load_tests() {
echo "Constant producer number keeper started working"
while :; do
- COMPLETED_PRODUCERS=$(($(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} | grep -c "Completed")-COMPLETED_PRODUCERS_SUM))
- IMAGE_PULL_BACK_OFFS=$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} -o ${NAME_REASON_PATTERN} | grep -c "ImagePullBackOff \| ErrImagePull")
-
+ PRODUCERS_TO_RECREATE=$((CONTAINERS_COUNT-$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} | grep -c "Running")))
handle_backoffs
set -e
- for i in $(seq 1 ${COMPLETED_PRODUCERS});
+ for i in $(seq 1 ${PRODUCERS_TO_RECREATE});
do
- echo "Recreating ${i}/${COMPLETED_PRODUCERS} producer"
+ echo "Recreating ${i}/${PRODUCERS_TO_RECREATE} producer"
kubectl create -f producer-pod.yaml -n ${ONAP_NAMESPACE}
- COMPLETED_PRODUCERS_SUM=$((COMPLETED_PRODUCERS_SUM + 1))
done
set +e
+ COMPLETED_PRODUCERS_SUM=$((COMPLETED_PRODUCERS_SUM + PRODUCERS_TO_RECREATE))
+ echo "Attempting to clear completed producers"
+ kubectl delete pod --field-selector=status.phase==Succeeded -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE}
+
[[ ${CHECK_NUMBER} -gt ${MAXIMUM_BACK_OFF_CHECK_ITERATIONS} ]] && break
- sleep 2
+ sleep 1
done
trap SIGINT
@@ -258,8 +263,6 @@ function start_performance_test() {
echo "Waiting for producers completion"
while :; do
COMPLETED_PRODUCERS=$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} | grep -c "Completed")
- IMAGE_PULL_BACK_OFFS=$(kubectl get pods -l app=${PRODUCER_APPS_LABEL} -n ${ONAP_NAMESPACE} -o ${NAME_REASON_PATTERN} | grep -c "ImagePullBackOff \| ErrImagePull")
-
handle_backoffs
[[ ${COMPLETED_PRODUCERS} -eq ${CONTAINERS_COUNT} || ${CHECK_NUMBER} -gt ${MAXIMUM_BACK_OFF_CHECK_ITERATIONS} ]] && break