test_bytes.js 2.43 KB
/*

This is a k6 test script that imports the xk6-kafka and
tests Kafka with a 200 byte array messages per iteration.

*/

import { check } from "k6";
import {
  Writer,
  Reader,
  Connection,
  SchemaRegistry,
  SCHEMA_TYPE_BYTES,
} from "k6/x/kafka"; // import kafka extension

const brokers = ["192.168.1.10:9092"];
const topic = "xk6_kafka_byte_array_topic";

const writer = new Writer({
  brokers: brokers,
  topic: topic,
  autoCreateTopic: true,
});
const reader = new Reader({
  brokers: brokers,
  topic: topic,
});
const connection = new Connection({
  address: brokers[0],
});
const schemaRegistry = new SchemaRegistry();

if (__VU == 0) {
  connection.createTopic({ topic: topic });
}

const payload = "byte array payload";

export default function () {
  for (let index = 0; index < 100; index++) {
    let messages = [
      {
        // The data type of the key is a string
        key: schemaRegistry.serialize({
          data: Array.from("test-id-abc-" + index, (x) => x.charCodeAt(0)),
          schemaType: SCHEMA_TYPE_BYTES,
        }),
        // The data type of the value is a byte array
        value: schemaRegistry.serialize({
          data: Array.from(payload, (x) => x.charCodeAt(0)),
          schemaType: SCHEMA_TYPE_BYTES,
        }),
      },
      {
        key: schemaRegistry.serialize({
          data: Array.from("test-id-def-" + index, (x) => x.charCodeAt(0)),
          schemaType: SCHEMA_TYPE_BYTES,
        }),
        value: schemaRegistry.serialize({
          data: Array.from(payload, (x) => x.charCodeAt(0)),
          schemaType: SCHEMA_TYPE_BYTES,
        }),
      },
    ];

    writer.produce({
      messages: messages,
    });
  }

  // Read 10 messages only
  let messages = reader.consume({ limit: 10 });
  check(messages, {
    "10 messages returned": (msgs) => msgs.length == 10,
    "key starts with 'test-id-' string": (msgs) =>
      String.fromCharCode(
        ...schemaRegistry.deserialize({
          data: msgs[0].key,
          schemaType: SCHEMA_TYPE_BYTES,
        }),
      ).startsWith("test-id-"),
    "value is correct": (msgs) =>
      String.fromCharCode(
        ...schemaRegistry.deserialize({
          data: msgs[0].value,
          schemaType: SCHEMA_TYPE_BYTES,
        }),
      ) == payload,
  });
}

// export function teardown(data) {
//   if (__VU == 0) {
//     // Delete the topic
//     connection.deleteTopic(topic);
//   }
//   writer.close();
//   reader.close();
//   connection.close();
// }