想要系统掌握 Python 多进程模块multiprocessing.Pool中apply_async、map_async、imap这三个核心方法的使用语法、核心差异、适用场景和实战示例,从而根据实际需求选择合适的多进程并发方式,提升程序执行效率。
前置基础
在讲解这三个方法前,先明确核心前提:
multiprocessing.Pool(进程池)用于管理一组子进程,避免频繁创建 / 销毁进程的开销;- 「同步」方法(如
apply、map)会阻塞主进程,直到子进程执行完成;「异步」方法(如apply_async、map_async)不会阻塞,可继续执行主进程代码; - Windows 系统中,多进程代码必须放在
if __name__ == "__main__":块中(避免递归创建子进程),Linux/macOS 无强制要求,但建议统一写法; - 进程间数据隔离:子进程无法修改主进程的变量,结果需通过返回值获取。
一、apply_async:异步执行单个任务
1. 核心作用
异步提交单个任务到进程池,主进程不阻塞,可继续执行其他逻辑;支持传参、回调函数(任务完成后自动调用)。
2. 语法
python
pool.apply_async(func, args=(), kwds={}, callback=None, error_callback=None)| 参数 | 说明 |
|---|---|
func | 子进程要执行的函数 |
args | 传给func的位置参数(元组) |
kwds | 传给func的关键字参数(字典) |
callback | 可选,任务成功执行后调用的回调函数(参数为func的返回值) |
error_callback | 可选,任务执行出错时调用的回调函数(参数为异常对象) |
3. 实战示例
python
import multiprocessing import time # 子进程执行的函数 def task(num): time.sleep(1) # 模拟耗时操作 return f"任务{num}完成,进程ID:{multiprocessing.current_process().pid}" # 回调函数(任务成功后执行) def success_callback(result): print(f"回调函数:{result}") # 错误回调函数 def error_callback(e): print(f"任务出错:{str(e)}") if __name__ == "__main__": # 创建进程池(指定4个工作进程) pool = multiprocessing.Pool(processes=4) # 异步提交单个任务 result_obj1 = pool.apply_async(task, args=(1,), callback=success_callback) result_obj2 = pool.apply_async(task, args=(2,), callback=success_callback) # 提交一个会出错的任务(测试错误回调) result_obj3 = pool.apply_async(task, args=("错误参数",), error_callback=error_callback) # 主进程继续执行(不阻塞) print("主进程:所有任务已提交,等待子进程执行...") # 必须关闭进程池 + 等待所有子进程完成(否则主进程退出,子进程会被强制终止) pool.close() pool.join() # 也可通过result_obj.get()获取单个任务的返回值(会阻塞直到任务完成) # print(result_obj1.get())输出(顺序可能因进程调度略有不同)
plaintext
主进程:所有任务已提交,等待子进程执行... 回调函数:任务1完成,进程ID:12345 回调函数:任务2完成,进程ID:12346 任务出错:invalid literal for int() with base 10: '错误参数'4. 适用场景
- 需要逐个提交任务,且每个任务参数 / 逻辑可能不同;
- 需为单个任务设置成功 / 失败回调;
- 任务数量少,或任务提交时机不固定(如动态生成任务)。
二、map_async:异步批量执行可迭代任务
1. 核心作用
异步提交一组可迭代任务(如列表、元组)到进程池,功能等价于map()的异步版本;所有任务的参数来自可迭代对象,结果按输入顺序返回,支持回调函数。
2. 语法
python
pool.map_async(func, iterable, chunksize=None, callback=None, error_callback=None)| 参数 | 说明 |
|---|---|
func | 子进程要执行的函数(仅接收单个参数,由iterable的元素传入) |
iterable | 可迭代对象(如列表),每个元素作为func的参数 |
chunksize | 可选,分块大小(大数据集时设置可提升效率,默认自动分块) |
3. 实战示例
python
import multiprocessing import time def task(num): time.sleep(1) return f"任务{num}完成,进程ID:{multiprocessing.current_process().pid}" def batch_callback(results): # 回调函数的参数是所有任务的结果列表(按输入顺序) print("批量回调:所有任务完成,结果如下:") for res in results: print(res) if __name__ == "__main__": pool = multiprocessing.Pool(processes=4) # 异步批量提交任务(iterable为列表[1,2,3,4]) result_obj = pool.map_async(task, [1,2,3,4], callback=batch_callback) print("主进程:批量任务已提交,等待执行...") pool.close() pool.join() # 也可通过result_obj.get()获取结果列表(按输入顺序) # print(result_obj.get())输出
plaintext
主进程:批量任务已提交,等待执行... 批量回调:所有任务完成,结果如下: 任务1完成,进程ID:12345 任务2完成,进程ID:12346 任务3完成,进程ID:12347 任务4完成,进程ID:123484. 适用场景
- 批量执行相同逻辑的任务,参数来自可迭代对象;
- 需异步执行且关注结果顺序;
- 任务数量适中,可一次性加载到内存。