-
Notifications
You must be signed in to change notification settings - Fork 72
Expand file tree
/
Copy pathKafkaSenderBenchmarks.java
More file actions
88 lines (73 loc) · 2.96 KB
/
KafkaSenderBenchmarks.java
File metadata and controls
88 lines (73 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.reporter;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.InternetProtocol;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.Wait;
import zipkin2.reporter.internal.SenderBenchmarks;
import zipkin2.reporter.kafka.KafkaSender;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.testcontainers.utility.DockerImageName.parse;
public class KafkaSenderBenchmarks extends SenderBenchmarks {
static final Logger LOGGER = LoggerFactory.getLogger(KafkaContainer.class);
static final int KAFKA_PORT = 19092;
static final class KafkaContainer extends GenericContainer<KafkaContainer> {
KafkaContainer() {
super(parse("ghcr.io/openzipkin/zipkin-kafka:3.6.0"));
waitStrategy = Wait.forHealthcheck();
// Kafka broker listener port (19092) needs to be exposed for test cases to access it.
addFixedExposedPort(KAFKA_PORT, KAFKA_PORT, InternetProtocol.TCP);
withLogConsumer(new Slf4jLogConsumer(LOGGER));
}
String bootstrapServer() {
return getHost() + ":" + getMappedPort(KAFKA_PORT);
}
}
KafkaContainer kafka;
KafkaConsumer<byte[], byte[]> consumer;
@Override protected BytesMessageSender createSender() {
kafka = new KafkaContainer();
kafka.start();
Properties config = new Properties();
config.put(BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServer());
config.put(GROUP_ID_CONFIG, "zipkin");
consumer =
new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
consumer.subscribe(Collections.singletonList("zipkin"));
new Thread(() -> {
while (true) {
Iterator<ConsumerRecord<byte[], byte[]>> messages = consumer.poll(1000L).iterator();
while (messages.hasNext()) {
messages.next();
}
}
}).start();
return KafkaSender.create(kafka.bootstrapServer());
}
@Override protected void afterSenderClose() {
kafka.stop();
}
// Convenience main entry-point
public static void main(String[] args) throws RunnerException {
Options opt = new OptionsBuilder()
.include(".*" + KafkaSenderBenchmarks.class.getSimpleName() + ".*")
.build();
new Runner(opt).run();
}
}