
Python Multiprocessing

This post is a quick refresher on Python multiprocessing, written for the moments when you need it in a hurry.

Table of Contents

  • 1  The multiprocessing module
  • 2  The subprocess module
  • 3  Inter-process communication

This notebook needs to be run in a Linux environment.

The Unix/Linux operating system provides a very special fork() system call. An ordinary function is called once and returns once, but fork() is called once and returns twice: the operating system automatically copies the current process (called the parent process) into a new one (called the child process), and then returns in both the parent and the child.

The child process always receives a return value of 0, while the parent receives the child's process ID. The reason is that a parent can fork many children, so it needs to record each child's ID, whereas a child only needs to call getppid() to get its parent's ID.

Python's os module wraps the common system calls, including fork, so a child process can easily be created from a Python program:

In [1]:
from boxx import what
In [2]:
import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
Process (53256) start...
I (53256) just created a child process (53287).
I am child process (53287) and my parent is 53256.

Windows has no fork call, so the code above cannot run on Windows; on Linux it runs without problems.

With fork, a process that receives a new task can copy itself into a child process to handle it. The classic Apache server works this way: the parent process listens on the port and forks a child process to handle each new HTTP request.
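
As a rough illustration of this fork-per-task pattern, here is a minimal sketch (Unix/Linux only; the handle_task function and the task list are made up for illustration, and a real server would not wait for each child synchronously):

import os

def handle_task(task):
    # Hypothetical worker: whatever the child should do for this task.
    print('Child %s handling %r' % (os.getpid(), task))

for task in ['task-1', 'task-2']:
    pid = os.fork()
    if pid == 0:
        # In the child: do the work, then exit without running the rest of the loop.
        handle_task(task)
        os._exit(0)
    else:
        # In the parent: reap the child before moving on.
        os.waitpid(pid, 0)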

The multiprocessing module

For writing multi-process server programs, Unix/Linux is undoubtedly the right choice.

However, since Python is cross-platform, it naturally also provides cross-platform multiprocessing support. The multiprocessing module is that cross-platform version of the multi-process machinery.

The multiprocessing module provides a Process class that represents a process object. The following example starts a child process and waits for it to finish:

In [3]:
from multiprocessing import Process
import os

# Code to be executed in the child process
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
Parent process 53256.
Child process will start.
Run child process test (53290)...
Child process end.

To create a child process, just create a Process instance with a target function and its arguments and launch it with the start() method; this is even simpler than fork().

The join() method waits for the child process to finish before execution continues, and is typically used to synchronize processes.
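
For example, a minimal sketch (not from the original post) that starts several Process workers and joins them all before continuing:

from multiprocessing import Process
import os

def worker(n):
    print('Worker %s running in process %s' % (n, os.getpid()))

if __name__ == '__main__':
    procs = [Process(target=worker, args=(n,)) for n in range(3)]
    for p in procs:
        p.start()   # launch all children first
    for p in procs:
        p.join()    # then wait for each one to finish
    print('All workers finished.')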

In [4]:
from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))
    return name*2

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool()
    res = [0]*13
    for i in range(13):
        res[i] = p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
Parent process 53256.
Run task 1 (53294)...
Run task 5 (53298)...
Run task 2 (53295)...
Run task 4 (53297)...
Run task 0 (53293)...
Run task 6 (53299)...
Run task 8 (53301)...
Run task 3 (53296)...
Run task 7 (53300)...
Run task 10 (53303)...
Run task 9 (53302)...
Run task 11 (53304)...
Waiting for all subprocesses done...
Task 1 runs 0.29 seconds.
Run task 12 (53294)...
Task 7 runs 0.31 seconds.
Task 5 runs 0.83 seconds.
Task 2 runs 1.02 seconds.
Task 9 runs 1.23 seconds.
Task 3 runs 1.43 seconds.
Task 10 runs 1.70 seconds.
Task 6 runs 1.78 seconds.
Task 12 runs 1.75 seconds.
Task 0 runs 2.12 seconds.
Task 11 runs 2.31 seconds.
Task 8 runs 2.33 seconds.
Task 4 runs 2.65 seconds.
All subprocesses done.

Take a look at res:

In [5]:
res
Out[5]:
[<multiprocessing.pool.ApplyResult at 0x7efd36094048>,
 <multiprocessing.pool.ApplyResult at 0x7efd716db710>,
 <multiprocessing.pool.ApplyResult at 0x7efd716db7b8>,
 <multiprocessing.pool.ApplyResult at 0x7efd716db860>,
 <multiprocessing.pool.ApplyResult at 0x7efd716db908>,
 <multiprocessing.pool.ApplyResult at 0x7efd716db9b0>,
 <multiprocessing.pool.ApplyResult at 0x7efd716dba58>,
 <multiprocessing.pool.ApplyResult at 0x7efd716dbb38>,
 <multiprocessing.pool.ApplyResult at 0x7efd716dbc18>,
 <multiprocessing.pool.ApplyResult at 0x7efd716dbcf8>,
 <multiprocessing.pool.ApplyResult at 0x7efd716dbdd8>,
 <multiprocessing.pool.ApplyResult at 0x7efd716dbeb8>,
 <multiprocessing.pool.ApplyResult at 0x7efd716dbf98>]
In [6]:
what(res)
----------end of what("[<multiprocessing.pool.Appl...")----------
Attrs: 
└── list: 46 attrs, Built-in mutable sequence.↳↳If no argum...
    ├── __add__: method-wrapper : Return self+value.
    ├── __class__: <class 'list'>
    ├── __contains__: method-wrapper : Return key in self.
    ├── __delattr__: method-wrapper : Implement delattr(self, n...
    ├── __delitem__: method-wrapper : Delete self[key].
    ├── __dir__: builtin-method : Default dir() implementat...
    ├── __doc__: Built-in mutable sequence.↳↳If no argument is g...
    ├── __eq__: method-wrapper : Return self==value.
    ├── __format__: builtin-method : Default object formatter....
    ├── __ge__: method-wrapper : Return self>=value.
    ├── __getattribute__: method-wrapper : Return getattr(self, name...
    ├── __getitem__: builtin-method : x.__getitem__(y) <==> x[y...
    ├── __gt__: method-wrapper : Return self>value.
    ├── __hash__: None
    ├── __iadd__: method-wrapper : Implement self+=value.
    ├── __imul__: method-wrapper : Implement self*=value.
    ├── __init__: method-wrapper : Initialize self.  See hel...
    ├── __init_subclass__: builtin-method : This method is called whe...
    ├── __iter__: method-wrapper : Implement iter(self).
    ├── __le__: method-wrapper : Return self<=value.
    ├── __len__: method-wrapper : Return len(self).
    ├── __lt__: method-wrapper : Return self<value.
    ├── __mul__: method-wrapper : Return self*value.
    ├── __ne__: method-wrapper : Return self!=value.
    ├── __new__: builtin-method : Create and return a new o...
    ├── __reduce__: builtin-method : Helper for pickle.
    ├── __reduce_ex__: builtin-method : Helper for pickle.
    ├── __repr__: method-wrapper : Return repr(self).
    ├── __reversed__: builtin-method : Return a reverse iterator...
    ├── __rmul__: method-wrapper : Return value*self.
    ├── __setattr__: method-wrapper : Implement setattr(self, n...
    ├── __setitem__: method-wrapper : Set self[key] to value.
    ├── __sizeof__: builtin-method : Return the size of the li...
    ├── __str__: method-wrapper : Return str(self).
    ├── __subclasshook__: builtin-method : Abstract classes can over...
    ├── append: builtin-method : Append object to the end ...
    ├── clear: builtin-method : Remove all items from lis...
    ├── copy: builtin-method : Return a shallow copy of ...
    ├── count: builtin-method : Return number of occurren...
    ├── extend: builtin-method : Extend list by appending ...
    ├── index: builtin-method : Return first index of val...
    ├── insert: builtin-method : Insert object before inde...
    ├── pop: builtin-method : Remove and return item at...
    ├── remove: builtin-method : Remove first occurrence o...
    ├── reverse: builtin-method : Reverse *IN PLACE*.
    └── sort: builtin-method : Stable sort *IN PLACE*.


Document: 
└── Built-in mutable sequence.
     
     If no argument is given, the constructor creates a new empty list.
     The argument must be an iterable if specified.

Inner Struct:
└── /: list  13
    ├── 0: <multiprocessing.pool.ApplyResult object a...
    ├── 1: <multiprocessing.pool.ApplyResult object a...
    ├── 2: <multiprocessing.pool.ApplyResult object a...
    ├── 3: <multiprocessing.pool.ApplyResult object a...
    ├── 4: <multiprocessing.pool.ApplyResult object a...
    ├── 5: <multiprocessing.pool.ApplyResult object a...
    ├── ···
    ├── ···
    ├── ···
    ├── ···
    ├── Hidden 1 of all 13
    ├── ···
    ├── ···
    ├── ···
    ├── ···

    ├── 7: <multiprocessing.pool.ApplyResult object a...
    ├── 8: <multiprocessing.pool.ApplyResult object a...
    ├── 9: <multiprocessing.pool.ApplyResult object a...
    ├── 10: <multiprocessing.pool.ApplyResult object a...
    ├── 11: <multiprocessing.pool.ApplyResult object a...
    └── 12: <multiprocessing.pool.ApplyResult object a...


Classes: 
└── Instance of list <-object

To Str: 
└── "[<multiprocessing.pool.ApplyResult object at 0x7efd36094048>, <multiprocessing.pool.ApplyResult object at 0x7efd716db710>, <multiprocessing.pool.ApplyResult object at 0x7efd716db7b8>, <multiprocessing.pool.ApplyResult object at 0x7efd716db860>, <multiprocessing.pool.ApplyResult object at 0x7efd716db908>, <multiprocessing.pool.ApplyResult object at 0x7efd716db9b0>, <multiprocessing.pool.ApplyResult object at 0x7efd716dba58>, <multiprocessing.pool.ApplyResult object at 0x7efd716dbb38>, <multiprocessing.pool.ApplyResult object at 0x7efd716dbc18>, <multiprocessing.pool.ApplyResult object at 0x7efd716dbcf8>, <multiprocessing.pool.ApplyResult object at 0x7efd716dbdd8>, <multiprocessing.pool.ApplyResult object at 0x7efd716dbeb8>, <multiprocessing.pool.ApplyResult object at 0x7efd716dbf98>]"

In [7]:
realres = list(map(lambda x:x.get(), res))
realres
Out[7]:
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24]

The get() method is used to retrieve each result.
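
Besides get(), an ApplyResult offers a few other accessors; a quick sketch (res is the list built above):

r = res[0]
print(r.ready())          # True once the call has completed
print(r.successful())     # True if it completed without raising (only valid once ready)
print(r.get(timeout=10))  # wait at most 10 seconds, then raise multiprocessing.TimeoutError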

Calling join() on a Pool object waits for all worker processes to finish. close() must be called before join(); after close(), no new tasks can be submitted to the Pool.

Because the Pool's default size is the number of CPU cores, only that many tasks start immediately; the remaining tasks wait for a free worker.

With p = Pool(5) you can run 5 processes at the same time.
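
To see the default size on your machine, a quick sketch using multiprocessing.cpu_count:

from multiprocessing import Pool, cpu_count

print('CPU cores:', cpu_count())  # default number of worker processes in Pool()
p = Pool(5)                       # or choose the size explicitly: 5 workers
p.close()
p.join()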

The subprocess module

Very often the child process is not the program itself but an external program. After creating such a child process, we still need to control its input and output.

The subprocess module makes it very easy to start a child process and then control its input and output.

The following example shows how to run the command nslookup www.python.org from Python code.

The effect is the same as running it directly on the command line:

In [8]:
import subprocess

print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
$ nslookup www.python.org
Exit code: 0

If the child process also needs input, it can be supplied through the communicate() method:

In [9]:
import subprocess

print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
$ nslookup
Server:		127.0.0.53
Address:	127.0.0.53#53

Non-authoritative answer:
python.org	mail exchanger = 50 mail.python.org.

Authoritative answers can be found from:


Exit code: 0
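
On Python 3.5 and later, subprocess.run wraps this Popen/communicate pattern in a single call; a minimal sketch with the same nslookup input:

import subprocess

completed = subprocess.run(
    ['nslookup'],
    input=b'set q=mx\npython.org\nexit\n',   # fed to the child's stdin
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE,
)
print(completed.stdout.decode('utf-8'))
print('Exit code:', completed.returncode)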

Inter-process communication

Processes obviously need to communicate with each other, and the operating system provides many mechanisms for inter-process communication. Python's multiprocessing module wraps the low-level mechanisms and offers several ways to exchange data, such as Queue and Pipe.

Taking Queue as an example, the parent process creates two child processes: one writes data into the Queue and the other reads data from it:

In [10]:
from multiprocessing import Process, Queue
import os, time, random

# Code run by the writer process:
def write(q):
    print('Process to write: %s' % os.getpid())
    
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# Code run by the reader process:
def read(q):
    print('Process to read: %s' % os.getpid())
    
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # The parent process creates the Queue and passes it to each child:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    
    # Start the writer child process pw:
    pw.start()
    # Start the reader child process pr:
    pr.start()
    # Wait for pw to finish:
    pw.join()
    # pr loops forever, so we cannot wait for it to finish; terminate it instead:
    pr.terminate()
Process to write: 53368
Put A to queue...
Process to read: 53371
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
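
The Pipe variant mentioned above works similarly; a minimal sketch (not from the original post) in which a child sends values through one end of a Pipe and the parent receives them at the other end:

from multiprocessing import Process, Pipe

def sender(conn):
    for value in ['A', 'B', 'C']:
        conn.send(value)      # write into one end of the pipe
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    pw = Process(target=sender, args=(child_conn,))
    pw.start()
    for _ in range(3):
        print('Get %s from pipe.' % parent_conn.recv())  # read from the other end
    pw.join()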

For asynchronous calls such as apply_async or map, communication works a bit differently: the arguments passed to the function are copies, so changes the child process makes to an argument do not affect the variable in the parent process. This is unlike the usual behavior within a single Python process, where arguments are passed by object reference.

In [11]:
def func(ls,i):
    print('before', ls)
    ls[i] = i
    print('after', ls)
    return ls

ls = [0]*5
p = Pool()
res = [0]*5
for i in range(5):
    res[i] = p.apply_async(func, args=(ls, i)) 
p.close()
p.join()
            
print('\nAfter all', ls)
before [0, 0, 0, 0, 0]
before [0, 0, 0, 0, 0]
before [0, 0, 0, 0, 0]
before [0, 0, 0, 0, 0]
before [0, 0, 0, 0, 0]
after [0, 0, 0, 0, 4]
after [0, 0, 0, 0, 0]
after [0, 1, 0, 0, 0]
after [0, 0, 0, 3, 0]
after [0, 0, 2, 0, 0]

After all [0, 0, 0, 0, 0]
In [12]:
res
Out[12]:
[<multiprocessing.pool.ApplyResult at 0x7efd360949b0>,
 <multiprocessing.pool.ApplyResult at 0x7efd716e0518>,
 <multiprocessing.pool.ApplyResult at 0x7efd716e05c0>,
 <multiprocessing.pool.ApplyResult at 0x7efd716e0668>,
 <multiprocessing.pool.ApplyResult at 0x7efd716e0710>]
In [13]:
realres = list(map(lambda x:x.get(), res))
realres
Out[13]:
[[0, 0, 0, 0, 0],
 [0, 1, 0, 0, 0],
 [0, 0, 2, 0, 0],
 [0, 0, 0, 3, 0],
 [0, 0, 0, 0, 4]]
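
If the children really do need to modify shared state, multiprocessing offers explicitly shared objects such as Value, Array, and Manager; a minimal sketch (not from the original post) using a Manager list:

from multiprocessing import Pool, Manager

def fill(shared, i):
    shared[i] = i                        # writes go through the manager process

if __name__ == '__main__':
    with Manager() as manager:
        shared = manager.list([0] * 5)   # proxy to a list living in the manager process
        with Pool() as pool:
            pool.starmap(fill, [(shared, i) for i in range(5)])
        print(list(shared))              # [0, 1, 2, 3, 4]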

On Unix/Linux, the multiprocessing module wraps the fork() call, so we do not need to worry about the details of fork().

Because Windows has no fork call, multiprocessing has to "simulate" the effect of fork: all Python objects in the parent process must be serialized with pickle and sent to the child process. So if a multiprocessing call fails on Windows, first consider whether pickling failed.
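
One practical consequence: with the spawn start method (the only one on Windows, and the default on macOS since Python 3.8), the target function and its arguments must be picklable, and code that creates processes must sit behind an if __name__ == '__main__' guard. A minimal sketch:

import multiprocessing as mp

def work(x):
    return x + 1                 # must be a module-level (picklable) function

if __name__ == '__main__':
    # 'spawn' starts a fresh interpreter and re-imports this module,
    # which is why the __main__ guard above is required.
    mp.set_start_method('spawn')
    with mp.Pool(2) as pool:
        print(pool.map(work, [1, 2, 3]))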

Examples from the multiprocessing documentation

In [14]:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == "__main__":
    p = Pool(5)
    print(list(p.map(f, [1,2,3])))
[1, 4, 9]
In [15]:
from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print('PID', res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (5,))
        try:
            print(res.get(timeout=4))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
0
1
4
16
9
25
36
64
49
81
400
PID 53433
[53430, 53432, 53430, 53431]
We lacked patience and got a multiprocessing.TimeoutError
For the moment, the pool remains available for more work
Now the pool is closed and no longer available
  • On Unix/Linux, multiple processes can be created with the fork() call.

  • For cross-platform multiprocessing, use the multiprocessing module.

  • Inter-process communication is implemented through Queue, Pipe, and similar mechanisms.

References:

  • Liao Xuefeng's Python 3 tutorial: multiprocessing (廖雪峰 Python3 多进程)
  • Python documentation: multiprocessing
