线程
线程概念
我们在日常开发中经常会听到使用多线程/多进程的方式完成并发任务。那么什么是进程?什么是线程?进程与线程之间有什么关系?接下来我们通过日常场景简单的了解一下进程与线程。
一个工厂,至少有一个车间,一个车间中最少有一个工人,最终是工人在工作。
一个程序,至少有一个进程,一个进程中最少有一个线程,最终是线程在工作。
在一个车间中有最基本的工作工具,人通过操作工具的方式来完成工作。
一个进程中有最基本的运行代码的资源(内存、cpu等等),线程通过进程中提供的资源来运行代码。
通过以上描述,我们来总结一下线程与进程的关系:
- 线程是计算机可以被cpu调度的最小单元
- 进程是计算机分配资源的的最小单元,进程可以为线程提供运行资源。
一个进程中可以有多个线程,同一个进程中的线程可以共享当前进程中的资源。
使用线程完成并发任务
我们之前编写的代码都被称为同步单线程代码,排队依次运行。如果前一个任务没有完成,那么之后的任务也无法继续。
import time def work_1(): print('任务1...') time.sleep(2) def work_2(): print('任务2...') time.sleep(2) work_1() work_2()使用线程的方式优化代码:
import time import threading def work_1(): print('任务1...') time.sleep(2) def work_2(): print('任务2...') time.sleep(2) # 通过Thread方法创建线程对象,并使用target指定线程对象要运行的任务 t1 = threading.Thread(target=work_1) t2 = threading.Thread(target=work_2) # 运行线程 t1.start() t2.start()实现一个简单的单线程同步爬虫:
import requests def get_image(url): response = requests.get(url).content file_name = url.rsplit('/')[-1] with open(file_name, 'wb') as f: f.write(response) print('下载完成...') url_list = [ 'http://pic.bizhi360.com/bbpic/98/10798.jpg', 'http://pic.bizhi360.com/bbpic/92/10792.jpg', 'http://pic.bizhi360.com/bbpic/86/10386.jpg' ] for url in url_list: get_image(url)对上述单线程同步爬虫进行优化:
import requests import threading def get_image(url): response = requests.get(url).content file_name = url.rsplit('/')[-1] with open(file_name, 'wb') as f: f.write(response) print('下载完成...') url_list = [ 'http://pic.bizhi360.com/bbpic/98/10798.jpg', 'http://pic.bizhi360.com/bbpic/92/10792.jpg', 'http://pic.bizhi360.com/bbpic/86/10386.jpg' ] for url in url_list: t = threading.Thread(target=get_image, args=(url,)) t.start()除了上述使用线程的方式提升运行速度之外,我们还可以使用多进程的方式完成并发任务。
import requests import multiprocessing def get_image(url): response = requests.get(url).content file_name = url.rsplit('/')[-1] with open(file_name, 'wb') as f: f.write(response) print('下载完成...') url_list = [ 'http://pic.bizhi360.com/bbpic/98/10798.jpg', 'http://pic.bizhi360.com/bbpic/92/10792.jpg', 'http://pic.bizhi360.com/bbpic/86/10386.jpg' ] if __name__ == "__main__": for url in url_list: p = multiprocessing.Process(target=get_image, args=(url,)) p.start()GIL锁
在刚刚的一些案例中我们发现无论使用多线程还是多进程的方式都可以实现并发任务,那么在什么业务场景下去使用多线程/多进程?在学习本小结内容之前,先给出结论:
- 如果任务是属于IO密集型任务那么优先使用线程方式
- 如果任务是属于计算密集型任务那么优先使用进程方式
在Python中存在全局解释器锁,简称为GIL。GIL是cpython解释器独有的。主要的功能是:让一个进程在同一时刻只能有一个线程被执行。
例如在一个进程中创建了多个线程,在运行当前程序时在同一时刻只有一个线程被执行,其他线程等待cpu调度。这种情况无法利用多核cpu的优势,如果想要绕开GIL,那么可以使用多进程的方式,创建多个进程,每个进程中只有一个线程。但是请注意,创建多进程消耗的资源比多线程消耗的资源大。
线程方法 - 重点
在正式学习线程方法之前,我们首先回顾一下刚刚学习到的内容。
# threading_test.py import time import threading def work(): time.sleep(2) print('子线程任务...') # 当前只是创建了线程对象,并不会执行线程 t = threading.Thread(target=work) # 通过start运行线程 t.start() print('主线程打印信息...') ''' 在上述代码中,我们要明确: 1.一个py文件被解释器执行时会在操作系统中创建进程。 2.在进程中创建线程来执行当前文件中的代码,我们把最初创建的线程称之为主线程。 3.当主线程执行到Thread代码时会创建一个新的线程,我们一般称之为子线程。 4.当前代码中的主线程与子线程交替执行。 5.子线程被执行时主线程不会等待,继续往下执行到没有代码时等待子线程执行完毕再退出。 '''thread_obj.start()
当前线程准备就绪,等待cpu调度,具体调度时间由cpu决定。
import threading num = 0 def add(): global num for i in range(100000): num += i t = threading.Thread(target=add) t.start() print(num)thread_obj.join()
等待子线程任务执行完毕后主线程再继续向下执行
import threading num = 0 def add(): global num for i in range(100000): num += i t = threading.Thread(target=add) t.start() t.join() # 主线程等待子线程任务结束时解堵塞 print(num) import threading num = 0 def add_(): global num for i in range(100000): num += i def sub_(): global num for i in range(100000): num -= i t1 = threading.Thread(target=add_) t2 = threading.Thread(target=sub_) t1.start() t1.join() # 主线程等待子线程任务结束时解堵塞 t2.start() t2.join() print(num)thread_obj.setDaemon(bool: attr)
设置守护线程,需要在线程启动之前设置。
如果一个线程为守护线程则主线程执行完毕后不管子线程任务是否结束都自动退出。
当前参数默认为:False
import time import threading def work(): for i in range(5): print(i) time.sleep(1) t = threading.Thread(target=work) t.setDaemon(True) t.start() print('主线程即将退出...')thread_obj.current_thread()
获取当前运行的线程对象的引用
import threading def work(): name = threading.current_thread().getName() print(name) for i in range(5): t = threading.Thread(target=work) t.setName(f'线程: {i}') t.start()自定义线程类完成线程爬虫
import requests import threading class ThreadSpider(threading.Thread): def __init__(self, url): super().__init__() # 子类继承线程类如果需要重写构造函数必须先调用父类的构造函数 self.url = url def run(self): response = requests.get(self.url).content file_name = self.url.rsplit('/')[-1] with open(file_name, 'wb') as f: f.write(response) print('下载完成...') url_list = [ 'http://pic.bizhi360.com/bbpic/98/10798.jpg', 'http://pic.bizhi360.com/bbpic/92/10792.jpg', 'http://pic.bizhi360.com/bbpic/86/10386.jpg' ] for url in url_list: ts = ThreadSpider(url) ts.start()线程安全
在之前的案例中我们使用了多个线程对一个全局变量进行修改的操作,如果多个线程都对一个全局变量进行操作的话会出现资源竞争的问题,会导致计算错误。
import threading num = 0 def add_(): global num for i in range(100000): num += i def sub_(): global num for i in range(100000): num -= i t1 = threading.Thread(target=add_) t2 = threading.Thread(target=sub_) t1.start() t2.start() t1.join() t2.join() print(num)使用线程锁解决上述问题:
from threading import Thread, RLock num = 0 lock_obj = RLock() def add_(): global num for i in range(100000): lock_obj.acquire() # 申请锁,申请成功会让其他线程等待直到当前线程释放 num += i lock_obj.release() # 释放锁,当锁被释放后其他线程才能被cpu挂起执行 def sub_(): global num for i in range(100000): lock_obj.acquire() num -= i lock_obj.release() t1 = Thread(target=add_) t2 = Thread(target=sub_) t1.start() t2.start() t1.join() t2.join() print(num)在上述案例中,我们在代码中手动申请锁与释放锁,RLock方法支持上下文管理协议,可以使用with语句帮助我们申请和释放锁。
from threading import Thread, RLock num = 0 lock_obj = RLock() def add_(): global num for i in range(100000): with lock_obj: # 使用上下文管理锁的申请与释放 num += i def sub_(): global num for i in range(100000): with lock_obj: num -= i t1 = Thread(target=add_) t2 = Thread(target=sub_) t1.start() t2.start() t1.join() t2.join() print(num)在线程中一般使用两种锁机制:Lock、RLock
Lock
同步锁:同步锁一般很少使用,不支持锁嵌套
from threading import Thread, Lock num = 0 lock_obj = Lock() def add_num(): global num for i in range(10000000): lock_obj.acquire() num += 1 lock_obj.release() print(num) for _ in range(2): t = Thread(target=add_num) t.start()RLock
递归锁:支持锁嵌套
from threading import Thread, RLock num = 0 lock_obj = RLock() def add_num(): global num for i in range(100000): lock_obj.acquire() lock_obj.acquire() num += 1 lock_obj.release() lock_obj.release() print(num) for _ in range(2): t = Thread(target=add_num) t.start()死锁
在使用锁的过程中我们发现如果在同步锁中使用了嵌套则程序会卡死,这种情况我们称之为死锁。
死锁:由于资源竞争造成的一种堵塞现象。
# coding=utf-8 import threading import time mutexA = threading.Lock() mutexB = threading.Lock() class MyThread1(threading.Thread): def run(self): # 对mutexA上锁 mutexA.acquire() # mutexA上锁后,延时1秒,等待另外那个线程 把mutexB上锁 print(self.name + '----do1---up----') time.sleep(1) # 此时会堵塞,因为这个mutexB已经被另外的线程抢先上锁了 mutexB.acquire() print(self.name + '----do1---down----') mutexB.release() # 对mutexA解锁 mutexA.release() class MyThread2(threading.Thread): def run(self): # 对mutexB上锁 mutexB.acquire() # mutexB上锁后,延时1秒,等待另外那个线程 把mutexA上锁 print(self.name + '----do2---up----') time.sleep(1) # 此时会堵塞,因为这个mutexA已经被另外的线程抢先上锁了 mutexA.acquire() print(self.name + '----do2---down----') mutexA.release() # 对mutexB解锁 mutexB.release() if __name__ == '__main__': t1 = MyThread1() t2 = MyThread2() t1.start() t2.start()队列
概述
Python的Queue模块提供了同步的、线程安全的队列类,包括:
- FIFO先进先出队列:Queue
- LIFO后进先出队列:LifoQueue
- 优先级队列:PriorityQueue
这些队列用于多线程编程,可以用来安全传输任何Python对象。当信息必须在多个线程间安全交换时,Queue模块就显得特别有用。
队列方法
方法名称 方法作用
Queue(maxsize=0) 创建一个先进先出的队列。maxsize指定队列中能存储的项目数的上限。若maxsize小于或等于0,队列大小为无限。
put(item, block=True, timeout=None) 将item放入队列中
- block:如果为True且队列已满,将阻塞至队列有空间;如果为False,队列满时将引发queue.Full异常。
- timeout:阻塞的最大时间(以秒为单位)。只有当block=True时才有效。若在指定时间内未能放入队列,将引发queue.Full异常。
get(block=True, timeout=None) 从队列中移除并返回一个项目
- block:如果为True且队列已满,将阻塞至队列有空间;如果为False,队列满时将引发queue.Full异常。
- timeout:阻塞的最大时间(以秒为单位)。只有当block=True时才有效。若在指定时间内队列仍为空,将引发queue.Empty异常。
qsize() 返回队列中大致的项目数。由于多线程的存在,当qsize()返回时,返回值可能不再准确。
empty() 判断队列是否为空。如果队列为空,返回True;反之,返回False。
full() 判断队列是否已满。如果队列已满,返回True;反之,返回False。
task_done() 表示之前入队的任务已经完成。每个get()调用得到的项目后,随后都应该调用task_done(),用于队列完成任务的追踪。
join() 阻塞调用者线程,直至队列中的所有项目都被处理(即每个由put()调用加入队列的项目都有一个相应的task_done()调用)。
代码示例
以下是一个利用Queue模块实现基本的生产者-消费者模型的示例:
import time import threading from queue import Queue def producer(queue): for i in range(1, 6): item = f'项{i}' queue.put(item) print(f'生产:{item}') time.sleep(1) def consumer(queue): while True: item = queue.get() print(f'消费:{item}') queue.task_done() time.sleep(1.5) if __name__ == '__main__': q = Queue() t1 = threading.Thread(target=producer, args=(q,)) t2 = threading.Thread(target=consumer, args=(q,)) t2.daemon = True # 线程2是无限循环需要设置守护线程以便主线程退出 t1.start() t2.start() t1.join() q.join() # 等待所有项被消费生产者与消费者
概述
生产者消费者模式是一种经典的同步机制,用于解决多线程间的协作问题。这种模式主要涉及两类线程:生产者(Producer)和消费者(Consumer)。生产者负责生成数据,放入一个共享的数据缓冲区中,而消费者则从缓冲区中取出数据进行处理。该模式可以通过Queue模块在Python中实现,其中队列(Queue)即作为那个共享的数据缓冲区。
为什么使用生产者消费者模式
使用生产者消费者模式的主要原因之一是为了解耦生产数据的过程与消费数据的过程。这意味着生产者无需等待消费者处理数据即可继续产生新的数据,反之亦然。这种方式可以大幅提高程序的并发性能和效率,特别是在生产和消费速度不一致时。
应用场景
- 日志记录:生产者生成日志信息,消费者将日志信息写入文件或发送至日志服务器。
- 任务调度系统:生产者提交任务,消费者从任务队列中拉取并执行任务。
- 事件处理系统:生产者是各类事件(如用户点击、系统错误等)的产生源,消费者负责对这些事件进行响应和处理。
- 数据流处理:在数据密集型应用中,生产者负责提取和预处理数据,消费者进行进一步的数据分析和处理。
实现生产者消费者模式的两种方法
哨兵模式
哨兵(Sentinel)对象是一个特殊的对象,用于通知消费者线程没有更多的项目待处理,是一种停止信号
import time import queue import threading # 创建一个队列实例 q = queue.Queue() # 定义生产者函数 def producer(name): for i in range(1, 6): item = f'产品{i}' q.put(item) # 将项放入队列中 print(f'生产者 {name} 生产了 {item}') time.sleep(1) # 假设生产需要一些时间 # 定义消费者函数 def consumer(name): while True: item = q.get() # 从队列中获取项 if item is None: q.put(item) # 为了让其他消费者也能停止,重新放入哨兵对象。 break # None为停止信号 print(f'消费者 {name} 消费了 {item}') time.sleep(2) # 假设消费需要更多时间 # 创建和启动生产者线程 producer_thread = threading.Thread(target=producer, args=('小明',)) producer_thread.start() # 创建和启动消费者线程 consumer_thread = threading.Thread(target=consumer, args=('小红',)) consumer_thread.start() # 等待生产者线程完成 producer_thread.join() # 向队列中添加None,作为结束信号让消费者停止 q.put(None) # 等待消费者线程完成 consumer_thread.join() print('程序结束')task_done
使用Queue的task_done()方法来告知生产者某个项目已被处理完毕。当队列中的所有项目都被处理后,join()方法将解锁,允许程序继续执行或退出
import time import queue import threading # 创建一个队列实例 q = queue.Queue() # 定义生产者函数 def producer(name): for i in range(1, 6): item = f'产品{i}' q.put(item) # 将项放入队列中 print(f'生产者 {name} 生产了 {item}') time.sleep(1) # 假设生产需要一些时间 # 定义消费者函数 def consumer(name): while True: item = q.get() # 从队列中获取项 print(f'消费者 {name} 消费了 {item}') time.sleep(2) # 假设消费需要更多时间 q.task_done() # 指示队列中的任务已经处理完成 # 创建和启动生产者线程 producer_thread = threading.Thread(target=producer, args=('小明',)) producer_thread.start() # 创建和启动消费者线程 consumer_thread = threading.Thread(target=consumer, args=('小红',)) consumer_thread.daemon = True consumer_thread.start() q.join() # 队列任务全部完成后解堵塞 # producer_thread.join() print('程序结束...')线程池
概述
创建线程对象的期间会损耗时间,尤其是在需要开辟大量线程对象的时候会发生性能下降的情况。那么我们能否让程序创建一定数量的线程对象,并且在执行完某一个任务后不会被解释器销毁,下一个任务重复使用之前所创建的线程对象。像这种需要创建大量线程对象的场景推荐使用线程池。
线程池的创建
# 模拟网页请求 import time from concurrent.futures import ThreadPoolExecutor def get_html(time_attr): time.sleep(time_attr) print(f'get page {time_attr} success') return time_attr # 创建线程池对象 executor = ThreadPoolExecutor(max_workers=2) # 通过submit提交需要执行的函数到线程池中,并且submit是立即返回对象不会堵塞 task_1 = executor.submit(get_html, 3) task_2 = executor.submit(get_html, 2) # done方法用于判定某个任务是否完成 print('task_1完成情况:', task_1.done()) # 可以使用cancel取消任务 但是运行中的任务无法取消,可以将线程数量修改成1 # print('task_2任务取消:', task_2.cancel()) # result方法可以获取任务的返回值 当前获取为阻塞 print('task_1返回结果:', task_1.result())线程池对象内置方法
as_completed
获取已经执行成功的task的返回值
# 模拟网页请求 import time from concurrent.futures import ThreadPoolExecutor, as_completed def get_html(time_attr): time.sleep(time_attr) print(f'get page {time_attr} success') return time_attr # 创建线程池对象 executor = ThreadPoolExecutor(max_workers=2) # 批量提交任务并获取已经执行成功的task的返回值 time_attr_list = [3, 2, 5, 4] all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list] for future in as_completed(all_task): data = future.result() print(f'get page {data}') # 线程任务只要执行完就能获取到返回值map
获取已经执行成功的task的返回值 - 代码更精简
# 模拟网页请求 import time from concurrent.futures import ThreadPoolExecutor, as_completed def get_html(time_attr): time.sleep(time_attr) print(f'get page {time_attr} success') return time_attr # 创建线程池对象 executor = ThreadPoolExecutor(max_workers=2) # 批量提交任务并获取已经执行成功的task的返回值 time_attr_list = [3, 2, 5, 4] # 通过executor对象中的map获取已经完成的任务的返回值 for data in executor.map(get_html, time_attr_list): print(f'get page {data}') # 当前打印的返回值顺序与列表顺序一致 # all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list] # for future in as_completed(all_task): # data = future.result() # print(f'get page {data}') # 线程任务只要执行完就能获取到返回值wait
等待指定任务完成后主线程解堵塞
import time from concurrent.futures import ThreadPoolExecutor, wait def get_html(time_attr): time.sleep(time_attr) print(f'get page {time_attr} success') return time_attr # 创建线程池对象 executor = ThreadPoolExecutor(max_workers=2) # 批量提交任务并获取已经执行成功的task的返回值 time_attr_list = [3, 2, 5, 4] all_task = [executor.submit(get_html, time_attr) for time_attr in time_attr_list] wait(all_task) print('主线程执行...')