进程
进程概述
程序:例如xxx.py这就是程序,是一个静态的。
进程:一个程序运行起来后,代码 + 用到的资源称之为进程,它是操作系统分配资源的基本单元。
不仅可以通过线程完成多任务,进程也是可以的。
进程状态
工作中,任务数往往大于CPU的核数,即一定有一些任务正在执行,而另外一些任务在等待CPU进行执行,因此导致了有了不同的状态。
- 就绪态:运行的条件都已经具备,正在等在CPU执行
- 执行态:CPU正在执行其功能
- 等待态:等待某些条件满足,例如一个程序sleep了,此时就处于等待态
进程的创建与简单使用
multiprocessing模块就是跨平台版本的多进程模块,提供了一个Process类来代表一个进程对象,这个对象可以理解为是一个独立的进程,可以执行另外的事情。
from multiprocessing import Process def work(): print('子进程任务启动...') if __name__ == "__main__": # 创建进程对象 p = Process(target=work) # 启动进程 p.start()并行执行两个循环任务
import time from multiprocessing import Process def run_proc(): while True: print("当前任务被子进程运行...") time.sleep(1) if __name__ == '__main__': p = Process(target=run_proc) p.start() while True: print("当前任务被主进程运行...") time.sleep(1)创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动
获取进程的pid
# -*- coding: utf-8 -*- import os from multiprocessing import Process def run_proc(): print('子进程运行中,pid=%d...' % os.getpid()) print('子进程即将退出...') if __name__ == '__main__': print('父进程pid: %d' % os.getpid()) p = Process(target=run_proc) p.start()进程的内置参数与内置方法
multiprocessing.Process内置参数
参数名称 参数作用
target 如果传递了函数的引用,可以任务这个子进程就执行这里的代码
args 给target指定的函数传递的参数,以元组的方式传递
kwargs 给target指定的函数传递命名参数
name 给进程设定名称,也可以不设定
group 指定进程组,大多数情况下用不到
multiprocessing.Process内置方法
方法名称 方法作用
start() 启动子进程实例(创建子进程)
is_alive() 判断子进程是否存活
join() 是否等待子进程执行结束或等待多少秒
terminate() 不管任务是否完成都立即终止子进程
给子进程指定的函数传递参数
# -*- coding: utf-8 -*- import os import time from multiprocessing import Process def run_proc(name, age, **kwargs): for i in range(10): print('子进程运行中: name=%s, age=%d, pid=%d...' % (name, age, os.getpid())) print('传递的不定长kw参数为:', kwargs) time.sleep(0.2) if __name__ == '__main__': p = Process(target=run_proc, args=('admin', 18), kwargs={"address": '长沙'}) p.start() time.sleep(1) # 主程序休眠一秒 p.terminate() # 不管任务是否完成都立即终止子进程 # 使用join方法确保子进程已经被操作系统清理, 如果没有调用join方法则子进程会成为僵尸进程 p.join() print(p.is_alive()) # 判断子进程对象是否存活进程间不共享全局变量
# -*- coding: utf-8 -*- import os import time from multiprocessing import Process nums = [11, 22] def work_1(): print("子进程-1的pid为: %d, 初始列表=%s" % (os.getpid(), nums)) for i in range(1, 4): nums.append(i) time.sleep(1) print("子进程-1的pid为: %d, 操作后的列表=%s" % (os.getpid(), nums)) def work_2(): print("子进程-2的pid为: %d, 全局列表=%s" % (os.getpid(), nums)) if __name__ == '__main__': p1 = Process(target=work_1) p1.start() p1.join() p2 = Process(target=work_2) p2.start()功能
- 进程:能够完成多任务,比如 在一台电脑上能够同时运行多个QQ
- 线程:能够完成多任务,比如 一个QQ中的多个聊天窗口
定义的不同
- 进程是系统进行资源分配和调度的一个独立单位。
- 线程是进程的一个实体。是CPU调度和分派的基本单位。它是比进程更小的能独立运行的基本单位,线程自己基本上不拥有系统资源,只拥有一点在运行中必不可少的资源(如程序计数器,一组寄存器和栈)。但是它可与同属一个进程的其他的线程共享进程所拥有的全部资源。
区别
- 一个程序至少有一个进程,一个进程至少有一个线程。
- 线程的划分尺度小于进程(资源比进程少),使得多线程程序的并发性高。
- 进程在执行过程中拥有独立的内存单元,而多个线程共享内存,从而极大地提高了程序的运行效率
- 线程不能够独立执行,必须依存在进程中
- 可以将进程理解为工厂中的一条流水线,而其中的线程就是这个流水线上的工人
优缺点
线程和进程在使用上各有优缺点:线程执行开销小,但不利于资源的管理和保护,而进程正相反。
进程通信
概述
Process之间有时需要通信,操作系统提供了很多机制来实现进程间的通信。
Queue的使用
可以使用multiprocessing模块的Queue实现多进程之间的数据传递。
Queue本身是一个消息列队程序,首先用一个实例来演示一下Queue的工作原理:
from multiprocessing import Queue queue = Queue(3) # 初始化一个Queue对象,最多可接收三条put消息 queue.put("消息1") queue.put("消息2") print(q.full()) # 判断当前队列是否已满: False queue.put("消息3") print(q.full()) # True # 如果队列已满put_nowait会立即抛出异常,put等待两秒会抛出异常 q.put("消息4", True, 2) q.put_nowait("消息4") # 推荐的方式,先判断消息列队是否已满,再写入 if not q.full(): q.put_nowait("消息4") # 读取消息时,先判断消息列队是否为空,再读取 if not q.empty(): for i in range(q.qsize()): print(q.get_nowait())进程队列对象内置方法概述
方法名称 方法作用
Queue.qsize() 返回当前队列包含的消息数量
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列已满,返回True,反之False
Queue.get() 获取队列中的一条消息,然后将其从列队中移除
Queue.get_nowait() 如果队列为空则直接抛出异常,相当于Queue.get(False)
Queue.put() 将item消息写入队列
Queue.put_nowait() 如果队列已满则直接抛出异常,相当于Queue.put(item, False)
进程队列代码示例
在父进程中创建两个子进程,一个往队列里写数据;一个从队列里读数据
import time, random from multiprocessing import Process, Queue # 写数据进程执行的代码: def write(queue): for item in ['A', 'B', 'C']: print(f'向队列上传的值为: {item}') queue.put(item) time.sleep(random.random()) # 读数据进程执行的代码: def read(queue): while True: item = queue.get() if item is not None: print(f'向队列获取的值为: {item}') time.sleep(random.random()) else: break if __name__ == '__main__': q = Queue() p1 = Process(target=write, args=(q,)) # 写入 p2 = Process(target=read, args=(q,)) # 读取 p1.start() p2.start() # 等待写入任务完成后主进程继续向下执行 p1.join() # 使用主进程向队列传递哨兵 q.put(None) # 等待读取任务完成后主进程继续向下执行 p2.join() print('主程序即将退出...')进程池
使用Pool创建进程池
当需要创建的子进程数量不多时,可以直接利用multiprocessing中的Process动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到multiprocessing模块提供的Pool方法。
初始化Pool时,可以指定一个最大进程数,当有新的请求提交到Pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。
代码示例
import os, time, random from multiprocessing import Pool def worker(proc_name, *args, **kwargs): p_start = time.time() print(f"{proc_name}开始执行, 进程号为: {os.getpid()}") print('任务接收到的args不定长参数为:', args) print('任务接收到的kw不定长参数为:', kwargs) # random.random(): 随机生成0 ~ 1之间的浮点数 time.sleep(random.random() * 2) p_stop = time.time() print(f'{proc_name}执行完毕, 子进程耗时为: {p_stop - p_start:.2f}') if __name__ == '__main__': # 主程序启动计时 main_start = time.time() # 定义一个进程池,最大进程数3 pool = Pool(3) for item in range(1, 11): # apply_async(要调用的目标, (传递给目标的参数,)) # 每次循环将会用空闲出来的子进程去调用目标 pool.apply_async(worker, (item, '测试参数-1', '测试参数-2'), kwds={'proc_kw': 'proc_attr'}) # 同步执行 执行该方法会导致主进程堵塞 # po.apply(worker, (item,)) # 关闭进程池,关闭后pool不再接收新的请求 pool.close() # 等待pool中所有子进程执行完成,必须放在close语句之后 pool.join() main_stop = time.time() print(f'主程序耗时: {main_stop - main_start:.2f}')multiprocessing.Pool常用函数解析:
方法名称 方法作用
apply_async() 使用非阻塞方式调用func
close() 关闭Pool,使其不再接受新的任务
terminate() 不管任务是否完成,立即终止任务
join() 主进程阻塞并等待子进程的退出, 必须在close或terminate之后使用
进程池中的Queue
如果要使用Pool创建进程,就需要使用multiprocessing.Manager()中的Queue(),而不是multiprocessing.Queue()。否则可能会得到一条如下的错误信息:RuntimeError: Queue objects should only be shared between processes through inheritance.
以下实例演示了进程池中的进程如何通信:
import os import time from random import randint # from multiprocessing import Queue from multiprocessing import Manager, Pool def reader(queue): print(f"读取任务启动, 进程的pid为: {os.getpid()}") while True: message = queue.get() if message is not None: print(f"读取任务从队列中获取到消息为: {message}") else: break def writer(queue): print(f"写入任务启动, 进程的pid为: {os.getpid()}") url_list = [ 'https://www.baidu.com', 'https://www.google.com', 'https://www.facebook.com', 'https://www.yahoo.com', 'https://www.linkedin.com', 'https://www.github.com', 'https://www.bing.com' ] for item in url_list: queue.put(item) time.sleep(randint(1, 2)) # 模拟IO延迟 queue.put(None) # 任务退出前传递哨兵信号 if __name__ == "__main__": print("主程序启动...") # 无法使用进程中的Queue队列 # q = Queue() # 使用Manager中的Queue q = Manager().Queue() pool = Pool() pool.apply_async(writer, (q,)) pool.apply_async(reader, (q,)) pool.close() pool.join() print("主程序即将退出...")获取进程池返回值
同步/并发获取返回值
import time from random import random from multiprocessing import Pool # 返回值从进程池获取,父子进程没有返回值 def proc_func(num): # 模拟随机延迟 time.sleep(random()) # 任务函数使用return将计算结果返回 return num * num if __name__ == "__main__": pool = Pool(5) # 存放future对象 result_list = list() for item in range(1, 11): result = pool.apply_async(proc_func, args=(item,)) # 并发执行 # result = pool.apply(func, args=(i,)) 同步执行 # 将future对象添加到列表中 result_list.append(result) for item in result_list: # 使用进程池结果对象中的get方法获取结果 print(item.get())利用map()方法获取返回值
import time from random import random from multiprocessing import Pool def proc_func(num): time.sleep(random()) return num * num if __name__ == "__main__": pool = Pool(5) result = pool.map(proc_func, range(1, 11)) print(result)注意点:
- map()是一次性获取所有子进程的返回值,自带close、join
- apply_async是分批返回
使用concurrent模块创建进程池
concurrent模块创建进程池的使用方式与线程池保持一致,以下为代码示例:
import os import time from random import randint from multiprocessing import Manager from concurrent.futures import ProcessPoolExecutor def reader(queue): print(f"读取任务启动, 进程的pid为: {os.getpid()}") while True: message = queue.get() if message is not None: print(f"读取任务从队列中获取到消息为: {message}") else: print("读取任务结束。") break def writer(queue): print(f"写入任务启动, 进程的pid为: {os.getpid()}") url_list = [ 'https://www.baidu.com', 'https://www.google.com', 'https://www.facebook.com', 'https://www.yahoo.com', 'https://www.linkedin.com', 'https://www.github.com', 'https://www.bing.com' ] for item in url_list: queue.put(item) time.sleep(randint(1, 2)) # 模拟IO延迟 queue.put(None) # 任务退出前传递哨兵信号 def main(): print("主程序启动...") with Manager() as manager: q = manager.Queue() with ProcessPoolExecutor() as executor: executor.submit(writer, q) executor.submit(reader, q) print("主程序即将退出...") if __name__ == "__main__": main() 在进程中使用生产者与消费者模式和线程稍有不同,以下为代码示例:生产者与消费者
哨兵模式
import time import queue from multiprocessing import Process, Queue # 定义生产者函数 def producer(name, queue): for i in range(1, 6): item = f'产品{i}' queue.put(item) # 将项放入队列中 print(f'生产者 {name} 生产了 {item}') time.sleep(1) # 假设生产需要一些时间 # 定义消费者函数 def consumer(name, queue): while True: item = queue.get() # 从队列中获取项 if item is None: queue.put(item) # 为了让其他消费者也能停止,重新放入哨兵对象。 break # 如果为None则停止循环 print(f'消费者 {name} 消费了 {item}') time.sleep(2) # 假设消费需要更多时间 if __name__ == '__main__': # 创建队列对象 q = Queue() # 创建和启动生产者进程 producer_process = Process(target=producer, args=('小明', q)) producer_process.start() # 创建和启动消费者线程 consumer_process = Process(target=consumer, args=('小红', q)) consumer_process.start() # 等待生产者进程执行完毕 producer_process.join() # 使用主程序往队列中添加哨兵信号: None q.put(None) # 等待消费者进程完成 consumer_process.join() # 主程序退出 print('程序结束')task_done
在进程环境中的Queue不支持task_done()方法,需要替换成JoinableQueue
import time from multiprocessing import Process, JoinableQueue as Queue # 定义生产者函数 def producer(name, queue): for i in range(1, 6): item = f'产品{i}' queue.put(item) # 将项放入队列中 print(f'生产者 {name} 生产了 {item}') time.sleep(1) # 假设生产需要一些时间 # 定义消费者函数 def consumer(name, queue): while True: item = queue.get() # 从队列中获取项 print(f'消费者 {name} 消费了 {item}') time.sleep(2) # 假设消费需要更多时间 queue.task_done() # 指示队列中的任务已经处理完成 if __name__ == '__main__': q = Queue() producer_process = Process(target=producer, args=('小明', q)) producer_process.start() consumer_process = Process(target=consumer, args=('小红', q)) # 将消费者设置为守护进程后启动 consumer_process.daemon = True consumer_process.start() # 等待生产者任务完成 producer_process.join() # 等待队列任务完成, 如果队列完成则允许主程序退出 q.join() print('主程序结束...')