在 Linux/Unix 系统下,可以使用 os.fork() 创建、管理子进程,但是这种方法在 Windows 下是行不通的,而且我并没在实际开发中用过这种方式,所以这里只介绍 Multiprocessing 模块的使用方法。我会直接讲解用法,至于进程、线程的概念这里默认读者是了解的。

一、Python3 多进程编程

1. Process 类

multiprocessing 提供 Process 类来代表一个进程,通过实例化创建一个新子进程

from multiprocessing import Process, Queue

def power(x, res):
    for _ in range(4):
        x *= x
    res.put(x)

if __name__ == '__main__':
    res = Queue()
    p = Process(target=power, args=(2, res))
    p.start()
    p.join()
    result = res.get()
    print(result)

代码说明:

  • 创建进程的代码必须像示例代码中一样,写在程序入口下方,这是使用 multiprocessing 编写多进程程序的要求,无论是使用 Process 还是后面会介绍的进程池。
  • 创建进程的基本过程:一、 实例化 Process,同时传给子进程目标函数(target)和目标函数的变量(args)。注意函数名后面不要加括号,这里传的是函数的引用。 另外,注意传递给 args 的是一个 tuple,所以如果仅有一个参数,要在后面加逗号,例如: args = (2, ); 二、启动子进程(p.start());三、等待子进程结束(p.join()),这一步是用来同步进程的,后面的代码要等待该子进程结束才执行。
  • 获取目标函数的结果:使用 Process 不能用 return 返回目标函数的结果,要使用 multiprocessing 提供的数据容器,即队列 Queue 或者管道 Pipe,我推荐用 Queue,因为比较简单,就像一般的队列一样。在创建进程之前实例化一个 Queue,把它作为参数传给目标函数,在目标函数中用 Queue 的 put() 方法把结果放入队尾。当子进程结束后,用 get() 方法把结果取出即可。

那么,如果要批量创建进程呢?后面会介绍使用进程池的方法,但是这里我还是给出一个用 Process 批量创建进程的示例,注意如何批量 join 子进程:


from multiprocessing import Process, Queue

def power(x, res):
    for _ in range(4):
        x *= x
    res.put(x)

if __name__ == '__main__':
    res = Queue()
    nums = [1, 2, 3]
    processes = []
    for num in nums:
        p = Process(target=power, args=(num, res))
        p.start()
        processes.append(p)
    for name in processes:
        name.join()
    
    for i in range(len(nums)):
        print(res.get())

2. Pool 类

当我们需要批量创建进程时,如果用 Process,如上面的示例,事情就会变得很麻烦,通常是在循环内部重复创建进程,而且要循环 join 子进程。这时候使用进程池实现同样的功能就会简单优雅的多:

from multiprocessing import Pool

def power(x):
    for _ in range(4):
        x *= x
    return x

if __name__ == '__main__':
    res = []
    nums = [1, 2, 3]
    p = Pool(processes=3)
    for num in nums:
        temp = p.apply_async(power, args=(num,))
        res.append(temp.get())
    p.close()
    p.join()
    for result in res:
        print(result)

代码说明:

  • 首先,与使用 Process 的方法很大不同之处就是目标函数可以使用 return 返回结果,免去了要用进程间通信的办法才能得到结果的麻烦。
  • 异步使用进程池的基本步骤:一、实例化进程池,processes 是可选参数,默认使用本机 CPU 的所有核心,也可以指定一个数量,我的经验是尽量不要用尽所有资源,尤其当数据量较大时,容易造成计算机系统不稳定,毕竟系统本身也需要计算资源,我这里指定其为 3;二、与使用 Process 批量创建进程一样,异步使用进程池(apply\_async)也要使用循环,但获取结果的方式变得更为直接,可以直接 get 到。这一步相当于 使用 Process 时的 start;三、使用进程池要多一个步骤,就是要关闭进程池(close),一旦调用 close() 方法就无法再添加新的子进程,然后才可以 join 进程池。注意如果不先关闭进程池,则 join 会失效;四、join 进程池。

还有没有更好的方法呢? 当然有了!就是使用 map 方法,同样的功能示例如下:

from multiprocessing import Pool

def power(x):
    for _ in range(4):
        x *= x
    return x

if __name__ == '__main__':
    nums = [1, 2, 3]
    p = Pool(processes=3)
    res = p.map(power, nums)
    p.close()
    p.join()
    for result in res:
        print(result)

代码说明:

  • map 方法的两个参数分别是目标函数和一个可迭代对象,它会将可迭代对象里的值一次性分发给各个子进程中的目标函数,也就是实现与之前的代码一样的功能。但整体上要紧凑优雅许多,最后返回一个列表。
  • 这里还有一点建议,就是能用进程池的时候就尽量不要用 Process,因为进程池对于资源的利用更为合理,比如这里我创建了三个工作进程,但是实际上任务并不会固定在 CPU 的某三个核心,而其他核心闲置,出现一核有难八核围观的情况。相反,负载会分摊到所有核心上,只是 CPU 的利用率与你设定的相同。比如,一共有四颗核心,那么此时 CPU 利用率还是 75%

