/*
 * ============LICENSE_START=======================================================
 * Copyright (C) 2024 Nordix Foundation
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * SPDX-License-Identifier: Apache-2.0
 * ============LICENSE_END=========================================================
 */

import { crypto } from 'k6/experimental/webcrypto';
import { check } from 'k6';
import { Writer, SchemaRegistry, SCHEMA_TYPE_STRING } from 'k6/x/kafka';

// Sample AVC event payload, parsed and re-serialized once at init time and reused for every message.
const testEventPayload = JSON.stringify(JSON.parse(open('../../resources/sampleAvcInputEvent.json')));

const schemaRegistry = new SchemaRegistry();
const kafkaProducer = new Writer({
    brokers: ['localhost:9092'],
    topic: 'dmi-cm-events',
    autoCreateTopic: true,
    batchSize: 5000,
    compression: 'gzip',
    requestTimeout: 30000
});

const TOTAL_MESSAGES = 100000;
const VIRTUAL_USERS = 1000;

export const options = {
    setupTimeout: '1m',
    teardownTimeout: '1m',
    scenarios: {
        produceKafkaMessages: {
            executor: 'shared-iterations',
            exec: 'sendKafkaMessages',
            vus: VIRTUAL_USERS,
            iterations: TOTAL_MESSAGES,
            maxDuration: '10m',
        }
    }
};

// Builds the CloudEvents headers expected on the dmi-cm-events topic.
function getCloudEventHeaders() {
    return {
        ce_type: 'org.onap.cps.ncmp.events.avc1_0_0.AvcEvent',
        ce_source: 'DMI',
        ce_destination: 'dmi-cm-events',
        ce_specversion: '1.0',
        ce_time: new Date().toISOString(),
        ce_id: crypto.randomUUID(),
        ce_dataschema: 'urn:cps:org.onap.cps.ncmp.events.avc1_0_0.AvcEvent:1.0.0',
        ce_correlationid: crypto.randomUUID()
    };
}

// Scenario entry point: each iteration produces one AVC CloudEvent to Kafka.
export function sendKafkaMessages() {
    const cloudEventHeaders = getCloudEventHeaders();

    const avcCloudEvent = {
        key: schemaRegistry.serialize({
            data: cloudEventHeaders.ce_correlationid,
            schemaType: SCHEMA_TYPE_STRING,
        }),
        value: schemaRegistry.serialize({
            data: testEventPayload,
            schemaType: SCHEMA_TYPE_STRING
        }),
        headers: cloudEventHeaders
    };

    try {
        kafkaProducer.produce({ messages: [avcCloudEvent] });
        // produce() throws on failure, so reaching this point means the message was handed to the writer.
        const isMessageSent = check(kafkaProducer, {
            'Message sent successfully': (producer) => producer != null,
        });
        if (!isMessageSent) {
            console.error('Failed to send message:', avcCloudEvent);
        }
    } catch (error) {
        console.error('Error during message production:', error, avcCloudEvent);
    }
}

// Close the Kafka writer once the test run finishes.
export function teardown() {
    kafkaProducer.close();
}