Python 中的进程、线程、协程
文章目录
1. 进程
进程是正在运行的程序实例,是内核分配资源的最基本的单元。进程拥有自己独立的堆和栈,独立的地址空间,资源句柄。进程由 OS 调度,调度开销较大,在并发的切换过程效率较低。
Python 提供了一个跨平台的多进程模块 multiprocessing,模块中使用 Process 类来代表一个进程对象。
1.1 多进程示例
import os
from multiprocessing import Process
# 子进程执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',)) # target 指定要执行的函数,args 指定参数
print('Child process will start.')
p.start() #启动 Process 实例
p.join() #等待子进程结束后,继续往下执行
print('Child process end.')
Parent process 274.
Child process will start.
Run child process test (298)...
Child process end.
1.2 进程池示例
import os, time
from multiprocessing import Pool
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(2) # 创建对象池,并设置进程池大小,默认大小是 CPU 核数
for i in range(5):
p.apply_async(long_time_task, args=(i,)) # 设置每个进程要执行的函数和参数,异步执行
print('Waiting for all subprocesses done...')
p.close() # 关闭进程池,不允许继续添加新的 Process
p.join() # 等待全部子进程执行完毕
print('All subprocesses done.')
Parent process 274.
Run task 1 (431)...
Run task 0 (430)...
Waiting for all subprocesses done...
Task 1 runs 3.00 seconds.
Run task 2 (431)...
Task 0 runs 3.00 seconds.
Run task 3 (430)...
Task 2 runs 3.00 seconds.
Task 3 runs 3.00 seconds.
Run task 4 (431)...
Task 4 runs 3.00 seconds.
All subprocesses done.
1.3 进程间通信
multiprocessing 模块封装了底层的通信机制,提供了 Queue、Pipes 等多种方式来交换数据。以 Queue 为例,在父进程中创建两个子进程,一个往 Queue 里写数据,一个从 Queue 里读数据。
import os, time, random
from multiprocessing import Process, Queue
def write(q): # 写数据进程执行的代码
print("Process to write: %s" % os.getpid())
for value in ['A', 'B', 'C']:
print("Put %s to queue..." % value)
q.put(value)
time.sleep(random.random())
def read(q): # 读数据进程执行的代码
print("Process to read: %s" % os.getpid())
while True:
value = q.get(True)
print("Get %s from queue." % value)
if __name__ == '__main__':
q = Queue() # 父进程创建Queue,并传给各个子进程
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
pw.start() # 启动子进程pw,写入
pr.start() # 启动子进程pr,读取
pw.join() # 等待pw结束
pr.terminate() # pr进程里的死循环,无法等待结束,只能强制终止
Process to write: 211
Put A to queue...
Process to read: 212
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
2. 线程
线程是一种轻量进程,是 CPU 调度和分派的基本单元。线程并不产生新的地址空间和资源描述符表,而是复用父进程的。线程只拥有程序计数器、一组寄存器和栈,同一进程的线程共享其他全部资源。
线程由 OS 调度,相较于进程,线程调度的成本非常小。线程间通信主要通过共享内存,上下文切换很快,资源开销较少,但相比进程不够稳定容易丢失数据。
2.1 解释器
在谈 Python 的线程之前,先了解下 Python 的几个解释器版本:
- CPython ,Python 的官方版本,使用 C 语言实现,使用最为广泛,大部分人使用的都是这个版本。
- Jython,Python 的 Java 实现,相比于 CPython,与 Java 语言之间的互操作性要远远高于 CPython 和 C 语言之间的互操作性。
- Python for .NET,CPython 实现的 .NET 托管版本,与 .NET 库和程序代码有很好的互操作性。
- IronPython,不同于 Python for .NET,它是 Python 的 C# 实现,并且它将 Python 代码编译成 C# 中间代码(与 Jython 类似),与.NET语言的互操作性也非常好。
- PyPy,Python 的 Python 实现版本。PyPy 运行在 CPython(或者其它实现)之上,用户程序运行在 PyPy 之上。目标是成为 Python 语言自身的试验场,可以很容易地修改 PyPy 解释器的实现(因为是使用Python写的)。
- Stackless,Stackless Python 是 CPython 的一个增强版本,它使程序员从基于线程的编程方式中获得好处,并避免传统线程所带来的性能与复杂度问题。
2.2 全局锁 GIL
GIL 是 CPython 中特有的全局解释器锁(其它 Python 版本解释器,有自己的线程调度机制,没有GIL机制)。本质上,GIL 就是 Python 进程中的一把超大锁,在解释器进程中是全局有效。GIL 主要锁定的是 CPU 执行资源,实现线程独占。
在 CPython 解释器中,当一个线程需要使用 CPU 资源时,首先得获取 GIL,直到遇到 I/O 操作时,才会释放 GIL。
如果是 I/O 密集型线程,多线程能比单线程显著提高性能;如果是 CPU 密集型线程,多线程并不能提高性能,因为等待 GIL,多线程也只能依次按顺序执行。
在单核 CPU 中,同一时刻仅有一个线程占用 CPU,GIL 不会对 CPU 的使用率产生影响。但是在多核 CPU 中,由于 GIL 的存在,同一时刻,不同核的线程会竞争 GIL。获取到 GIL 的线程能够占用 CPU,而其他线程将处于闲置状态,即使这些线程有空闲的 CPU 资源。
在 Python 3 中 GIL 也没有去掉,因为有大量的第三方库依赖 GIL。去掉 GIL 之后,需要引入复杂的锁机制保护众多全局状态。
2.3 多线程示例
Python 的标准库提供了两个模块:thread 和 threading,thread 是低级模块,threading 是高级模块,对 thread 进行了封装。
import time, os, threading
start = time.time()
def doubler(number):
print(threading.currentThread().getName())
print('Parent process %s.' % os.getpid())
print(number * 2)
time.sleep(2)# 或者 IO 请求
print('thread run %0.2f s end'% (time.time() - start))
if __name__ == '__main__':
for i in range(3):
my_thread = threading.Thread(target=doubler, args=(i,))
my_thread.start()
#my_thread.join()
Thread-98
Parent process 426.
0
Thread-99
Parent process 426.
2
Thread-100
Parent process 426.
4
thread run 2.00 s end
thread run 2.01 s end
thread run 2.01 s end
由于线程中执行了 sleep ,释放了 CPU 资源,其他线程得以执行。如果新增注释部分的代码 my_thread.join()
, 那么线程将串行执行:
Thread-101
Parent process 426.
0
thread run 2.01 s end
Thread-102
Parent process 426.
2
thread run 4.01 s end
Thread-103
Parent process 426.
4
thread run 6.02 s end
2.4 multiprocessing.dummy
multiprocessing.dummy 模块与 multiprocessing 模块的区别: dummy 模块是多线程,而 multiprocessing 是多进程, 调用方式相同。
from multiprocessing import Pool
from multiprocessing.dummy import Pool
与 multiprocessing 类似,dummy 模块提供了多线程池,可以很方便将代码在多线程和多进程之间切换。dummy 模块在大量的开源项目中有所应用,十分推荐使用。
3. 协程
协程是一种轻量级的线程。协程拥有独立的寄存器上下文和栈,同一个线程,共享堆。协程不由 OS 调度,OS 对于协程的一无所知,完全由程序员编码进行控制。
具体点就是,执行函数 A 时,可以随时中断,去执行函数 B,接着中断 B ,继续执行函数A。而这些切换完全由程序吱声控制。协程调度实际上是在同一线程中,进行程序函数的切换,没有切换线程带来的开销。
协程比较适合处理 IO 密集型的任务。
3.1 Gevent
Gevent 是第三方库,通过 Greenlet 实现协程,其基本实现原理是:
当一个 Greenlet 遇到 IO 操作时,比如访问网络,就自动切换到其他的 Greenlet,等到 IO 操作完成,再在适当的时候切换回来继续执行。由于 IO 操作非常耗时,经常使程序处于等待状态,有了 Gevent 为我们自动切换协程,就保证总有Greenlet 在运行,而不是等待 IO。
import gevent
import time, os, threading
from gevent import monkey;
monkey.patch_all() # 将默认阻塞的模块替换成非阻塞
start = time.time()
def doubler(number):
print('Parent process %s.' % os.getpid())
print(number * 2)
time.sleep(2)
print('run %0.2f s end'% (time.time() - start))
if __name__ == '__main__':
tasks=[gevent.spawn(doubler,i) for i in range(3)] # gevent.spawn 启动协程,参数为函数名称和参数名称
gevent.joinall(tasks) # gevent.joinall 等待执行完毕
Parent process 871.
0
Parent process 871.
2
Parent process 871.
4
run 2.00 s end
run 2.00 s end
run 2.00 s end
从结果来看,Python 中多线程和多协程的效果类似,在当前执行阻塞时,切换执行流程。不同的是,多线程切换的是线程,而协程切换的是正在执行的函数上下文。
使用 Gevent,可以获得极高的并发性能,但 Gevent 只能在 Unix/Linux下运行,在 Windows 下不保证正常安装和运行。
3.2 Django
在 Django 中也会使用 Gevent 来增强并发能力,特别是对于 IO 密集型的请求较多时:
# 使用 uwsgi 部署
$ uwsgi --gevent 100 --gevent-monkey-patch --http :8000 -M --processes 4 --wsgi-file wsgi.py
# 使用 gunicorn 部署
$ gunicorn --worker-class=gevent wsgi:application -b 0.0.0.0:8000
3.3 Celery
Celery 支持几种并发模式,有 prefork,threading,协程(gevent,eventlet)。在 Celery 中使用并发模式,能显著提高处理效率,特别是 IO 操作较多时。
$ celery worker -A celery_worker.celery -P gevent -c 10 -l INFO
-P 选项指定 pool,默认是 prefork,这里指定为 gevent, -c 设置并发数。
4. 最佳实践
- IO 密集型的任务(例如,网络调用等)中使用线程和协程
- CPU 密集的任务,需要使用多个进程,绕开 GIL 限制,充分利用多核 CPU ,提高效率
- 为了充分利用 CPU ,可以结合多进程+协程进行部署,多个进程,每个进程中多个协程。