进程(process)和线程(thread)是操作系统的基本概念,但是它们比较抽象,不容易掌握。关于多进程和多线程,教科书上最经典的一句话是'进程是资源分配的最小单位,线程是 CPU 调度的最小单位'。线程是程序中一个单一的顺序控制流程。进程内一个相对独立的、可调度的执行单元,是系统独立调度和分派 CPU 的基本单位指运行中的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多线程。
The mechanism used by the CPython interpreter to assure that only one thread executes Python bytecode at a time. This simplifies the CPython implementation by making the object model (including critical built-in types such as dict) implicitly safe against concurrent access. Locking the entire interpreter makes it easier for the interpreter to be multi-threaded, at the expense of much of the parallelism afforded by multi-processor machines.
GIL 有什么好处?简单来说,它在单线程的情况更快,并且在和 C 库结合时更方便,而且不用考虑线程安全问题,这也是早期 Python 最常见的应用场景和优势。另外,GIL 的设计简化了 CPython 的实现,使得对象模型,包括关键的内建类型如字典,都是隐含可以并发访问的。锁住全局解释器使得比较容易的实现对多线程的支持,但也损失了多处理器主机的并行计算能力。
Python 3.2 开始使用新的 GIL。新的 GIL 实现中用一个固定的超时时间来指示当前的线程放弃全局锁。在当前线程保持这个锁,且其他线程请求这个锁时,当前线程就会在 5 毫秒后被强制释放该锁。该改进在单核的情况下,对于单个线程长期占用 GIL 的情况有所好转。
在单核 CPU 上,数百次的间隔检查才会导致一次线程切换。在多核 CPU 上,存在严重的线程颠簸(thrashing)。而每次释放 GIL 锁,线程进行锁竞争、切换线程,会消耗资源。单核下多线程,每次释放 GIL,唤醒的那个线程都能获取到 GIL 锁,所以能够无缝执行,但多核下,CPU0 释放 GIL 后,其他 CPU 上的线程都会进行竞争,但 GIL 可能会马上又被 CPU0 拿到,导致其他几个 CPU 上被唤醒后的线程会醒着等待到切换时间后又进入待调度状态,这样会造成线程颠簸 (thrashing),导致效率更低。
另外,从上面的实现机制可以推导出,Python 的多线程对 IO 密集型代码要比 CPU 密集型代码更加友好。
针对 GIL 的应对措施:
使用更高版本 Python(对 GIL 机制进行了优化)
使用多进程替换多线程(多进程之间没有 GIL,但是进程本身的资源消耗较多)
指定 cpu 运行线程(使用 affinity 模块)
使用 Jython、IronPython 等无 GIL 解释器
全 IO 密集型任务时才使用多线程
使用协程(高效的单线程模式,也称微线程;通常与多进程配合使用)
将关键组件用 C/C++ 编写为 Python 扩展,通过 ctypes 使 Python 程序直接调用 C 语言编译的动态链接库的导出函数。(with nogil 调出 GIL 限制)
Python 的多进程包 multiprocessing
Python 的 threading 包主要运用多线程的开发,但由于 GIL 的存在,Python 中的多线程其实并不是真正的多线程,如果想要充分地使用多核 CPU 的资源,大部分情况需要使用多进程。在 Python 2.6 版本的时候引入了 multiprocessing 包,它完整的复制了一套 threading 所提供的接口方便迁移。唯一的不同就是它使用了多进程而不是多线程。每个进程有自己的独立的 GIL,因此也不会出现进程之间的 GIL 争抢。
Python 的 os 模块封装了常见的系统调用,其中就包括 fork,可以在 Python 程序中轻松创建子进程:
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
上述代码在 Linux、Unix 和 Mac 上的执行结果为:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
terminate():强制终止进程 p,不会进行任何清理操作,如果 p 创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果 p 还保存了一个锁那么也将不会被释放,进而导致死锁
is_alive():返回进程是否在运行。如果 p 仍然运行,返回 True
join([timeout]):进程同步,主进程等待子进程完成后再执行后面的代码。线程等待 p 终止(强调:是主线程处于等的状态,而 p 是处于运行的状态)。timeout 是可选的超时时间(超过这个时间,父线程不再等待子线程,继续往下执行),需要强调的是,p.join 只能 join 住 start 开启的进程,而不能 join 住 run 开启的进程
属性介绍:
daemon:默认值为 False,如果设为 True,代表 p 为后台运行的守护进程;当 p 的父进程终止时,p 也随之终止,并且设定为 True 后,p 不能创建自己的新进程;必须在 p.start() 之前设置
使用示例:(注意:在 windows 中 Process() 必须放到 if name == 'main':下)
from multiprocessing import Process
import os
defrun_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
Pool(用于创建管理进程池)
Pool 类用于需要执行的目标很多,而手动限制进程数量又太繁琐时,如果目标少且不用控制进程数量则可以用 Process 类。Pool 可以提供指定数量的进程,供用户调用,当有新的请求提交到 Pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,就重用进程池中的进程。
from multiprocessing import Process, Semaphore
import time, random
defgo_wc(sem, user):
sem.acquire()
print('%s 占到一个茅坑' % user)
time.sleep(random.randint(0, 3))
sem.release()
print(user, 'OK')
if __name__ == '__main__':
sem = Semaphore(2)
p_l = []
for i inrange(5):
p = Process(target=go_wc, args=(sem, 'user%s' % i,))
p.start()
p_l.append(p)
for i in p_l:
i.join()
import multiprocessing
import time
defstage_1(cond):
"""perform first stage of work,
then notify stage_2 to continue
"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
print('{} done and ready for stage 2'.format(name))
cond.notify_all()
defstage_2(cond):
"""wait for the condition telling us stage_1 is done"""
name = multiprocessing.current_process().name
print('Starting', name)
with cond:
cond.wait()
print('{} running'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i inrange(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
import multiprocessing
import time
defwait_for_event(e):
"""Wait for the event to be set before doing anything"""print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
defwait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
e.set()
print('main: event is set')
from concurrent import futures
deftest(num):
import time
return time.ctime(), num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test, 1)
print(future.result())
from concurrent import futures
deftest(num):
import time
return time.ctime(), num
data = [1, 2, 3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test, data):
print(future)
shutdown() 方法
释放系统资源,在 Executor.submit() 或 Executor.map() 等异步操作后调用。使用 with 语句可以避免显式调用此方法。
from concurrent.futures import ThreadPoolExecutor, wait, as_completed
from time import sleep
from random import randint
defreturn_after_5_secs(num):
sleep(randint(1, 5))
return"Return of {}".format(num)
pool = ThreadPoolExecutor(5)
futures = []
for x inrange(5):
futures.append(pool.submit(return_after_5_secs, x))
print(1)
for x in as_completed(futures):
print(x.result())
print(2)