Commit 4bac745b authored by Your Name's avatar Your Name

add kafka test

parent 99233b73
## 测试命令
value大小 影响很大;
./k6 run --vus 50 --duration 60s test_bytes.js
## 链接
* https://k6.io/docs/results-output/real-time/apache-kafka/
## 测试记录
```console
wuban@node-220:/data/benchmark/kafka$ ./k6 run --vus 250 --duration 60s produce_consume.js
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
execution: local
script: produce_consume.js
output: -
scenarios: (100.00%) 1 scenario, 250 max VUs, 1m30s max duration (incl. graceful stop):
* default: 250 looping VUs for 1m0s (gracefulStop: 30s)
█ teardown
data_received......................: 0 B 0 B/s
data_sent..........................: 0 B 0 B/s
iteration_duration.................: avg=6.14ms min=47.82µs med=77.12µs max=11.82s p(90)=128.16µs p(95)=166.31µs
iterations.........................: 2686725 37616.851143/s
kafka_writer_acks_required.........: 0 min=0 max=0
kafka_writer_async.................: 0.00% ✓ 0 ✗ 2686725
kafka_writer_attempts_max..........: 10 min=10 max=10
kafka_writer_batch_bytes...........: 422 MB 5.9 MB/s
kafka_writer_batch_max.............: 1 min=1 max=1
kafka_writer_batch_queue_seconds...: avg=40.49µs min=0s med=7.85µs max=67.36ms p(90)=46.11µs p(95)=62.19µs
kafka_writer_batch_seconds.........: avg=8.18µs min=2.58µs med=5.39µs max=67.35ms p(90)=10.5µs p(95)=13.33µs
kafka_writer_batch_size............: 2686725 37616.851143/s
kafka_writer_batch_timeout.........: 277h46m40s min=277h46m40s max=277h46m40s
kafka_writer_error_count...........: 1464 20.497472/s
kafka_writer_message_bytes.........: 422 MB 5.9 MB/s
kafka_writer_message_count.........: 2688189 37637.348615/s
kafka_writer_read_timeout..........: 2777h46m40s min=2777h46m40s max=2777h46m40s
kafka_writer_retries_count.........: 1464 20.497472/s
kafka_writer_wait_seconds..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count...........: 2688189 37637.348615/s
kafka_writer_write_seconds.........: avg=3.03ms min=10.95µs med=17.25µs max=9.93s p(90)=27.8µs p(95)=33.56µs
kafka_writer_write_timeout.........: 2777h46m40s min=2777h46m40s max=2777h46m40s
vus................................: 0 min=0 max=250
vus_max............................: 250 min=83 max=250
running (1m11.4s), 000/250 VUs, 2686725 complete and 0 interrupted iterations
default ✓ [======================================] 250 VUs 1m0s
```
```console
wuban@node-220:/data/benchmark/kafka$ ./k6 run --vus 250 --duration 60s produce_consume.js
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
execution: local
script: produce_consume.js
output: -
scenarios: (100.00%) 1 scenario, 250 max VUs, 1m30s max duration (incl. graceful stop):
* default: 250 looping VUs for 1m0s (gracefulStop: 30s)
█ teardown
data_received......................: 0 B 0 B/s
data_sent..........................: 0 B 0 B/s
iteration_duration.................: avg=3.58ms min=44.33µs med=74.06µs max=11.29s p(90)=123.38µs p(95)=170.3µs
iterations.........................: 4553721 64097.094703/s
kafka_writer_acks_required.........: 0 min=0 max=0
kafka_writer_async.................: 0.00% ✓ 0 ✗ 4553721
kafka_writer_attempts_max..........: 10 min=10 max=10
kafka_writer_batch_bytes...........: 169 MB 2.4 MB/s
kafka_writer_batch_max.............: 1 min=1 max=1
kafka_writer_batch_queue_seconds...: avg=47.47µs min=0s med=5.85µs max=75.38ms p(90)=44.5µs p(95)=66.71µs
kafka_writer_batch_seconds.........: avg=9.68µs min=2.85µs med=5.36µs max=99.02ms p(90)=10.29µs p(95)=13µs
kafka_writer_batch_size............: 4553721 64097.094703/s
kafka_writer_batch_timeout.........: 277h46m40s min=277h46m40s max=277h46m40s
kafka_writer_error_count...........: 1416 19.93128/s
kafka_writer_message_bytes.........: 169 MB 2.4 MB/s
kafka_writer_message_count.........: 4555137 64117.025983/s
kafka_writer_read_timeout..........: 2777h46m40s min=2777h46m40s max=2777h46m40s
kafka_writer_retries_count.........: 1416 19.93128/s
kafka_writer_wait_seconds..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count...........: 4555137 64117.025983/s
kafka_writer_write_seconds.........: avg=1.78ms min=11.14µs med=17.17µs max=9.92s p(90)=27.69µs p(95)=32.93µs
kafka_writer_write_timeout.........: 2777h46m40s min=2777h46m40s max=2777h46m40s
vus................................: 0 min=0 max=250
vus_max............................: 250 min=219 max=250
running (1m11.0s), 000/250 VUs, 4553721 complete and 0 interrupted iterations
default ✓ [======================================] 250 VUs 1m0s
```
```console
wuban@node-220:/data/benchmark/kafka$ ./k6 run --vus 100 --duration 60s produce_consume.js
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
execution: local
script: produce_consume.js
output: -
scenarios: (100.00%) 1 scenario, 100 max VUs, 1m30s max duration (incl. graceful stop):
* default: 100 looping VUs for 1m0s (gracefulStop: 30s)
█ teardown
data_received......................: 0 B 0 B/s
data_sent..........................: 0 B 0 B/s
iteration_duration.................: avg=3.81ms min=82.05µs med=138.81µs max=10.28s p(90)=252.98µs p(95)=363.05µs
iterations.........................: 1735164 24495.127166/s
kafka_reader_dial_count............: 100 1.411689/s
kafka_reader_dial_seconds..........: avg=6.4µs min=0s med=0s max=169.16ms p(90)=0s p(95)=0s
kafka_reader_error_count...........: 0 0/s
kafka_reader_fetch_bytes...........: 471 MB 6.7 MB/s
kafka_reader_fetch_bytes_max.......: 1000000 min=1000000 max=1000000
kafka_reader_fetch_bytes_min.......: 1 min=1 max=1
kafka_reader_fetch_size............: 16829100 237574.629592/s
kafka_reader_fetch_wait_max........: 10s min=10s max=10s
kafka_reader_fetches_count.........: 1480 20.893004/s
kafka_reader_lag...................: 856408 min=266926 max=1011438
kafka_reader_message_bytes.........: 243 MB 3.4 MB/s
kafka_reader_message_count.........: 17360663 245078.648395/s
kafka_reader_offset................: 245940 min=17 max=245940
kafka_reader_queue_capacity........: 100 min=100 max=100
kafka_reader_queue_length..........: 91 min=0 max=100
kafka_reader_read_seconds..........: avg=3.15ms min=0s med=0s max=20.62s p(90)=0s p(95)=0s
kafka_reader_rebalance_count.......: 0 0/s
kafka_reader_timeouts_count........: 454 6.40907/s
kafka_reader_wait_seconds..........: avg=72.94µs min=0s med=0s max=170.49ms p(90)=0s p(95)=0s
kafka_writer_acks_required.........: 0 min=0 max=0
kafka_writer_async.................: 0.00% ✓ 0 ✗ 1735164
kafka_writer_attempts_max..........: 10 min=10 max=10
kafka_writer_batch_bytes...........: 64 MB 906 kB/s
kafka_writer_batch_max.............: 1 min=1 max=1
kafka_writer_batch_queue_seconds...: avg=69.06µs min=0s med=10.37µs max=45.81ms p(90)=39.34µs p(95)=89.71µs
kafka_writer_batch_seconds.........: avg=9.84µs min=2.87µs med=6.45µs max=46.91ms p(90)=12.08µs p(95)=15.22µs
kafka_writer_batch_size............: 1735164 24495.127166/s
kafka_writer_batch_timeout.........: 277h46m40s min=277h46m40s max=277h46m40s
kafka_writer_error_count...........: 526 7.425487/s
kafka_writer_message_bytes.........: 64 MB 907 kB/s
kafka_writer_message_count.........: 1735690 24502.552652/s
kafka_writer_read_timeout..........: 2777h46m40s min=2777h46m40s max=2777h46m40s
kafka_writer_retries_count.........: 526 7.425487/s
kafka_writer_wait_seconds..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count...........: 1735690 24502.552652/s
kafka_writer_write_seconds.........: avg=1.92ms min=11.88µs med=18.31µs max=9.87s p(90)=31.24µs p(95)=37.66µs
kafka_writer_write_timeout.........: 2777h46m40s min=2777h46m40s max=2777h46m40s
vus................................: 0 min=0 max=100
vus_max............................: 100 min=100 max=100
running (1m10.8s), 000/100 VUs, 1735164 complete and 0 interrupted iterations
default ✓ [======================================] 100 VUs 1m0s
```
```console
wuban@node-220:/data/benchmark/kafka$ ./k6 run --vus 50 --duration 60s test_bytes.js
/\ |‾‾| /‾‾/ /‾‾/
/\ / \ | |/ / / /
/ \/ \ | ( / ‾‾\
/ \ | |\ \ | (‾) |
/ __________ \ |__| \__\ \_____/ .io
execution: local
script: test_bytes.js
output: -
scenarios: (100.00%) 1 scenario, 50 max VUs, 1m30s max duration (incl. graceful stop):
* default: 50 looping VUs for 1m0s (gracefulStop: 30s)
✓ 10 messages returned
✓ key starts with 'test-id-' string
✓ value is correct
checks.............................: 100.00% ✓ 12519 ✗ 0
data_received......................: 0 B 0 B/s
data_sent..........................: 0 B 0 B/s
iteration_duration.................: avg=738.93ms min=24.05ms med=40.27ms max=7.32s p(90)=2.79s p(95)=3.27s
iterations.........................: 4173 62.267645/s
kafka_reader_dial_count............: 50 0.746078/s
kafka_reader_dial_seconds..........: avg=77.51µs min=0s med=0s max=9.62ms p(90)=0s p(95)=0s
kafka_reader_error_count...........: 0 0/s
kafka_reader_fetch_bytes...........: 0 B 0 B/s
kafka_reader_fetch_bytes_max.......: 1000000 min=1000000 max=1000000
kafka_reader_fetch_bytes_min.......: 1 min=1 max=1
kafka_reader_fetch_size............: 0 0/s
kafka_reader_fetch_wait_max........: 10s min=10s max=10s
kafka_reader_fetches_count.........: 50 0.746078/s
kafka_reader_lag...................: 1201 min=1173 max=703299
kafka_reader_message_bytes.........: 1.4 MB 21 kB/s
kafka_reader_message_count.........: 46281 690.584442/s
kafka_reader_offset................: 1211 min=14 max=1230
kafka_reader_queue_capacity........: 100 min=100 max=100
kafka_reader_queue_length..........: 91 min=4 max=100
kafka_reader_read_seconds..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_reader_rebalance_count.......: 0 0/s
kafka_reader_timeouts_count........: 0 0/s
kafka_reader_wait_seconds..........: avg=75.27µs min=0s med=0s max=17.69ms p(90)=0s p(95)=0s
kafka_writer_acks_required.........: 0 min=0 max=0
kafka_writer_async.................: 0.00% ✓ 0 ✗ 417300
kafka_writer_attempts_max..........: 10 min=10 max=10
kafka_writer_batch_bytes...........: 23 MB 342 kB/s
kafka_writer_batch_max.............: 1 min=1 max=1
kafka_writer_batch_queue_seconds...: avg=11.29µs min=0s med=7.61µs max=7.19ms p(90)=14.22µs p(95)=17.5µs
kafka_writer_batch_seconds.........: avg=3.4ms min=6.77µs med=21.03µs max=3.64s p(90)=38.47µs p(95)=50.05µs
kafka_writer_batch_size............: 417300 6226.764493/s
kafka_writer_batch_timeout.........: 277h46m40s min=277h46m40s max=277h46m40s
kafka_writer_error_count...........: 0 0/s
kafka_writer_message_bytes.........: 46 MB 684 kB/s
kafka_writer_message_count.........: 834600 12453.528987/s
kafka_writer_read_timeout..........: 2777h46m40s min=2777h46m40s max=2777h46m40s
kafka_writer_retries_count.........: 0 0/s
kafka_writer_wait_seconds..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
kafka_writer_write_count...........: 834600 12453.528987/s
kafka_writer_write_seconds.........: avg=3.48ms min=8.02µs med=17.9µs max=3.64s p(90)=32.97µs p(95)=41.61µs
kafka_writer_write_timeout.........: 2777h46m40s min=2777h46m40s max=2777h46m40s
vus................................: 1 min=1 max=50
vus_max............................: 50 min=50 max=50
running (1m07.0s), 00/50 VUs, 4173 complete and 0 interrupted iterations
default ✓ [======================================] 50 VUs 1m0s
```
\ No newline at end of file
// Either import the module object
import * as kafka from "k6/x/kafka";
// Or individual classes and constants
import {
Writer,
Reader,
Connection,
SchemaRegistry,
SCHEMA_TYPE_STRING,
} from "k6/x/kafka";
// Creates a new Writer object to produce messages to Kafka
const writer = new Writer({
// WriterConfig object
brokers: ["192.168.1.10:9092"],
topic: "my-topic",
});
const reader = new Reader({
// ReaderConfig object
brokers: ["192.168.1.10:9092"],
topic: "my-topic",
});
const connection = new Connection({
// ConnectionConfig object
address: "192.168.1.10:9092",
});
const schemaRegistry = new SchemaRegistry();
// Can accept a SchemaRegistryConfig object
if (__VU == 0) {
// Create a topic on initialization (before producing messages)
connection.createTopic({
// TopicConfig object
topic: "my-topic",
});
}
export default function () {
// Fetch the list of all topics
//const topics = connection.listTopics();
// console.log(topics); // list of topics
// Produces message to Kafka
writer.produce({
// ProduceConfig object
messages: [
// Message object(s)
{
key: schemaRegistry.serialize({
data: "my-key",
schemaType: SCHEMA_TYPE_STRING,
}),
value: schemaRegistry.serialize({
data: "my-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-valuemy-value",
schemaType: SCHEMA_TYPE_STRING,
}),
},
],
});
// Consume messages from Kafka
// let messages = reader.consume({
// // ConsumeConfig object
// limit: 10,
// });
// your messages
// console.log(messages);
// You can use checks to verify the contents,
// length and other properties of the message(s)
// To serialize the data back into a string, you should use
// the deserialize method of the Schema Registry client. You
// can use it inside a check, as shown in the example scripts.
// let deserializedValue = schemaRegistry.deserialize({
// data: messages[0].value,
// schemaType: SCHEMA_TYPE_STRING,
// });
}
export function teardown(data) {
// Delete the topic
//connection.deleteTopic("my-topic");
// Close all connections
writer.close();
reader.close();
connection.close();
}
/*
This is a k6 test script that imports the xk6-kafka and
tests Kafka with a 200 byte array messages per iteration.
*/
import { check } from "k6";
import {
Writer,
Reader,
Connection,
SchemaRegistry,
SCHEMA_TYPE_BYTES,
} from "k6/x/kafka"; // import kafka extension
const brokers = ["192.168.1.10:9092"];
const topic = "xk6_kafka_byte_array_topic";
const writer = new Writer({
brokers: brokers,
topic: topic,
autoCreateTopic: true,
});
const reader = new Reader({
brokers: brokers,
topic: topic,
});
const connection = new Connection({
address: brokers[0],
});
const schemaRegistry = new SchemaRegistry();
if (__VU == 0) {
connection.createTopic({ topic: topic });
}
const payload = "byte array payload";
export default function () {
for (let index = 0; index < 100; index++) {
let messages = [
{
// The data type of the key is a string
key: schemaRegistry.serialize({
data: Array.from("test-id-abc-" + index, (x) => x.charCodeAt(0)),
schemaType: SCHEMA_TYPE_BYTES,
}),
// The data type of the value is a byte array
value: schemaRegistry.serialize({
data: Array.from(payload, (x) => x.charCodeAt(0)),
schemaType: SCHEMA_TYPE_BYTES,
}),
},
{
key: schemaRegistry.serialize({
data: Array.from("test-id-def-" + index, (x) => x.charCodeAt(0)),
schemaType: SCHEMA_TYPE_BYTES,
}),
value: schemaRegistry.serialize({
data: Array.from(payload, (x) => x.charCodeAt(0)),
schemaType: SCHEMA_TYPE_BYTES,
}),
},
];
writer.produce({
messages: messages,
});
}
// Read 10 messages only
let messages = reader.consume({ limit: 10 });
check(messages, {
"10 messages returned": (msgs) => msgs.length == 10,
"key starts with 'test-id-' string": (msgs) =>
String.fromCharCode(
...schemaRegistry.deserialize({
data: msgs[0].key,
schemaType: SCHEMA_TYPE_BYTES,
}),
).startsWith("test-id-"),
"value is correct": (msgs) =>
String.fromCharCode(
...schemaRegistry.deserialize({
data: msgs[0].value,
schemaType: SCHEMA_TYPE_BYTES,
}),
) == payload,
});
}
// export function teardown(data) {
// if (__VU == 0) {
// // Delete the topic
// connection.deleteTopic(topic);
// }
// writer.close();
// reader.close();
// connection.close();
// }
/*
This is a k6 test script that imports the xk6-kafka and
tests Kafka with a 200 JSON messages per iteration.
*/
import { check } from "k6";
// import * as kafka from "k6/x/kafka";
import {
Writer,
Reader,
Connection,
SchemaRegistry,
SCHEMA_TYPE_STRING,
} from "k6/x/kafka"; // import kafka extension
// Prints module-level constants
// console.log(kafka);
const brokers = ["192.168.1.10:9092"];
const topic = "xk6_kafka_json_topic";
const writer = new Writer({
brokers: brokers,
topic: topic,
autoCreateTopic: true,
});
const reader = new Reader({
brokers: brokers,
topic: topic,
});
const connection = new Connection({
address: brokers[0],
});
const schemaRegistry = new SchemaRegistry();
if (__VU == 0) {
connection.createTopic({ topic: topic });
}
export const options = {
thresholds: {
// Base thresholds to see if the writer or reader is working
kafka_writer_error_count: ["count == 0"],
kafka_reader_error_count: ["count == 0"],
},
};
export default function () {
for (let index = 0; index < 100; index++) {
let messages = [
{
key: schemaRegistry.serialize({
data: "test-key-string",
schemaType: SCHEMA_TYPE_STRING,
}),
value: schemaRegistry.serialize({
data: "test-value-string",
schemaType: SCHEMA_TYPE_STRING,
}),
headers: {
mykey: "myvalue",
},
offset: index,
partition: 0,
time: new Date(), // Will be converted to timestamp automatically
},
{
key: schemaRegistry.serialize({
data: "test-key-string",
schemaType: SCHEMA_TYPE_STRING,
}),
value: schemaRegistry.serialize({
data: "test-value-string",
schemaType: SCHEMA_TYPE_STRING,
}),
headers: {
mykey: "myvalue",
},
},
];
writer.produce({ messages: messages });
}
// Read 10 messages only
let messages = reader.consume({ limit: 10 });
check(messages, {
"10 messages are received": (messages) => messages.length == 10,
});
check(messages[0], {
"Topic equals to xk6_kafka_json_topic": (msg) => msg["topic"] == topic,
"Key is a string and is correct": (msg) =>
schemaRegistry.deserialize({
data: msg.key,
schemaType: SCHEMA_TYPE_STRING,
}) == "test-key-string",
"Value is a string and is correct": (msg) =>
typeof schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_STRING,
}) == "string" &&
schemaRegistry.deserialize({
data: msg.value,
schemaType: SCHEMA_TYPE_STRING,
}) == "test-value-string",
"Header equals {'mykey': 'myvalue'}": (msg) =>
"mykey" in msg.headers &&
String.fromCharCode(...msg.headers["mykey"]) == "myvalue",
"Time is past": (msg) => new Date(msg["time"]) < new Date(),
"Partition is zero": (msg) => msg["partition"] == 0,
"Offset is gte zero": (msg) => msg["offset"] >= 0,
"High watermark is gte zero": (msg) => msg["highWaterMark"] >= 0,
});
}
export function teardown(data) {
if (__VU == 0) {
// Delete the topic
connection.deleteTopic(topic);
}
writer.close();
reader.close();
connection.close();
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment