大家好,我是jobleap.cn的小九。
Python 的queue库是标准库中专门提供线程安全队列的模块,核心用于多线程场景下的任务分发、数据传递,解决线程间的同步与通信问题。该库实现了 FIFO(先进先出)、LIFO(后进先出)、优先级队列等多种队列类型,并封装了一套易用且线程安全的 API。本文将从核心类、常用 API 详解、场景化示例到实战案例,全面串联queue库的所有常用用法。
一、核心概念与前置说明
- 线程安全:
queue库的所有方法都经过锁机制处理,多线程操作时无需额外加锁。 - 核心队列类:
类名 特性 适用场景 queue.QueueFIFO(先进先出)队列 通用任务排队 queue.LifoQueueLIFO(后进先出)队列 栈式任务处理 queue.PriorityQueue优先级队列(元组优先级) 按优先级执行任务 queue.SimpleQueue轻量级FIFO队列 无阻塞/join需求的简单场景 - 异常类型:
queue.Empty:队列为空时调用get(block=False)或超时触发。queue.Full:队列满时调用put(block=False)或超时触发。
二、常用 API 详解(附基础示例)
以下所有示例均基于import queue导入模块,先以最常用的Queue类为例,详解通用 API,再扩展其他队列类。
1. 初始化队列:Queue(maxsize=0)
- 参数:
maxsize表示队列最大容量,0为无限容量。 - 示例:
importqueue# 初始化容量为5的FIFO队列q=queue.Queue(maxsize=5)# 初始化无限容量的LIFO队列lifo_q=queue.LifoQueue(maxsize=0)# 初始化优先级队列pri_q=queue.PriorityQueue(maxsize=10)
2. 入队:put(item, block=True, timeout=None)
- 功能:将元素
item加入队列。 - 参数:
block=True:队列满时阻塞等待;block=False直接抛Full异常。timeout:阻塞超时时间(秒),超时后抛Full异常(仅block=True有效)。
- 示例:
# 基础入队(阻塞模式)q.put("task1")# 队列空,直接入队q.put("task2",timeout=2)# 若队列满,阻塞2秒后抛Full# 非阻塞入队(需捕获异常)try:# 假设队列已装满5个元素foriinrange(6):q.put(i,block=False)exceptqueue.Full:print("队列已满,无法入队")
3. 出队:get(block=True, timeout=None)
- 功能:从队列头部取出并删除元素(
LifoQueue取尾部,PriorityQueue取优先级最小的)。 - 参数:
block=True:队列为空时阻塞等待;block=False直接抛Empty异常。timeout:阻塞超时时间(秒),超时后抛Empty异常。
- 示例:
# 基础出队(阻塞模式)item1=q.get()# 取出"task1"(FIFO)print(item1)# 输出:task1# 非阻塞出队(需捕获异常)try:item=q.get(block=False)exceptqueue.Empty:print("队列为空,无法出队")# 带超时的出队try:item=q.get(timeout=1)exceptqueue.Empty:print("1秒内未获取到元素,队列空")
4. 队列状态查询:qsize()、empty()、full()
qsize():返回队列当前元素数量(多线程下为近似值,因查询后状态可能变化)。empty():判断队列是否为空(返回布尔值,多线程下非绝对准确)。full():判断队列是否已满(返回布尔值,多线程下非绝对准确)。- 示例:
# 初始状态:已入队task2,容量5print(q.qsize())# 输出:1print(q.empty())# 输出:Falseprint(q.full())# 输出:False# 填满队列foriinrange(4):q.put(f"task{i+3}")print(q.full())# 输出:True
5. 任务完成标记:task_done()
- 功能:告知队列“某个入队的任务已处理完成”,需与
join()配合使用。 - 注意:调用次数需等于入队元素数,否则
join()会永久阻塞;未入队却调用会抛ValueError。
6. 阻塞等待所有任务完成:join()
- 功能:阻塞主线程,直到队列中所有元素都被取出并调用
task_done()。 - 示例:
# 入队3个任务q=queue.Queue()q.put("task1")q.put("task2")q.put("task3")# 取出并标记完成defprocess_task():whilenotq.empty():item=q.get()print(f"处理任务:{item}")q.task_done()# 标记任务完成process_task()q.join()# 等待所有任务完成print("所有任务处理完毕")
7. 其他队列类的特有用法
(1)LifoQueue(后进先出)
- 入队/出队逻辑与栈一致,
get()取最后入队的元素:lifo_q=queue.LifoQueue()lifo_q.put("task1")lifo_q.put("task2")print(lifo_q.get())# 输出:task2(后进先出)
(2)PriorityQueue(优先级队列)
- 入队元素必须是元组:
(优先级数值, 数据),优先级数值越小,越先出队;数值相同则按数据排序:pri_q=queue.PriorityQueue()pri_q.put((3,"低优先级任务"))pri_q.put((1,"高优先级任务"))pri_q.put((2,"中优先级任务"))pri_q.put((1,"同优先级任务1"))pri_q.put((1,"同优先级任务2"))# 依次出队:高优先级→同优先级1→同优先级2→中优先级→低优先级whilenotpri_q.empty():print(pri_q.get()[1])
(3)SimpleQueue(轻量级FIFO)
- 无
task_done()和join()方法,其他用法与Queue一致,更轻量:simple_q=queue.SimpleQueue()simple_q.put("hello")print(simple_q.get())# 输出:hello# simple_q.task_done() # 报错:AttributeError(无此方法)
三、实战案例:多线程生产者消费者模型
该案例串联queue库的所有核心 API,模拟“生产者生产任务、消费者处理任务”的经典场景,包含:
- 多线程(生产者/消费者)操作队列;
put()/get()带超时;task_done()/join()等待所有任务完成;- 异常捕获(Empty/Full);
- 队列状态查询。
完整代码
importqueueimportthreadingimporttimeimportrandom# 1. 初始化队列(容量10)task_queue=queue.Queue(maxsize=10)# 2. 生产者函数:生成任务并入队defproducer(name,task_count):""" 生产者:生成指定数量的任务 :param name: 生产者名称 :param task_count: 生产任务数 """foriinrange(task_count):task=f"{name}-任务{i+1}"try:# 非阻塞入队,超时1秒task_queue.put(task,block=True,timeout=1)print(f"[{time.ctime()}] 生产者{name}:入队{task}| 队列当前大小:{task_queue.qsize()}")time.sleep(random.uniform(0.1,0.5))# 模拟生产耗时exceptqueue.Full:print(f"[{time.ctime()}] 生产者{name}:队列已满,跳过任务{i+1}")# 3. 消费者函数:取出任务并处理defconsumer(name):""" 消费者:循环取任务处理,直到队列为空 :param name: 消费者名称 """whileTrue:try:# 非阻塞出队,超时2秒(避免永久阻塞)task=task_queue.get(block=True,timeout=2)print(f"[{time.ctime()}] 消费者{name}:取出{task}| 队列剩余:{task_queue.qsize()}")# 模拟处理耗时time.sleep(random.uniform(0.2,0.8))task_queue.task_done()# 标记任务完成print(f"[{time.ctime()}] 消费者{name}:完成{task}")exceptqueue.Empty:print(f"[{time.ctime()}] 消费者{name}:队列为空,退出")break# 4. 启动多线程if__name__=="__main__":# 创建2个生产者线程(分别生产5个、6个任务)p1=threading.Thread(target=producer,args=("P1",5))p2=threading.Thread(target=producer,args=("P2",6))# 创建3个消费者线程c1=threading.Thread(target=consumer,args=("C1",))c2=threading.Thread(target=consumer,args=("C2",))c3=threading.Thread(target=consumer,args=("C3",))# 启动生产者p1.start()p2.start()# 等待生产者完成入队p1.join()p2.join()print(f"\n[{time.ctime()}] 所有生产者完成任务入队,队列最终大小:{task_queue.qsize()}\n")# 启动消费者c1.start()c2.start()c3.start()# 等待队列所有任务处理完成task_queue.join()print(f"\n[{time.ctime()}] 所有任务处理完毕,主线程退出")运行结果(示例)
[Tue Dec 9 10:00:00 2025] 生产者P1:入队 P1-任务1 | 队列当前大小:1 [Tue Dec 9 10:00:00 2025] 生产者P2:入队 P2-任务1 | 队列当前大小:2 [Tue Dec 9 10:00:01 2025] 生产者P1:入队 P1-任务2 | 队列当前大小:3 ... [Tue Dec 9 10:00:05 2025] 所有生产者完成任务入队,队列最终大小:11 [Tue Dec 9 10:00:05 2025] 消费者C1:取出 P1-任务1 | 队列剩余:10 [Tue Dec 9 10:00:05 2025] 消费者C2:取出 P1-任务2 | 队列剩余:9 [Tue Dec 9 10:00:06 2025] 消费者C1:完成 P1-任务1 ... [Tue Dec 9 10:00:10 2025] 消费者C3:队列为空,退出 [Tue Dec 9 10:00:10 2025] 所有任务处理完毕,主线程退出四、关键注意事项
- 多线程下的状态准确性:
qsize()/empty()/full()仅为参考,因调用后队列状态可能被其他线程修改,切勿依赖这些方法做“是否入队/出队”的判断,建议用put(timeout)/get(timeout)或捕获异常。 - SimpleQueue 的适用场景:无
task_done()/join(),适合无需等待所有任务完成的简单场景,性能略高于Queue。 - 优先级队列的元素要求:元组的第一个元素(优先级)必须是可比较的类型(如int/float),否则会抛
TypeError。 - 避免死锁:
task_done()需在get()后调用,且次数与入队数一致;join()需在所有生产者完成入队后调用。
五、总结
queue库是 Python 多线程编程的核心工具,其常用 API 可归纳为“入队(put)、出队(get)、状态查询(qsize/empty/full)、任务同步(task_done/join)”四大类。通过 FIFO/LIFO/优先级队列的灵活选择,结合多线程的生产者消费者模型,可高效解决线程间的任务分发与同步问题。掌握上述 API 与实战案例,即可覆盖queue库的绝大多数应用场景。