那么,假如目标函数有不止一个变量呢? 虽然使用 map 方法既高效又简洁,但是也有一个弊端,就是单纯使用 map 函数,目标函数只可以有一个变量,因为 map 方法的可迭代对象中的值对应的都是相同的变量。下面提供两种思路:

思路一:

from multiprocessing import Pool

def power(para):
    x, time = para[0], para[1]
    for _ in range(time):
        x *= x
    return x

if __name__ == '__main__':
    para = [(1,4), (2,4), (3,4)]
    p = Pool(processes=3)
    res = p.map(power, para)
    p.close()
    p.join()
    for result in res:
        print(result)

代码说明:

  • power 函数实际上需要两个参数:x 和 time,所以我们把这两个参数包装成 tuple 放在列表里,然后在 power 函数内部解开这个 tuple,这样就实现了向目标函数传递多个变量。

思路二:

from multiprocessing import Pool
from functools import partial

def power(x, time):
    for _ in range(time):
        x *= x
    return x

if __name__ == '__main__':
    nums = [1, 2, 3]
    p = Pool(processes=3)
    res = p.map(partial(power, time=4), nums)
    p.close()
    p.join()
    for result in res:
        print(result)

代码说明:

  • 我之前开发的项目最后就是使用了这个方案。这里用到了 functools 里的 partial 函数,也就是偏函数。这里我们先将 power 函数包装成偏函数,在这个过程中给他的变量 time 赋值为 4,然后再把列表 nums map 给进程池。

上述的两种思路各有所长,使用哪种方式也是视场合而定。

二、进程间通信

这里仅介绍使用 multiprocessing 中的 Queue 和 Pipe 进行进程间通信。(因为其他的方式我不大懂。。。。)

1. Queue

在一开始介绍 Process 类时就用到了 Queue 获取目标函数返回值,这其实就是一种进程间通信。Queue 是多进程安全的队列,在实例化 Queue 时接受一个参数 maxsize 来限制其中的元素个数。可以通过 put 方法把元素从队尾插入,通过 get 方法把元素从队首取出。这两个方法都有两个参数:blocked 和 timeout:

  • 当 Queue 已满且 blocked 为 True 时,如果 timeout 为正值,则会阻塞 timeout 所指定的时长,直到该 Queue 腾出剩余空间。如果超时,则会抛出 Exception: Queue.Full
  • 当 Queue 为空且 blocked 为 True 时,如果 timeout 为正值,则会等待 timeout 所指定的时长,直到有元素插入再被取走。如果超时,则会抛出Exception: Queue.Empty

下面的代码演示了使用 Queue 进行进程间通信:

from multiprocessing import Process, Queue

def put_it(x, q):
    for _ in range(4):
        x *= x
        q.put(x)

def get_it(q):
    for _ in range(4):
        value = q.get()
        q.put(value-value)

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=put_it, args=(2, q))
    p2 = Process(target=get_it, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    for _ in range(4):
        print(q.get())

这段代码打印出的结果就是四个 0

2. Pipe

Pipe 的作用就像它的名字一样,充当进程间数据的管道,在实例化 Pipe 时接受一个参数 duplex,来决定其是双向的还是单向的(默认为 True,即双向管道,但是实际上是由两个单向管道构成的二元组)。

下面的代码演示了使用 Pipe 进行进程间通信:

from multiprocessing import Process, Queue, Pipe

def put_1(p, q):
    x = 'hello'
    p[0].send(x)
    temp = 'put_1: ' + p[0].recv()
    q.put(temp)

def put_2(p, q):
    y = 'nice to meet you'
    p[1].send(y)
    temp = 'put_2: ' + p[1].recv()
    q.put(temp)
    
if __name__ == '__main__':
    p = Pipe()
    q = Queue()
    p1 = Process(target=put_1, args=(p, q))
    p2 = Process(target=put_2, args=(p, q))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    for _ in range(2):
        print(q.get())

这段代码打印出来为:
put\_1: nice to meet you
put\_2: hello

二、结尾

有一些情况还可以考虑使用其他方法进行加速,毕竟创建进程的开销是非常大的。比如:计算密集型的(memory bound)程序可以使用 Cython 获得加速;network bound 的程序可以使用多线程加速,这种情况下是没有 GIL 的,创建线程的开销要比进程小的多。在实际开发中更多时候,是在程序不同的位置分别采用合适的加速手段来实现性能的最优化。

标签: 进程, Python, put, Queue, res, Process

相关文章推荐

添加新评论,含*的栏目为必填