程序地带

Kafka 入门(四)-- Python Kafka Client 性能测试


一、前言

  由于工作原因使用到了 Kafka,而现有的代码并不能满足性能需求,所以需要开发高效读写 Kafka 的工具,本文是一个 Python Kafka Client 的性能测试记录,通过本次测试,可以知道选用什么第三方库的性能最高,选用什么编程模型开发出来的工具效率最高。


 


二、第三方库性能测试
1.第三方库

  此次测试的是三个主要的 Python Kafka Client:pykafka、kafka-python 和 confluent-kafka,具体介绍见官网:


pykafka:https://pypi.org/project/pykafka/
kafka-python:https://pypi.org/project/kafka-python/
confluent_kafka:https://pypi.org/project/confluent-kafka/
2.测试环境

       此次测试使用的 Python 版本是2.7,第三方库的版本为:


pykafka:2.8.0
kafka-python:2.0.2
confluent-kafka:1.5.0

       使用的数据总量有50万,每条数据大小为2KB,总共为966MB。


3.测试过程

(1)Kafka Producer 测试


  分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Producer 对象,然后调用相应的 produce 方法将数据推送给 Kafka,数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以推送的数据条数和大小,比较得出性能最优的。


  代码示例(以 pykafka 为例):


1 import sys
2 from datetime import datetime
3 from pykafka import KafkaClient
4
5
6 class KafkaProducerTool():
7 def __init__(self, broker, topic):
8 client = KafkaClient(hosts=broker)
9 self.topic = client.topics[topic]
10 self.producer = self.topic.get_producer()
11
12 def send_msg(self, msg):
13 self.producer.produce(msg)
14
15
16 if __name__ == "__main__":
17 producer = KafkaProducerTool(broker, topic)
18 print(datetime.now())
19 for line in sys.stdin:
20 producer.send_msg(line.strip())
21 producer.producer.stop()
22 print(datetime.now())

(2)Kafka Consumer 测试


  分别使用 pykafka、kafka-python 和 confluent-kafka 实例化一个 Kafka 的 Consumer 对象,然后调用相应的 consume 方法从 Kafka 中消费数据,要消费下来的数据总条数为50万,比较三个库所耗费的时间,并计算每秒钟可以消费的数据条数和大小,比较得出性能最优的。


  代码示例(以 pykafka 为例):


1 from datetime import datetime
2 from pykafka import KafkaClient
3
4
5 class KafkaConsumerTool():
6 def __init__(self, broker, topic):
7 client = KafkaClient(hosts=broker)
8 self.topic = client.topics[topic]
9 self.consumer = self.topic.get_simple_consumer()
10
11 def receive_msg(self):
12 count = 0
13 print(datetime.now())
14 while True:
15 msg = self.consumer.consume()
16 if msg:
17 count += 1
18 if count == 500000:
19 print(datetime.now())
20 return
21
22
23 if __name__ == "__main__":
24 consumer = KafkaConsumerTool(broker, topic)
25 consumer.receive_msg()
26 consumer.consumer.stop()
4.测试结果
Kafka Producer 测试结果:
 
总耗时/秒
每秒数据量/MB
每秒数据条数
confluent_kafka
35
27.90
14285.71
pykafka
50
19.53
10000
kafka-python
532
1.83
939.85
Kafka Consumer 测试结果:
 
总耗时/秒
每秒数据量/MB
每秒数据条数
confluent_kafka
39
25.04
12820.51
kafka-python
52
18.78
9615.38
pykafka
335
2.92
1492.54
5.测试结论

  经过测试,在此次测试的三个库中,生产消息的效率排名是:confluent-kafka > pykafka > kafka-python,消费消息的效率排名是:confluent-kafka > kafka-python > pykafka,由此可见 confluent-kafka 的性能是其中最优的,因而选用这个库进行后续开发。


 


三、多线程模型性能测试
1.编程模型

  经过前面的测试已经知道 confluent-kafka 这个库的性能是很优秀的了,但如果还需要更高的效率,应该怎么办呢?当单线程(或者单进程)不能满足需求时,我们很容易想到使用多线程(或者多进程)来增加并发提高效率,考虑到线程的资源消耗比进程少,所以打算选用多线程来进行开发。那么多线程消费 Kafka 有什么实现方式呢?我想到的有两种:


一个线程实现一个 Kafka Consumer,最多可以有 n 个线程同时消费 Topic(其中 n 是该 Topic 下的分区数量);
多个线程共用一个 Kafka Consumer,此时也可以实例化多个 Consumer 同时消费。

    


  对比这两种多线程模型:


模型1实现方便,可以保证每个分区有序消费,但 Partition 数量会限制消费能力;
模型2并发度高,可扩展能力强,消费能力不受 Partition 限制。
 2.测试过程

(1)多线程模型1


  测试代码:


1 import time
2 from threading import Thread
3 from datetime import datetime
4 from confluent_kafka import Consumer
5
6
7 class ChildThread(Thread):
8 def __init__(self, name, broker, topic):
9 Thread.__init__(self, name=name)
10 self.con = KafkaConsumerTool(broker, topic)
11
12 def run(self):
13 self.con.receive_msg()
14
15
16 class KafkaConsumerTool:
17 def __init__(self, broker, topic):
18 config = {
19 "bootstrap.servers": broker,
20 "session.timeout.ms": 30000,
21 "auto.offset.reset": "earliest",
22 "api.version.request": False,
23 "broker.version.fallback": "2.6.0",
24 "group.id": "test"
25 }
26 self.consumer = Consumer(config)
27 self.topic = topic
28
29 def receive_msg(self):
30 self.consumer.subscribe([self.topic])
31 print(datetime.now())
32 while True:
33 msg = self.consumer.poll(timeout=30.0)
34 print(msg)
35
36
37 if __name__ == "__main__":
38 thread_num = 10
39 threads = [ChildThread("thread_" + str(i + 1), broker, topic) for i in range(thread_num)]
40
41 for i in range(thread_num):
42 threads[i].setDaemon(True)
43 for i in range(thread_num):
44 threads[i].start()

  因为我使用的 Topic 共有8个分区,所以我分别测试了线程数在5个、8个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。


(2)多线程模型2


  测试代码:


1 import time
2 from datetime import datetime
3 from confluent_kafka import Consumer
4 from threadpool import ThreadPool, makeRequests
5
6
7 class KafkaConsumerTool:
8 def __init__(self, broker, topic):
9 config = {
10 "bootstrap.servers": broker,
11 "session.timeout.ms": 30000,
12 "auto.offset.reset": "earliest",
13 "api.version.request": False,
14 "broker.version.fallback": "2.6.0",
15 "group.id": "mini-spider"
16 }
17 self.consumer = Consumer(config)
18 self.topic = topic
19
20 def receive_msg(self, x):
21 self.consumer.subscribe([self.topic])
22 print(datetime.now())
23 while True:
24 msg = self.consumer.poll(timeout=30.0)
25 print(msg)
26
27
28 if __name__ == "__main__":
29 thread_num = 10
30 consumer = KafkaConsumerTool(broker, topic)
31 pool = ThreadPool(thread_num)
32 for r in makeRequests(consumer.receive_msg, [i for i in range(thread_num)]):
33 pool.putRequest(r)
34 pool.wait()

  主要使用 threadpool 这个第三方库来实现线程池,此处当然也可以使用其他库来实现,这里我分别测试了线程数量在5个和10个时消费50万数据所需要的时间,并计算每秒可消费的数据条数。


3.测试结果
多线程模型1
 总数据量/万
线程数量
总耗时/秒
每秒数据条数
50
5
27
18518.51
50
8
24
20833.33
50
10
26
19230.76
多线程模型2
  总数据量/万
线程数量
总耗时/秒
每秒数据条数
50
5
17
29411.76
50
10
13
38461.53
4.测试结论

  使用多线程可以有效提高 Kafka 的 Consumer 消费数据的效率,而选用线程池共用一个 KafkaConsumer 的消费方式的消费效率更高。


 


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/TM0831/p/14285274.html

随机推荐

Java最全,最常用的输入输出汇总

Java最全,最常用的输入输出汇总

目录第一种输入+不换行输出截图第二种写法+换行输出截图数组输入输出一维数组截图二维数组截图第一种输入+不换行输出importjava.util.Scanner;//使用Scan...

一位懒得写博客的小学生 阅读(144)