news 2026/4/16 14:48:34

Python 之多线程通信的几种常用方法

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
Python 之多线程通信的几种常用方法

一般来说,大部分遇到的多线程,只要能各自完成好各自的任务即可。少数情况下,不同线程可能需要在线程安全的情况下,进行通信和数据交换。Python 中常用的线程通信有以下方法。

共享变量

共享变量是最简单的线程通信方式,比如进行计数更新等操作,但需要配合锁来保证线程安全。

import threading # 共享变量 shared_data = 0 lock = threading.Lock() def worker(): global shared_data with lock: # 使用锁保证线程安全 shared_data += 1 threads = [] for i in range(5): t = threading.Thread(target=worker) threads.append(t) t.start() for t in threads: t.join() print(f"最终结果: {shared_data}") # 应该是5

Queue队列

最常用的线程安全通信方式,是生产者-消费者模式的理想选择。比如使用优先级队列优先消费高优先级的数据(序号越低,优先级越高,越优先消费)。

from time import sleep from random import random, randint from threading import Thread from queue import PriorityQueue queue = PriorityQueue() def producer(queue): print('Producer: Running') for i in range(5): # create item with priority value = random() priority = randint(0, 5) item = (priority, value) queue.put(item) # wait for all items to be processed queue.join() queue.put(None) print('Producer: Done') def consumer(queue): print('Consumer: Running') while True: # get a unit of work item = queue.get() if item is None: break sleep(item[1]) print(item) queue.task_done() print('Consumer: Done') producer = Thread(target=producer, args=(queue,)) producer.start() consumer = Thread(target=consumer, args=(queue,)) consumer.start() producer.join() consumer.join()
Producer: Running Consumer: Running (0, 0.9945246262101098) (2, 0.35853829355476663) (2, 0.4794139132317813) (3, 0.8460111545035349) (5, 0.6047655828611674) Producer: Done Consumer: Done

Event事件

线程模提供了 Event 用于线程间的简单信号传递。Event 对象管理内部标志的状态。标志初始为False,并通过 set() 方法变为 True,通过 clear() 方法重新设置为 False。wait() 方法会阻塞,直到标志变为 True。

比如下面使用 Event 通知,模拟交通信号灯周期性变化及车辆通行之间的协同运行。车辆根据信号灯的状态判断是否通行还是等待;车辆通行完毕以后,只剩下信号灯周期性变化。

from threading import * import time def signal_state(): while True: time.sleep(5) print("Traffic Police Giving GREEN Signal") event.set() time.sleep(10) print("Traffic Police Giving RED Signal") event.clear() def traffic_flow(): num = 0 while num < 10: print("Waiting for GREEN Signal") event.wait() print("GREEN Signal ... Traffic can move") while event.is_set(): num = num + 1 print("Vehicle No:", num, " Crossing the Signal") time.sleep(2) print("RED Signal ... Traffic has to wait") event = Event() t1 = Thread(target=signal_state) t2 = Thread(target=traffic_flow) t1.start() t2.start()
Waiting for GREEN Signal Traffic Police Giving GREEN Signal GREEN Signal ... Traffic can move Vehicle No: 1 Crossing the Signal Vehicle No: 2 Crossing the Signal Vehicle No: 3 Crossing the Signal Vehicle No: 4 Crossing the Signal Vehicle No: 5 Crossing the Signal Traffic Police Giving RED Signal RED Signal ... Traffic has to wait Waiting for GREEN Signal Traffic Police Giving GREEN Signal GREEN Signal ... Traffic can move Vehicle No: 6 Crossing the Signal Vehicle No: 7 Crossing the Signal Vehicle No: 8 Crossing the Signal Vehicle No: 9 Crossing the Signal Vehicle No: 10 Crossing the Signal Traffic Police Giving RED Signal RED Signal ... Traffic has to wait Traffic Police Giving GREEN Signal Traffic Police Giving RED Signal Traffic Police Giving GREEN Signal Traffic Police Giving RED Signal ...

Condition条件对象

线程模块中的 Condition 类实现了条件变量对象。条件对象会强制一个或多个线程等待,直到被另一个线程通知。Condition 用于更复杂的线程同步,允许线程等待特定条件。比如上面的 Event 的实现,其内部也是在使用 Condition。

import threading import time # 共享资源 buffer = [] MAX_ITEMS = 5 condition = threading.Condition() def producer(): """生产者""" for i in range(10): time.sleep(0.2) with condition: while len(buffer) >= MAX_ITEMS: print("Buffer full,wait...") condition.wait() # 等待缓冲区有空位 item = f"item-{i}" buffer.append(item) print(f"Producer: {item}, Buffer: {len(buffer)}") condition.notify_all() # 通知消费者 def consumer(): """消费者""" for i in range(10): time.sleep(0.8) with condition: while len(buffer) == 0: print("Buffer empty,wait...") condition.wait() # 等待缓冲区有数据 item = buffer.pop(0) print(f"Consumer: {item}, Buffer: {len(buffer)}") condition.notify_all() # 通知生产者 # 创建线程 prod = threading.Thread(target=producer) cons = threading.Thread(target=consumer) prod.start() cons.start() prod.join() cons.join()
Producer: item-0, Buffer: 1 Producer: item-1, Buffer: 2 Producer: item-2, Buffer: 3 Consumer: item-0, Buffer: 2 Producer: item-3, Buffer: 3 Producer: item-4, Buffer: 4 Producer: item-5, Buffer: 5 Buffer full,wait... Consumer: item-1, Buffer: 4 Producer: item-6, Buffer: 5 Buffer full,wait... Consumer: item-2, Buffer: 4 Producer: item-7, Buffer: 5 Buffer full,wait... Consumer: item-3, Buffer: 4 Producer: item-8, Buffer: 5 Buffer full,wait... Consumer: item-4, Buffer: 4 Producer: item-9, Buffer: 5 Consumer: item-5, Buffer: 4 Consumer: item-6, Buffer: 3 Consumer: item-7, Buffer: 2 Consumer: item-8, Buffer: 1 Consumer: item-9, Buffer: 0

