fork
通过os模块调用系统调用fork,由于windows没有fork,所以以下代码不能在windows下运行
import os
# 打印当前进程ID
print('Process (%s) start...' % os.getpid())
# fork出一个子进程
pid = os.fork()
# 子进程永远返回0
if pid == 0:
# 打印当前进程ID或父进程ID
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
# 打印当前进程ID和子进程ID
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
multiprocessing 模块
跨平台兼容,对fork进行了封装,使用pickle兼容了windows
from multiprocessing import Process
'''
aaa
bbb
abc in child
ccc
ddd
'''
def test(data):
print(data, "in child")
if __name__ == '__main__':
print('aaa')
# 创建子进程函数
p = Process(target=test, args=("abc",))
# 启动子进程
p.start()
print('bbb')
# 等待子进程执行完再往下执行
p.join()
print('ccc')
# 关闭子进程
p.close()
print('ddd')
进程池
from multiprocessing import Pool
import time
# 最后一个4会等待10秒后输出,因为进程等待了
# aaa
# bbb
# 0
# 1
# 2
# 3
# 4
# ccc
print('aaa')
def test(data):
print(data)
time.sleep(10)
p = Pool(4)
for i in range(5):
p.apply_async(func=test,args=(i,))
print('bbb')
# 必须close后再join,close之后就不能继续添加新的Process了
p.close()
# 等待所有的子进程结束
p.join()
print('ccc')
子进程调用
很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出
import subprocess
s = subprocess.call(['nslookup', 'www.python.org'])
print(s)
进程间通信
multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来实现进程间通信
from multiprocessing import Process, Queue
def pWrite(queue):
for i in range(1, 4):
queue.put("test" + str(i))
def pRead(queue):
while (True):
print(queue.get(True))
# 使用队列进行进程间通信
q = Queue()
pw = Process(target=pWrite,args=(q,))
pr = Process(target=pRead,args=(q,))
pw.start()
pr.start()
pw.join()
# 因为读进程中是一个死循环,所以需要使用 terminate 中断
pr.terminate()
分布式进程
先执行主进程(master.py)发布任务,并等待结果
再执行工作进程(worker1.py、worker2.py),处理任务并返回结果
master.py
from multiprocessing.managers import BaseManager
import queue
class Manager(BaseManager):
pass
tq = queue.Queue()
rq = queue.Queue()
# 注册任务发布队列
Manager.register("taskQueue", callable=lambda: tq)
# 注册任务结果获取队列
Manager.register("resultQueue", callable=lambda: rq)
# 暴露9503端口给worker连接,授权key为 b'txf'
manager = Manager(address=('', 9503), authkey=b'txf')
manager.start()
# 获取可以通过网络访问的Queue对象
task = manager.taskQueue()
result = manager.resultQueue()
# 发布任务
for i in range(1, 10):
task.put("task " + str(i))
print("put task " + str(i))
# 接收任务执行结果,等待时间10秒
for i in range(1, 10):
r = result.get(timeout=10)
print('result' + str(r))
manager.shutdown()
worker1.py、worker2.py
from multiprocessing.managers import BaseManager
import time,os
class Manager(BaseManager):
pass
# 注册队列任务,这里不需要定义,实际使用的是master上面的队列
Manager.register("taskQueue")
Manager.register("resultQueue")
# 配置master信息并连接master
manager = Manager(address=('127.0.0.1', 9503), authkey=b'txf')
manager.connect()
# 获取master上的队列
task = manager.taskQueue()
result = manager.resultQueue()
name=os.path.basename(__file__)
# 获取任务队列,处理后把结果扔到结果队列
for i in range(5):
try:
r = task.get(timeout=1)
r = r + 'finish in ' + name
time.sleep(1)
result.put(r)
except Exception as e:
print(e)