Installing Kafka:
We use three machines to build the Kafka cluster:
192.168.121.132 h201
192.168.121.131 h202
192.168.121.130 h203
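The brokers are referred to by hostname throughout the rest of this walkthrough, so each machine must be able to resolve the other two. A minimal sketch, assuming no DNS is available, is to append the mapping above to /etc/hosts on all three machines (requires root):
cat >> /etc/hosts <<'EOF'
192.168.121.132 h201
192.168.121.131 h202
192.168.121.130 h203
EOF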
Download kafka_2.10-0.10.1.0.tgz
[hadoop@h201 ~]$ tar -zxvf kafka_2.10-0.10.1.0.tgz
[hadoop@h201 ~]$ cd kafka_2.10-0.10.1.0
Edit the configuration file kafka_2.10-0.10.1.0/config/server.properties and change the following entry to:
zookeeper.connect=h201:2181,h202:2181,h203:2181/kafka
Note that by default Kafka uses ZooKeeper's root path (/), so all of Kafka's ZooKeeper data ends up scattered directly under the root. If other applications share the same ZooKeeper cluster, browsing its data becomes messy, so it is strongly recommended to specify a chroot path, directly in the zookeeper.connect setting. Connect to any one of the ZooKeeper servers with the following commands:
[hadoop@h201 ~]$ cd zookeeper-3.4.5-cdh5.5.2/
[hadoop@h201 zookeeper-3.4.5]$ bin/zkCli.sh
connect h201:2181
create /kafka
ls /kafka/brokers/topics
ls /kafka/brokers/ids
Copy Kafka to the other two machines:
[hadoop@h201 ~]$ scp -r kafka_2.10-0.10.1.0/ h202:/home/hadoop/
[hadoop@h201 ~]$ scp -r kafka_2.10-0.10.1.0/ h203:/home/hadoop/
Change broker.id in server.properties on all three machines.
Kafka requires every broker's id to be unique across the whole cluster, so this value must be adjusted on each node. (On a single machine you can simulate a distributed Kafka cluster by running multiple broker processes, but each broker still needs a unique id, and you also have to change the data and log directory settings.)
[hadoop@h201 ~]$ vi kafka_2.10-0.10.1.0/config/server.properties
broker.id=0
[hadoop@h202 ~]$ vi kafka_2.10-0.10.1.0/config/server.properties
broker.id=1
[hadoop@h203 ~]$ vi kafka_2.10-0.10.1.0/config/server.properties
broker.id=2
Start the ZooKeeper cluster first, then run the following command on all three machines:
kafka-server-start.sh -daemon /home/hadoop/kafka_2.10-0.10.1.0/config/server.properties
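If passwordless ssh between the nodes is set up (and JAVA_HOME is available to non-interactive shells), the brokers can also be started from h201 in a single loop; this is just a convenience sketch, not a required step:
[hadoop@h201 ~]$ for h in h201 h202 h203; do ssh $h "/home/hadoop/kafka_2.10-0.10.1.0/bin/kafka-server-start.sh -daemon /home/hadoop/kafka_2.10-0.10.1.0/config/server.properties"; done
Afterwards, jps on each machine should list a Kafka process.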
To stop a broker, run kafka-server-stop.sh on that machine (the script takes no arguments; it stops the local broker process).
Create a topic:
bin/kafka-topics.sh --create --zookeeper h201:2181/kafka --replication-factor 3 --partitions 5 --topic data
View the created topic:
kafka-topics.sh --describe --zookeeper h201:2181/kafka --topic data
kafka-topics.sh --zookeeper h201:2181/kafka --list
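For reference, the --describe output looks roughly like the following, with one line per partition (illustrative only; the leader and replica assignments on your cluster will differ):
Topic:data	PartitionCount:5	ReplicationFactor:3	Configs:
	Topic: data	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: data	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0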
The meanings of Leader, Replicas, and Isr in this output are:
Partition: the partition number
Leader: the node responsible for reads and writes of the given partition
Replicas: the list of nodes that replicate this partition's log
Isr: the "in-sync" replicas, the subset of replicas that are currently alive and eligible to become Leader
Delete a topic:
On all three machines in the Kafka cluster, add the following setting to server.properties:
delete.topic.enable=true
kafka-topics.sh --delete --zookeeper h201:2181/kafka --topic data
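delete.topic.enable only takes effect after the brokers have been restarted with the new setting. Whether a deletion has actually completed can be checked in ZooKeeper; a sketch, assuming the /kafka chroot used above:
[hadoop@h201 ~]$ zookeeper-3.4.5-cdh5.5.2/bin/zkCli.sh -server h201:2181
ls /kafka/admin/delete_topics
ls /kafka/brokers/topics
A topic that is still pending deletion is listed under /kafka/admin/delete_topics; once the deletion finishes it also disappears from /kafka/brokers/topics.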
Built-in stress testing:
kafka-console-producer.sh --broker-list h201:9092 --topic data
kafka-console-consumer.sh --zookeeper h201:2181/kafka --topic data --from-beginning
Producer (throughput):
[hadoop@h202 config]$ kafka-producer-perf-test.sh --num-records 100000 --topic data --producer-props bootstrap.servers=h201:9092,h202:9092,h203:9092 acks=all --record-size 1000 --throughput 1000
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/work/hadoop2.6jar/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/kafka_2.10-0.10.1.0/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
4399 records sent, 860.2 records/sec (0.82 MB/sec), 64.0 ms avg latency, 725.0 max latency.
2857 records sent, 513.4 records/sec (0.49 MB/sec), 237.0 ms avg latency, 3464.0 max latency.
4461 records sent, 892.0 records/sec (0.85 MB/sec), 3908.1 ms avg latency, 6209.0 max latency.
7809 records sent, 1561.8 records/sec (1.49 MB/sec), 2653.6 ms avg latency, 5225.0 max latency.
6177 records sent, 1235.2 records/sec (1.18 MB/sec), 961.4 ms avg latency, 2343.0 max latency.
4991 records sent, 976.7 records/sec (0.93 MB/sec), 218.8 ms avg latency, 1001.0 max latency.
4444 records sent, 843.1 records/sec (0.80 MB/sec), 390.9 ms avg latency, 2136.0 max latency.
5867 records sent, 1169.0 records/sec (1.11 MB/sec), 149.5 ms avg latency, 1086.0 max latency.
4713 records sent, 810.4 records/sec (0.77 MB/sec), 95.1 ms avg latency, 1304.0 max latency.
5235 records sent, 1046.8 records/sec (1.00 MB/sec), 1083.5 ms avg latency, 2910.0 max latency.
6041 records sent, 1122.9 records/sec (1.07 MB/sec), 411.3 ms avg latency, 1252.0 max latency.
5402 records sent, 1075.5 records/sec (1.03 MB/sec), 92.3 ms avg latency, 471.0 max latency.
5019 records sent, 1003.2 records/sec (0.96 MB/sec), 46.6 ms avg latency, 331.0 max latency.
4971 records sent, 994.2 records/sec (0.95 MB/sec), 142.3 ms avg latency, 677.0 max latency.
5026 records sent, 1005.0 records/sec (0.96 MB/sec), 76.9 ms avg latency, 555.0 max latency.
4461 records sent, 712.4 records/sec (0.68 MB/sec), 858.5 ms avg latency, 2328.0 max latency.
6240 records sent, 1247.8 records/sec (1.19 MB/sec), 596.2 ms avg latency, 2434.0 max latency.
5564 records sent, 1112.6 records/sec (1.06 MB/sec), 306.2 ms avg latency, 1089.0 max latency.
4998 records sent, 999.6 records/sec (0.95 MB/sec), 110.0 ms avg latency, 1215.0 max latency.
100000 records sent, 999.360409 records/sec (0.95 MB/sec), 692.04 ms avg latency, 6209.00 ms max latency, 156 ms 50th, 4165 ms 95th, 5337 ms 99th, 5761 ms 99.9th.
Consumer:
[hadoop@h201 ~]$ kafka-consumer-perf-test.sh --zookeeper h201:2181/kafka --messages 100000 --topic data --threads 3
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/work/hadoop2.6jar/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/kafka_2.10-0.10.1.0/libs/slf4j-log4j12-1.7.21.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2018-07-08 22:40:31:658, 2018-07-08 22:40:34:270, 9.8706, 3.7789, 3460, 1324.6554
1. Producer test results and analysis
Result analysis:
1) With batching and multiple threads, and without synchronous replication, Kafka's producer throughput is quite high, reaching up to 26 MB/s and more than 170,000 messages/s.
2) Batching and multithreading both improve producer throughput noticeably.
3) The replication factor has a clear effect on throughput when synchronous replication is used: with a replication factor of 2, throughput drops by roughly 40% compared with a factor of 1 (no replication).
4) Using sync mode causes an obvious drop in producer throughput.
5) Compression vs. throughput: with Gzip or Snappy compression the throughput actually drops (the cause remains to be analyzed), and Snappy gives higher throughput than Gzip (see the sketch after this list for reproducing the compression and acks comparisons).
6) Partition count vs. throughput: increasing the number of partitions actually lowers producer throughput somewhat.
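The comparisons in points 3) to 5) can be reproduced with the same perf tool by overriding producer properties; a sketch, using the standard producer configs compression.type and acks and removing the throughput cap so the cluster rather than the throttle is the limit:
kafka-producer-perf-test.sh --num-records 100000 --record-size 1000 --topic data --throughput -1 --producer-props bootstrap.servers=h201:9092,h202:9092,h203:9092 acks=1 compression.type=snappy
Repeating the run with acks=all, compression.type=gzip, or compression.type=none gives the pairwise comparisons discussed above.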
2. Consumer test results and analysis
Result analysis:
1) With a large number of partitions and threads, the consumer reached a maximum throughput of 34 MB/s in the test scenarios.
2) The replication factor has little effect: it does not influence consumer throughput, because the consumer reads each partition from its leader, regardless of the replication factor. For the same reason, consumer throughput is independent of whether replication is synchronous or asynchronous.
3) Threads, partitions, and throughput: when the partition count is large, increasing the thread count improves consumer throughput significantly (see the sketch after this list).
Note, however, that the thread count should not exceed the partition count; otherwise the extra threads report "No broker partitions consumed by consumer" and do not help throughput.
4) Effect of batch size on throughput:
Changing the batch size has little effect on consumer throughput.
5) Compression vs. throughput: compression has little effect on consumer throughput.
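Point 3) can be checked directly with the consumer perf tool by matching the thread count to the five partitions of the data topic; a sketch based on the command used earlier:
[hadoop@h201 ~]$ kafka-consumer-perf-test.sh --zookeeper h201:2181/kafka --topic data --messages 100000 --threads 5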
Appendix: tuned configuration file:
broker.id=1
listeners=PLAINTEXT://0.0.0.0:6667
advertised.listeners=PLAINTEXT://203.150.54.215:6667
port=6667
host.name=203.150.54.215
# Replication configurations
num.replica.fetchers=1
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.lag.time.max.ms=10000
replica.lag.max.messages=4000
compression.codec=none
controller.socket.timeout.ms=30000
controller.message.queue.size=10
controlled.shutdown.enable=true
default.replication.factor=2
# Log configuration
num.partitions=1
num.recovery.threads.per.data.dir=1
message.max.bytes=1000000
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
log.dirs=/mnt/kafka-logs/kafka00
log.index.interval.bytes=4096
log.index.size.max.bytes=10485760
# Retain logs for three days; this can be shorter
log.retention.hours=72
# Flush data to disk every 10 seconds
log.flush.interval.ms=10000
# Flush policy for log data files: flush after every 20000 messages
log.flush.interval.messages=20000
log.flush.scheduler.interval.ms=2000
log.roll.hours=72
log.retention.check.interval.ms=300000
# On startup, Kafka scans all data files under the log directory (log.dirs) in a single thread
log.segment.bytes=1073741824
# ZK configuration
zookeeper.connection.timeout.ms=6000
zookeeper.sync.time.ms=2000
zookeeper.connect=203.150.54.215:2181,203.150.54.216:2182,203.150.54.217:2183
# Socket server configuration
# Number of I/O threads; set to the number of CPU cores plus 1
num.io.threads=5
# Number of network threads; set to 2x the number of CPU cores, at most 3x
num.network.threads=8
socket.request.max.bytes=104857600
socket.receive.buffer.bytes=1048576
socket.send.buffer.bytes=1048576
queued.max.requests=500
fetch.purgatory.purge.interval.requests=1000
producer.purgatory.purge.interval.requests=1000
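After distributing this tuned server.properties and restarting the brokers, it is worth confirming that each broker has registered with the advertised listener you expect; a minimal sketch using zkCli from the ZooKeeper installation directory (no chroot here, since the zookeeper.connect above does not set one):
bin/zkCli.sh -server 203.150.54.215:2181
ls /brokers/ids
get /brokers/ids/1
The JSON stored under /brokers/ids/<id> lists the endpoints the broker advertises; for the file above it should contain PLAINTEXT://203.150.54.215:6667.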