fork

通过os模块调用系统调用fork,由于windows没有fork,所以以下代码不能在windows下运行

import os

# 打印当前进程ID
print('Process (%s) start...' % os.getpid())

# fork出一个子进程
pid = os.fork()

# 子进程永远返回0
if pid == 0:
    # 打印当前进程ID或父进程ID
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    # 打印当前进程ID和子进程ID
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

multiprocessing 模块

跨平台兼容,对fork进行了封装,使用pickle兼容了windows

from multiprocessing import Process

'''

aaa
bbb
abc in child
ccc
ddd

'''


def test(data):
    print(data, "in child")


if __name__ == '__main__':
    print('aaa')

    # 创建子进程函数
    p = Process(target=test, args=("abc",))
    # 启动子进程
    p.start()

    print('bbb')

    # 等待子进程执行完再往下执行
    p.join()

    print('ccc')

    # 关闭子进程
    p.close()

    print('ddd')

进程池

from multiprocessing import Pool
import time

# 最后一个4会等待10秒后输出,因为进程等待了
# aaa
# bbb
# 0
# 1
# 2
# 3
# 4
# ccc

print('aaa')
def test(data):
    print(data)
    time.sleep(10)

p = Pool(4)

for i in range(5):
    p.apply_async(func=test,args=(i,))

print('bbb')

# 必须close后再join,close之后就不能继续添加新的Process了
p.close()
# 等待所有的子进程结束
p.join()

print('ccc')

子进程调用

很多时候,子进程并不是自身,而是一个外部进程。我们创建了子进程后,还需要控制子进程的输入和输出

import subprocess

s = subprocess.call(['nslookup', 'www.python.org'])
print(s)

进程间通信

multiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来实现进程间通信

from multiprocessing import Process, Queue

def pWrite(queue):
    for i in range(1, 4):
        queue.put("test" + str(i))

def pRead(queue):
    while (True):
        print(queue.get(True))

# 使用队列进行进程间通信
q = Queue()
pw = Process(target=pWrite,args=(q,))
pr = Process(target=pRead,args=(q,))

pw.start()
pr.start()

pw.join()
# 因为读进程中是一个死循环,所以需要使用 terminate 中断
pr.terminate()

分布式进程

先执行主进程(master.py)发布任务,并等待结果

再执行工作进程(worker1.py、worker2.py),处理任务并返回结果

master.py

from multiprocessing.managers import BaseManager
import queue

class Manager(BaseManager):
    pass

tq = queue.Queue()
rq = queue.Queue()

# 注册任务发布队列
Manager.register("taskQueue", callable=lambda: tq)
# 注册任务结果获取队列
Manager.register("resultQueue", callable=lambda: rq)

# 暴露9503端口给worker连接,授权key为 b'txf'
manager = Manager(address=('', 9503), authkey=b'txf')
manager.start()

# 获取可以通过网络访问的Queue对象
task = manager.taskQueue()
result = manager.resultQueue()

# 发布任务
for i in range(1, 10):
    task.put("task " + str(i))
    print("put task " + str(i))

# 接收任务执行结果,等待时间10秒
for i in range(1, 10):
    r = result.get(timeout=10)
    print('result' + str(r))

manager.shutdown()

worker1.py、worker2.py

from multiprocessing.managers import BaseManager
import time,os

class Manager(BaseManager):
    pass

# 注册队列任务,这里不需要定义,实际使用的是master上面的队列
Manager.register("taskQueue")
Manager.register("resultQueue")

# 配置master信息并连接master
manager = Manager(address=('127.0.0.1', 9503), authkey=b'txf')
manager.connect()

# 获取master上的队列
task = manager.taskQueue()
result = manager.resultQueue()

name=os.path.basename(__file__)

# 获取任务队列,处理后把结果扔到结果队列
for i in range(5):
    try:
        r = task.get(timeout=1)
        r = r + 'finish in ' + name
        time.sleep(1)
        result.put(r)
    except Exception as e:
        print(e)

results matching ""

    No results matching ""