Semaphore信号量

Semaphore 信号量控制对共享资源的访问数量。信号量的基本概念是使用一个内部计数器,每个 acquire() 调用将其递减,每个 release() 调用将其递增。计数器永远不能低于零;当 acquire() 发现计数器为零时,它会阻塞,直到某个其他线程调用 release()。当然,从源码看,信号量也是通过 Condition 条件对象来进行实现的。

import threading import time # 信号量,限制最多3个线程同时访问 semaphore = threading.Semaphore(3) def access_resource(thread_id): """访问共享资源""" with semaphore: print(f"Thread {thread_id} acquire\n", end="") time.sleep(2) # 模拟资源访问 print(f"Thread {thread_id} release\n", end="") # 创建10个线程 threads = [] for i in range(10): t = threading.Thread(target=access_resource, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
Thread 0 acquire Thread 1 acquire Thread 2 acquire Thread 0 release Thread 3 acquire Thread 1 release Thread 2 release Thread 4 acquire Thread 5 acquire Thread 3 release Thread 6 acquire Thread 4 release Thread 7 acquire Thread 5 release Thread 8 acquire Thread 6 release Thread 9 acquire Thread 7 release Thread 8 release Thread 9 release

Barrier屏障

Barrier 使多个线程等待,直到指定数目的线程都到达某个点,这些线程才会被同时唤醒,然后继续往后执行(需要注意的是:如果没有设置 timeout,且总的线程数无法整除给定的线程数 parties 时,会导致线程阻塞,形成死锁)。

import threading import time # 创建屏障,等待3个线程(注意:如果总的线程数无法整除3,则会导致线程阻塞) barrier = threading.Barrier(3) def worker(worker_id): """工作线程""" print(f"Worker {worker_id} start") time.sleep(worker_id) # 模拟不同工作速度 print(f"Worker {worker_id} arrive") barrier.wait() # 等待所有线程到达 print(f"Worker {worker_id} continue") # 创建3个线程 threads = [] for i in range(6): t = threading.Thread(target=worker, args=(i,)) threads.append(t) t.start() for t in threads: t.join()
Worker 0 start Worker 0 arrive Worker 1 start Worker 2 start Worker 3 start Worker 4 start Worker 5 start Worker 1 arrive Worker 2 arrive Worker 2 continue Worker 0 continue Worker 1 continue Worker 3 arrive Worker 4 arrive Worker 5 arrive Worker 5 continue Worker 3 continue Worker 4 continue

不管以什么样的方式进行线程通信,最重要的当属线程安全,线程通信的各种设计,也是建立在通过锁的机制保证线程安全的情况下来实现各种功能的。

版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/4/16 10:49:01

基于Wireshark的ModbusTCP报文解析深度剖析

从抓包到故障排查&#xff1a;手把手教你用Wireshark玩转ModbusTCP报文解析你有没有遇到过这样的场景&#xff1f;SCADA系统突然收不到PLC的数据&#xff0c;现场设备却显示一切正常&#xff1b;或者上位机读取寄存器总是返回异常码&#xff0c;但地址明明“没错”&#xff1b;…

作者头像 李华
网站建设 2026/4/14 6:21:27

Multisim主数据库无法读取?快速理解Win10/11解决方案

Multisim主数据库打不开&#xff1f;别慌&#xff0c;一文搞懂Win10/11下的根源与实战修复你有没有遇到过这样的场景&#xff1a;刚打开Multisim准备画个简单的放大电路&#xff0c;结果弹出一个红色警告——“multisim找不到主数据库”。元器件库一片空白&#xff0c;搜索框失…

作者头像 李华
网站建设 2026/4/16 13:00:27

天辛大师谈人工智能,AI训练师们正在觉醒自己是牛马饲料

在数字时代的洪流中&#xff0c;一群被称为“AI训练师”的从业者&#xff0c;正经历着一场深刻的自我意识觉醒。他们曾以为自己是驾驭未来智能浪潮的舵手&#xff0c;是赋予冰冷机器灵魂的魔法师&#xff0c;在数据的海洋中辛勤耕耘&#xff0c;为人工智能的进化提供着源源不断…

作者头像 李华
网站建设 2026/4/15 21:57:35

完整示例:照明设计中LED灯珠品牌选型过程

照明设计实战&#xff1a;如何为商超筒灯精准选型LED灯珠&#xff1f; 你有没有遇到过这样的情况&#xff1f; 项目时间紧&#xff0c;老板催着出样机&#xff0c;你在BOM表里翻来覆去对比几家LED厂商的数据手册——光效差那么几lm/W&#xff0c;显色指数卡在90边缘&#xff0…

作者头像 李华
网站建设 2026/3/14 13:26:27

HBuilderX运行不了浏览器?新手必看的常见问题解析

HBuilderX跑不起浏览器&#xff1f;别慌&#xff0c;这份实战排错指南帮你一招搞定你是不是也遇到过这种情况&#xff1a;刚写完一段 HTML 代码&#xff0c;满怀期待地按下CtrlR&#xff0c;结果——什么都没发生&#xff1f;或者弹出个错误提示&#xff1a;“无法启动浏览器”…

作者头像 李华