diff options
10 files changed, 278 insertions, 108 deletions
diff --git a/cps-application/src/main/resources/application.yml b/cps-application/src/main/resources/application.yml index 6f0807113d..83494d6545 100644 --- a/cps-application/src/main/resources/application.yml +++ b/cps-application/src/main/resources/application.yml @@ -160,6 +160,7 @@ cps: endpoint: ${ONAP_OTEL_EXPORTER_ENDPOINT:http://onap-otel-collector:4317} protocol: ${ONAP_OTEL_EXPORTER_PROTOCOL:grpc} enabled: ${ONAP_TRACING_ENABLED:false} + excluded-observation-names: ${ONAP_EXCLUDED_OBSERVATION_NAMES:tasks.scheduled.execution} # Actuator management: diff --git a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java index cff3187966..a6a82b7936 100644 --- a/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java +++ b/cps-ncmp-service/src/main/java/org/onap/cps/ncmp/config/OpenTelemetryConfig.java @@ -26,7 +26,11 @@ import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter; import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter; import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler; import io.opentelemetry.sdk.trace.samplers.Sampler; +import jakarta.annotation.PostConstruct; import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.actuate.autoconfigure.observation.ObservationRegistryCustomizer; import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; @@ -37,11 +41,14 @@ import org.springframework.http.server.observation.ServerRequestObservationConte import org.springframework.util.AntPathMatcher; import org.springframework.util.PathMatcher; +/** + * Configuration class for setting up OpenTelemetry tracing in a Spring Boot application. + * This class provides beans for OTLP exporters (gRPC and HTTP), a Jaeger remote sampler, + * and customizes the ObservationRegistry to exclude certain endpoints from being observed. + */ @Configuration public class OpenTelemetryConfig { - public static final int JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECOND = 30; - @Value("${spring.application.name:cps-application}") private String serviceId; @@ -51,9 +58,29 @@ public class OpenTelemetryConfig { @Value("${cps.tracing.sampler.jaeger_remote.endpoint:http://onap-otel-collector:14250}") private String jaegerRemoteSamplerUrl; + @Value("${cps.tracing.excluded-observation-names:tasks.scheduled.execution}") + private String excludedObservationNamesAsCsv; + + private static final int JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECONDS = 30; + + private List<String> excludedObservationNames; + /** - * OTLP Exporter with Grpc exporter protocol. - */ + * Initializes the excludedObservationNames after the bean's properties have been set. + * This method is called by the Spring container during bean initialization. + */ + @PostConstruct + public void init() { + excludedObservationNames = Arrays.stream(excludedObservationNamesAsCsv.split(",")) + .map(String::trim) + .collect(Collectors.toList()); + } + + /** + * Creates an OTLP Exporter with gRPC protocol. + * + * @return OtlpGrpcSpanExporter bean if tracing is enabled and the exporter protocol is gRPC + */ @Bean @ConditionalOnExpression( "${cps.tracing.enabled} && 'grpc'.equals('${cps.tracing.exporter.protocol}')") @@ -62,7 +89,9 @@ public class OpenTelemetryConfig { } /** - * OTLP Exporter with HTTP exporter protocol. + * Creates an OTLP Exporter with HTTP protocol. + * + * @return OtlpHttpSpanExporter bean if tracing is enabled and the exporter protocol is HTTP */ @Bean @ConditionalOnExpression( @@ -72,39 +101,40 @@ public class OpenTelemetryConfig { } /** - * Jaeger Remote Sampler. + * Creates a Jaeger Remote Sampler. + * + * @return JaegerRemoteSampler bean if tracing is enabled */ @Bean @ConditionalOnProperty("cps.tracing.enabled") public JaegerRemoteSampler createJaegerRemoteSampler() { return JaegerRemoteSampler.builder() - .setEndpoint(jaegerRemoteSamplerUrl) - .setPollingInterval(Duration.ofSeconds(JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECOND)) - .setInitialSampler(Sampler.alwaysOff()) - .setServiceName(serviceId) - .build(); + .setEndpoint(jaegerRemoteSamplerUrl) + .setPollingInterval(Duration.ofSeconds(JAEGER_REMOTE_SAMPLER_POLLING_INTERVAL_IN_SECONDS)) + .setInitialSampler(Sampler.alwaysOff()) + .setServiceName(serviceId) + .build(); } /** - * Excluding /actuator/** endpoints. - */ + * Customizes the ObservationRegistry to exclude /actuator/** endpoints from being observed. + * + * @return ObservationRegistryCustomizer bean if tracing is enabled + */ @Bean @ConditionalOnProperty("cps.tracing.enabled") - ObservationRegistryCustomizer<ObservationRegistry> skipActuatorEndpointsFromObservation() { + public ObservationRegistryCustomizer<ObservationRegistry> skipActuatorEndpointsFromObservation() { final PathMatcher pathMatcher = new AntPathMatcher("/"); return registry -> - registry.observationConfig().observationPredicate(observationPredicate(pathMatcher)); + registry.observationConfig().observationPredicate(observationPredicate(pathMatcher)); } - /** - * Excluding /actuator/** endpoints. - */ - static ObservationPredicate observationPredicate(final PathMatcher pathMatcher) { - return (name, context) -> { + private ObservationPredicate observationPredicate(final PathMatcher pathMatcher) { + return (observationName, context) -> { if (context instanceof ServerRequestObservationContext observationContext) { return !pathMatcher.match("/actuator/**", observationContext.getCarrier().getRequestURI()); } else { - return true; + return !excludedObservationNames.contains(observationName); } }; } diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy deleted file mode 100644 index 0f6906942f..0000000000 --- a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryCmNotificationSubscriptionConfigSpec.groovy +++ /dev/null @@ -1,81 +0,0 @@ -/* - * ============LICENSE_START======================================================= - * Copyright (C) 2024 Nordix Foundation - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - * ============LICENSE_END========================================================= - */ - -package org.onap.cps.ncmp.config - -import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter -import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter -import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler -import org.spockframework.spring.SpringBean -import org.springframework.boot.actuate.autoconfigure.observation.ObservationRegistryCustomizer -import spock.lang.Shared -import spock.lang.Specification - -class OpenTelemetryConfigSpec extends Specification{ - - @Shared - @SpringBean - OpenTelemetryConfig openTelemetryConfig = new OpenTelemetryConfig() - - def setupSpec() { - openTelemetryConfig.tracingExporterEndpointUrl="http://tracingExporterEndpointUrl" - openTelemetryConfig.jaegerRemoteSamplerUrl="http://jaegerremotesamplerurl" - openTelemetryConfig.serviceId ="cps-application" - } - - def 'OpenTelemetryConfig Construction.'() { - expect: 'the system can create an instance' - new OpenTelemetryConfig() != null - } - - def 'OTLP Exporter creation with Grpc protocol'(){ - when: 'an OTLP exporter is created' - def result = openTelemetryConfig.createOtlpExporterGrpc() - then: 'an OTLP Exporter is created' - assert result instanceof OtlpGrpcSpanExporter - } - - def 'OTLP Exporter creation with HTTP protocol'(){ - when: 'an OTLP exporter is created' - def result = openTelemetryConfig.createOtlpExporterHttp() - then: 'an OTLP Exporter is created' - assert result instanceof OtlpHttpSpanExporter - and: - assert result.builder.endpoint=="http://tracingExporterEndpointUrl" - } - - def 'Jaeger Remote Sampler Creation'(){ - when: 'an OTLP exporter is created' - def result = openTelemetryConfig.createJaegerRemoteSampler() - then: 'an OTLP Exporter is created' - assert result instanceof JaegerRemoteSampler - and: - assert result.delegate.type=="remoteSampling" - and: - assert result.delegate.url.toString().startsWith("http://jaegerremotesamplerurl") - } - - def 'Skipping Acutator endpoints'(){ - when: 'an OTLP exporter is created' - def result = openTelemetryConfig.skipActuatorEndpointsFromObservation() - then: 'an OTLP Exporter is created' - assert result instanceof ObservationRegistryCustomizer - } -} diff --git a/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy new file mode 100644 index 0000000000..cbff73113e --- /dev/null +++ b/cps-ncmp-service/src/test/groovy/org/onap/cps/ncmp/config/OpenTelemetryConfigSpec.groovy @@ -0,0 +1,113 @@ +/* + * ============LICENSE_START======================================================= + * Copyright (C) 2024 Nordix Foundation + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * ============LICENSE_END========================================================= + */ + +package org.onap.cps.ncmp.config + +import io.micrometer.observation.ObservationPredicate +import io.micrometer.observation.ObservationRegistry +import io.micrometer.observation.ObservationRegistry.ObservationConfig +import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter +import io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporter +import io.opentelemetry.sdk.extension.trace.jaeger.sampler.JaegerRemoteSampler +import org.springframework.beans.factory.annotation.Value +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.http.server.observation.ServerRequestObservationContext +import org.springframework.mock.web.MockHttpServletRequest +import org.springframework.util.AntPathMatcher +import spock.lang.Specification + +@SpringBootTest(classes = [OpenTelemetryConfig]) +class OpenTelemetryConfigSpec extends Specification { + + def objectUnderTest + + @Value('${cps.tracing.exporter.endpoint}') + def tracingExporterEndpointUrl + + @Value('${cps.tracing.sampler.jaeger_remote.endpoint}') + def jaegerRemoteSamplerUrl + + def setup() { + objectUnderTest = new OpenTelemetryConfig( + serviceId: 'sample-app', + tracingExporterEndpointUrl: tracingExporterEndpointUrl, + jaegerRemoteSamplerUrl: jaegerRemoteSamplerUrl, + excludedObservationNames: ['excluded-task-name']) + } + + def 'OTLP exporter creation with Grpc protocol'() { + when: 'an OTLP exporter is created' + def result = objectUnderTest.createOtlpExporterGrpc() + then: 'expected an instance of OtlpGrpcSpanExporter' + assert result instanceof OtlpGrpcSpanExporter + } + + def 'OTLP exporter creation with HTTP protocol'() { + when: 'an OTLP exporter is created' + def result = objectUnderTest.createOtlpExporterHttp() + then: 'an OTLP Exporter is created' + assert result instanceof OtlpHttpSpanExporter + and: 'the endpoint is correctly set' + assert result.builder.endpoint == 'http://exporter-test-url' + } + + def 'Jaeger Remote Sampler Creation'() { + when: 'a Jaeger remote sampler is created' + def result = objectUnderTest.createJaegerRemoteSampler() + then: 'a Jaeger remote sampler is created' + assert result instanceof JaegerRemoteSampler + and: 'the sampler type is correct' + assert result.delegate.type == 'remoteSampling' + and: 'the sampler endpoint is correctly set' + assert result.delegate.url.toString().startsWith('http://jaeger-remote-test-url') + } + + def 'Skipping actuator endpoints'() { + given: 'a mocked observation registry and config' + def observationRegistry = Mock(ObservationRegistry.class) + def observationConfig = Mock(ObservationConfig.class) + observationRegistry.observationConfig() >> observationConfig + when: 'an observation registry customizer is created and applied' + def result = objectUnderTest.skipActuatorEndpointsFromObservation() + result.customize(observationRegistry) + then: 'the observation predicate is set correctly' + 1 * observationConfig.observationPredicate(_) >> { ObservationPredicate observationPredicate -> + def mockedHttpServletRequest = new MockHttpServletRequest(_ as String, requestUrl) + def serverRequestObservationContext = new ServerRequestObservationContext(mockedHttpServletRequest, null) + and: 'expected predicate for endpoint' + assert observationPredicate.test('some-name', serverRequestObservationContext) == expectedPredicate + } + where: 'the following parameters are used' + scenario | requestUrl || expectedPredicate + 'an actuator' | '/actuator' || false + 'a non actuator' | '/some-api' || true + } + + def 'Observation predicate is configured to filter out excluded tasks by name'() { + when: 'a path matcher and observation predicate' + def observationPredicate = objectUnderTest.observationPredicate(new AntPathMatcher('/')) + then: 'a task name is provided' + assert observationPredicate.test(taskName, null) == expectedPredicate + where: 'the following parameters are used' + taskName || expectedPredicate + 'excluded-task-name' || false + 'non-excluded-task-name' || true + } +} diff --git a/cps-ncmp-service/src/test/resources/application.yml b/cps-ncmp-service/src/test/resources/application.yml index f0790dda4b..759de834ab 100644 --- a/cps-ncmp-service/src/test/resources/application.yml +++ b/cps-ncmp-service/src/test/resources/application.yml @@ -16,6 +16,15 @@ # SPDX-License-Identifier: Apache-2.0 # ============LICENSE_END========================================================= +cps: + tracing: + sampler: + jaeger_remote: + endpoint: http://jaeger-Remote-test-url + exporter: + endpoint: http://exporter-test-url + enabled: true + spring: kafka: producer: diff --git a/docker-compose/docker-compose.yml b/docker-compose/docker-compose.yml index 86afe78926..5af325a50b 100644 --- a/docker-compose/docker-compose.yml +++ b/docker-compose/docker-compose.yml @@ -20,6 +20,7 @@ services: ### docker-compose --profile dmi-service up -d -> run CPS services incl. dmi-plugin ### ### docker-compose --profile dmi-stub --profile monitoring up -d -> run CPS with stubbed dmi-plugin (for registration performance testing) + ### docker-compose --profile dmi-stub --profile tracing up -d -> run CPS with stubbed dmi-plugin (for open telemetry tracing testing make ONAP_TRACING_ENABLED "true" later "http://localhost:16686" can be accessed from browser) ### to disable notifications make notification.enabled to false & comment out kafka/zookeeper services ### dbpostgresql: @@ -54,6 +55,9 @@ services: DMI_PASSWORD: ${DMI_PASSWORD:-cpsr0cks!} KAFKA_BOOTSTRAP_SERVER: kafka:29092 notification.enabled: 'true' + ONAP_TRACING_ENABLED: 'false' + ONAP_OTEL_SAMPLER_JAEGER_REMOTE_ENDPOINT: http://jaeger-service:14250 + ONAP_OTEL_EXPORTER_ENDPOINT: http://jaeger-service:4317 restart: unless-stopped depends_on: - dbpostgresql @@ -181,5 +185,26 @@ services: profiles: - monitoring + kafka-ui: + container_name: kafka-ui + image: provectuslabs/kafka-ui:latest + ports: + - 8089:8080 + environment: + DYNAMIC_CONFIG_ENABLED: 'true' + KAFKA_CLUSTERS_0_NAME: 'cps-kafka-local' + KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 + profiles: + - monitoring + + jaeger-service: + container_name: jaeger-service + image: jaegertracing/all-in-one:latest + ports: + - 16686:16686 + restart: unless-stopped + profiles: + - tracing + volumes: grafana: diff --git a/k6-tests/README.md b/k6-tests/README.md index 0fdebcfe9d..9a385e100a 100644 --- a/k6-tests/README.md +++ b/k6-tests/README.md @@ -4,8 +4,7 @@ k6 tests are written in JavaScript. ## k6 installation -Follow the instructions in the [k6 installation guide](https://grafana.com/docs/k6/latest/set-up/install-k6/) -to get started. +Follow the instructions in the [build from source guide](https://github.com/mostafa/xk6-kafka) to get started. ## Running the k6 test suites Simply run the main script. (The script assumes k6 and docker-compose have been installed). diff --git a/k6-tests/ncmp/common/passthrough-crud.js b/k6-tests/ncmp/common/passthrough-crud.js index 43a215fdf8..76bda4e1bd 100644 --- a/k6-tests/ncmp/common/passthrough-crud.js +++ b/k6-tests/ncmp/common/passthrough-crud.js @@ -19,7 +19,12 @@ */ import http from 'k6/http'; -import { NCMP_BASE_URL, CONTENT_TYPE_JSON_PARAM, getRandomCmHandleId } from './utils.js'; +import { + CONTENT_TYPE_JSON_PARAM, + getRandomCmHandleId, + NCMP_BASE_URL, + TOPIC_DATA_OPERATIONS_BATCH_READ +} from './utils.js'; export function passthroughRead() { const cmHandleId = getRandomCmHandleId(); @@ -40,3 +45,21 @@ export function passthroughWrite() { const response = http.post(url, JSON.stringify(body), CONTENT_TYPE_JSON_PARAM); return response; } + +export function batchRead(cmHandleIds) { + const url = `${NCMP_BASE_URL}/ncmp/v1/data?topic=${TOPIC_DATA_OPERATIONS_BATCH_READ}` + const payload = { + "operations": [ + { + "resourceIdentifier": "parent/child", + "targetIds": cmHandleIds, + "datastore": "ncmp-datastore:passthrough-operational", + "options": "(fields=schemas/schema)", + "operationId": "12", + "operation": "read" + } + ] + }; + const response = http.post(url, JSON.stringify(payload), CONTENT_TYPE_JSON_PARAM); + return response; +}
\ No newline at end of file diff --git a/k6-tests/ncmp/common/utils.js b/k6-tests/ncmp/common/utils.js index 0f3b8d9c96..f24edc50d6 100644 --- a/k6-tests/ncmp/common/utils.js +++ b/k6-tests/ncmp/common/utils.js @@ -25,6 +25,9 @@ export const REGISTRATION_BATCH_SIZE = 100; export const READ_DATA_FOR_CM_HANDLE_DELAY_MS = 300; // must have same value as in docker-compose.yml export const WRITE_DATA_FOR_CM_HANDLE_DELAY_MS = 670; // must have same value as in docker-compose.yml export const CONTENT_TYPE_JSON_PARAM = { headers: {'Content-Type': 'application/json'} }; +export const DATA_OPERATION_READ_BATCH_SIZE = 200; +export const TOPIC_DATA_OPERATIONS_BATCH_READ = 'topic-data-operations-batch-read'; +export const KAFKA_BOOTSTRAP_SERVERS = ['localhost:9092']; export function recordTimeInSeconds(functionToExecute) { const startTimeInMillis = Date.now(); @@ -65,6 +68,7 @@ export function makeCustomSummaryReport(data, options) { makeSummaryCsvLine('5b', 'NCMP overhead for Synchronous single CM-handle pass-through read', 'milliseconds', 'ncmp_overhead_passthrough_read', data, options), makeSummaryCsvLine('6a', 'Synchronous single CM-handle pass-through write', 'requests/second', 'http_reqs{scenario:passthrough_write}', data, options), makeSummaryCsvLine('6b', 'NCMP overhead for Synchronous single CM-handle pass-through write', 'milliseconds', 'ncmp_overhead_passthrough_write', data, options), + makeSummaryCsvLine('7', 'Data operations batch read', 'events/second', 'data_operations_batch_read_cmhandles_per_second', data, options), ]; return summaryCsvLines.join('\n') + '\n'; } diff --git a/k6-tests/ncmp/ncmp-kpi.js b/k6-tests/ncmp/ncmp-kpi.js index b4c476ecb2..8ff9ec50b4 100644 --- a/k6-tests/ncmp/ncmp-kpi.js +++ b/k6-tests/ncmp/ncmp-kpi.js @@ -20,16 +20,28 @@ import { check } from 'k6'; import { Gauge, Trend } from 'k6/metrics'; -import { TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS, - makeCustomSummaryReport, recordTimeInSeconds } from './common/utils.js'; +import { + TOTAL_CM_HANDLES, READ_DATA_FOR_CM_HANDLE_DELAY_MS, WRITE_DATA_FOR_CM_HANDLE_DELAY_MS, + makeCustomSummaryReport, recordTimeInSeconds, makeBatchOfCmHandleIds, DATA_OPERATION_READ_BATCH_SIZE, + TOPIC_DATA_OPERATIONS_BATCH_READ, KAFKA_BOOTSTRAP_SERVERS +} from './common/utils.js'; import { registerAllCmHandles, deregisterAllCmHandles } from './common/cmhandle-crud.js'; import { executeCmHandleSearch, executeCmHandleIdSearch } from './common/search-base.js'; -import { passthroughRead, passthroughWrite } from './common/passthrough-crud.js'; +import { passthroughRead, passthroughWrite, batchRead } from './common/passthrough-crud.js'; +import { + Reader, +} from 'k6/x/kafka'; let cmHandlesCreatedPerSecondGauge = new Gauge('cmhandles_created_per_second'); let cmHandlesDeletedPerSecondGauge = new Gauge('cmhandles_deleted_per_second'); let passthroughReadNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_read'); let passthroughWriteNcmpOverheadTrend = new Trend('ncmp_overhead_passthrough_write'); +let dataOperationsBatchReadCmHandlePerSecondTrend = new Trend('data_operations_batch_read_cmhandles_per_second'); + +const reader = new Reader({ + brokers: KAFKA_BOOTSTRAP_SERVERS, + topic: TOPIC_DATA_OPERATIONS_BATCH_READ, +}); const DURATION = '15m'; @@ -61,6 +73,22 @@ export const options = { vus: 3, duration: DURATION, }, + data_operation_send_async_http_request: { + executor: 'constant-arrival-rate', + exec: 'data_operation_send_async_http_request', + duration: DURATION, + rate: 1, + timeUnit: '1s', + preAllocatedVUs: 1, + }, + data_operation_async_batch_read: { + executor: 'constant-arrival-rate', + exec: 'data_operation_async_batch_read', + duration: DURATION, + rate: 1, + timeUnit: '1s', + preAllocatedVUs: 1, + } }, thresholds: { 'cmhandles_created_per_second': ['value >= 22'], @@ -75,6 +103,9 @@ export const options = { 'http_req_failed{scenario:cm_search_module}': ['rate == 0'], 'http_req_failed{scenario:passthrough_read}': ['rate == 0'], 'http_req_failed{scenario:passthrough_write}': ['rate == 0'], + 'http_req_failed{scenario:data_operation_send_async_http_request}': ['rate == 0'], + 'kafka_reader_error_count{scenario:data_operation_consume_kafka_responses}': ['count == 0'], + 'data_operations_batch_read_cmhandles_per_second': ['avg >= 150'], }, }; @@ -114,6 +145,22 @@ export function cm_search_module() { check(JSON.parse(response.body), { 'module search returned expected CM-handles': (arr) => arr.length === TOTAL_CM_HANDLES }); } +export function data_operation_send_async_http_request() { + const nextBatchOfCmHandleIds = makeBatchOfCmHandleIds(DATA_OPERATION_READ_BATCH_SIZE,1); + const response = batchRead(nextBatchOfCmHandleIds) + check(response, { 'data operation batch read status equals 200': (r) => r.status === 200 }); +} + +export function data_operation_async_batch_read() { + try { + let messages = reader.consume({ limit: DATA_OPERATION_READ_BATCH_SIZE }); + dataOperationsBatchReadCmHandlePerSecondTrend.add(messages.length); + } catch (error) { + dataOperationsBatchReadCmHandlePerSecondTrend.add(0); + console.error(error); + } +} + export function handleSummary(data) { return { stdout: makeCustomSummaryReport(data, options), |