Skip to content

Python 多进程编程指南

1. 概览

1.1 为什么选择多进程

  1. 充分利用多核处理器:多进程可以同时利用多个 CPU 核心,实现并行处理,加快任务执行速度。
  2. 避免 GIL 的影响:Python 的全局解释器锁(GIL)限制了多线程并发执行时的效率,而多进程可以绕过这一限制。
  3. 提高程序稳定性:由于多进程拥有独立的内存空间,进程之间互不影响,因此在需要隔离环境的任务中更加稳定可靠。
  4. 适用于 CPU 密集型任务:对于需要大量计算的任务,多进程能够更好地利用计算资源,提高程序执行效率。

1.2 操作系统基础

Unix/Linux 提供了 fork() 系统调用。与普通函数“调用一次、返回一次”不同,fork() 调用一次会返回两次:在父进程中返回子进程的 PID,在子进程中返回 0。子进程可以通过 getppid() 获取父进程 ID。

Python 的 os 模块封装了该系统调用,允许在程序中直接创建子进程:

python
import os

print(f'Process ({os.getpid()}) start...')

# 仅在 Unix/Linux/macOS 上有效
pid = os.fork()
if pid == 0:
    print(f'I am child process ({os.getpid()}), parent is {os.getppid()}.')
else:
    print(f'I ({os.getpid()}) just created a child process ({pid}).')

运行结果:

text
Process (876) start...
I (876) just created a child process (877).
I am child process (877), parent is 876.

由于 Windows 没有 fork() 调用,上述代码无法在 Windows 上运行。为了使 Python 具备跨平台的多进程能力,标准库提供了 multiprocessing 模块。

2. Multiprocessing 模块

multiprocessing 是 Python 内置的多进程编程模块。通过 Process 类可以创建新进程,通过 Pool 类可以创建进程池实现并行处理,通过 QueuePipe 等机制可以实现进程间通信。

2.1 子进程启动方式

Python 3 支持三种启动子进程的方式:spawnforkforkserver

  1. spawn:启动一个全新的 Python 解释器进程,不继承父进程不必要的文件描述符与资源。跨平台兼容性最好,但启动较慢。
  2. fork:使用 os.fork() 创建子进程,继承父进程资源。在 Linux 下通过写时复制(Copy-on-Write)实现,创建成本低。但如果父进程是多线程程序,使用 fork 可能导致死锁,详见 Python Multiprocessing
  3. forkserver:启动一个独立的 Fork Server 单线程进程,由它负责调用 os.fork() 产生子进程,从而避免多线程父进程 fork 导致的问题。

不同操作系统默认启动方式不同。Unix/Linux 通常默认使用 fork,Windows 与 macOS 默认使用 spawn

可以通过以下两种方式显式指定启动方式:

python
# 方式一:全局设置
import multiprocessing as mp

if __name__ == '__main__':
    mp.set_start_method('spawn')
python
# 方式二:使用上下文对象
import multiprocessing as mp


def foo(queue):
    queue.put('hello')


if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    queue = ctx.Queue()
    process = ctx.Process(target=foo, args=(queue,))
    process.start()

2.2 创建进程

multiprocessing.Process 类用于创建新进程。每个 Process 实例拥有独立的内存空间。调用 start() 启动进程,调用 join() 等待进程结束。

2.2.1 方式一:传入目标函数

python
from multiprocessing import Process
import os


def run_proc(name):
    print(f'Run child process: {name} ({os.getpid()})...')


if __name__ == '__main__':
    print(f'Parent process {os.getpid()}.')
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')

运行结果:

text
Parent process 928.
Child process will start.
Run child process: test (929)...
Child process end.

2.2.2 方式二:继承 Process 类

python
from multiprocessing import Process


class MyProcess(Process):
    def __init__(self, name):
        super().__init__()
        self.name = name

    def run(self):
        print(f'Test Python Process: {self.name}')


if __name__ == '__main__':
    process_list = []
    for i in range(5):
        p = MyProcess(f'P_{i}')
        p.start()
        process_list.append(p)

    for p in process_list:
        p.join()

    print('Finished')

运行结果:

text
Test Python Process: P_0
Test Python Process: P_1
Test Python Process: P_2
Test Python Process: P_3
Test Python Process: P_4
Finished

2.3 创建进程池

multiprocessing.Pool 用于创建进程池,可方便地管理多个进程。通过 map()apply()apply_async() 等方法将任务分配给进程池并行执行。

Pool 默认大小为 CPU 核心数,也可通过 processes 参数自定义。与 Process 不同,提交到 Pool 的任务可以有返回值。

常用方法说明:

  • map(func, iterable):将可迭代对象中的每个元素传给 func,按输入顺序返回结果。
  • apply_async(func, args):异步提交单个任务,返回 AsyncResult 对象,通过 get() 获取结果。
  • close():关闭进程池,不再接受新任务。
  • join():等待所有子进程执行完毕,调用前必须先调用 close()

创建 Pool 时还可以使用以下参数:

  • initializer / initargs:对每个工作进程执行初始化函数。
  • maxtasksperchild:每个工作进程最多处理的任务数,超过后会被新进程替换,适用于存在内存泄漏风险的场景。

2.3.1 apply_async 示例

python
from multiprocessing import Pool
import os
import time
import random


def worker(name):
    print(f'Run task {name} ({os.getpid()})...')
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print(f'Task {name} runs {end - start:.2f} seconds.')


if __name__ == '__main__':
    print(f'Parent process {os.getpid()}.')
    p = Pool(processes=4)
    for i in range(5):
        p.apply_async(worker, args=(i,))

    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')

运行结果:

text
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

由于 Pool 大小为 4,最多同时运行 4 个进程;task 4 需要等待某个任务完成后才能开始执行。

2.3.2 map 示例

python
import multiprocessing as mp


def square(x):
    return x ** 2


if __name__ == '__main__':
    with mp.Pool(5) as pool:
        results = pool.map(square, range(1, 11))
    print(results)

运行结果:

text
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]

下面是一个更实用的例子:批量生成图片缩略图。

python
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from PIL import Image


def get_image_paths(folder):
    return [
        os.path.join(folder, filename)
        for filename in os.listdir(folder)
        if filename.lower().endswith(('.jpeg', '.jpg', '.png'))
    ]


def create_thumbnail(image_path, save_dir, size=(75, 75)):
    with Image.open(image_path) as im:
        im.thumbnail(size, Image.LANCZOS)
        base, filename = os.path.split(image_path)
        save_path = os.path.join(save_dir, filename)
        os.makedirs(save_dir, exist_ok=True)
        im.save(save_path)
    return save_path


def process_images(image_dir, save_dir, size=(75, 75), max_workers=None):
    image_paths = get_image_paths(image_dir)

    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(create_thumbnail, path, save_dir, size)
            for path in image_paths
        ]
        for future in as_completed(futures):
            try:
                result = future.result()
                print(f'Thumbnail created: {result}')
            except Exception as e:
                print(f'Error processing image: {e}')


if __name__ == '__main__':
    root_dir = 'work_dir'
    image_dir = os.path.join(root_dir, 'images')
    save_dir = os.path.join(root_dir, 'thumb')
    image_size = (75, 75)
    process_images(image_dir, save_dir, image_size)

2.4 进程间通信

multiprocessing 提供了 QueuePipe 等方式实现进程间数据交换。

  • Queue:线程/进程安全的队列,可在多个进程之间安全地传递数据。
  • Pipe:提供一对连接对象,实现两个进程之间的双向通信。
  • pickle:用于序列化 Python 对象,在进程间传输复杂数据结构。

2.4.1 队列(Queue)

Queue 将每个进程的运算结果暂存,待进程结束后再统一取出。由于多进程调用的函数无法直接 return,通常使用 Queue 收集结果。

常用方法:

  • put(item, block=True, timeout=None):向队列中放入数据。
  • get(block=True, timeout=None):从队列中读取并移除一个元素。
python
from multiprocessing import Process, Queue
import time
import random


def producer(queue):
    print('生产者进程开始')
    for i in range(5):
        item = random.randint(1, 100)
        queue.put(item)
        print(f'生产者放入: {item}')
        time.sleep(random.random())
    queue.put(None)  # 发送结束信号
    print('生产者进程结束')


def consumer(queue):
    print('消费者进程开始')
    while True:
        item = queue.get()
        if item is None:
            break
        print(f'消费者取出: {item}')
        time.sleep(random.random())
    print('消费者进程结束')


if __name__ == '__main__':
    q = Queue()

    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print('所有进程结束')

运行结果:

text
生产者进程开始
生产者放入: 72
消费者进程开始
消费者取出: 72
生产者放入: 77
消费者取出: 77
生产者放入: 20
消费者取出: 20
生产者放入: 94
生产者放入: 8
生产者进程结束
消费者取出: 94
消费者取出: 8
消费者进程结束
所有进程结束

2.4.2 管道(Pipe)

Pipe() 返回一对连接对象,默认全双工模式,两端均可收发。

python
from multiprocessing import Process, Pipe


def worker(conn):
    print('子进程开始工作')
    msg = conn.recv()
    print(f'子进程收到消息: {msg}')

    result = f'处理结果: {msg.upper()}'
    conn.send(result)
    print('子进程完成工作')
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()

    p = Process(target=worker, args=(child_conn,))
    p.start()

    print('主进程发送消息: hello world')
    parent_conn.send('hello world')

    result = parent_conn.recv()
    print(f'主进程收到结果: {result}')

    p.join()
    print('所有进程结束')

运行结果:

text
子进程开始工作
主进程发送消息: hello world
子进程收到消息: hello world
子进程完成工作
主进程收到结果: 处理结果: HELLO WORLD
所有进程结束

2.4.3 序列化(pickle)

复杂对象可以通过 pickle 序列化后在进程间传递。注意:pickle 不适合处理不可信来源的数据。

python
import multiprocessing
import pickle
import random
import time


def producer(queue):
    print('生产者进程开始')
    for i in range(5):
        item = {'id': i, 'value': random.randint(1, 100)}
        queue.put(pickle.dumps(item))
        print(f'生产者放入: {item}')
        time.sleep(random.random())
    queue.put(pickle.dumps(None))
    print('生产者进程结束')


def consumer(queue):
    print('消费者进程开始')
    while True:
        serialized_item = queue.get()
        item = pickle.loads(serialized_item)
        if item is None:
            break
        print(f'消费者取出: {item}')
        time.sleep(random.random())
    print('消费者进程结束')


if __name__ == '__main__':
    ctx = multiprocessing.get_context('spawn')
    q = ctx.Queue()

    p1 = ctx.Process(target=producer, args=(q,))
    p2 = ctx.Process(target=consumer, args=(q,))

    p1.start()
    p2.start()
    p1.join()
    p2.join()

    print('所有进程结束')

3. 多进程与多线程

3.1 进程与线程概念

  • 进程:程序的一次执行过程,是系统资源分配的基本单位。每个进程拥有独立的内存空间,包括代码段、数据段、堆栈等。进程之间相互独立,通信需要特殊手段。
  • 线程:进程中的一个执行流,是 CPU 调度的基本单位。同一进程内的线程共享内存空间,通信更方便,但也更容易相互影响。

Python 的标准库提供了 _thread(低级)与 threading(高级)两个模块,绝大多数情况下使用 threading 即可。

启动一个线程只需将函数传入 Thread 实例并调用 start()

python
import time
import threading


def loop():
    print(f'Thread {threading.current_thread().name} is running...')
    n = 0
    while n < 5:
        n = n + 1
        print(f'Thread {threading.current_thread().name} >>> {n}')
        time.sleep(1)
    print(f'Thread {threading.current_thread().name} ended.')


if __name__ == '__main__':
    print(f'Thread {threading.current_thread().name} is running...')
    thread = threading.Thread(target=loop, name='LoopThread')
    thread.start()
    thread.join()
    print(f'Thread {threading.current_thread().name} ended.')

运行结果:

text
Thread MainThread is running...
Thread LoopThread is running...
Thread LoopThread >>> 1
Thread LoopThread >>> 2
Thread LoopThread >>> 3
Thread LoopThread >>> 4
Thread LoopThread >>> 5
Thread LoopThread ended.
Thread MainThread ended.

3.2 多进程 vs. 多线程性能对比

python
import multiprocessing as mp
import threading
import time

MAX = 10_000_000


def job(queue):
    res = 0
    for i in range(MAX):
        res += i + i ** 2 + i ** 3
    queue.put(res)


def multiprocess():
    queue = mp.Queue()
    p1 = mp.Process(target=job, args=(queue,))
    p2 = mp.Process(target=job, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    res1 = queue.get()
    res2 = queue.get()
    print('multiprocess:', res1 + res2)


def normal():
    res = 0
    for _ in range(2):
        for i in range(MAX):
            res += i + i ** 2 + i ** 3
    print('normal:', res)


def multithread():
    queue = mp.Queue()
    t1 = threading.Thread(target=job, args=(queue,))
    t2 = threading.Thread(target=job, args=(queue,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    res1 = queue.get()
    res2 = queue.get()
    print('multithreading:', res1 + res2)


if __name__ == '__main__':
    st = time.time()
    normal()
    st1 = time.time()
    print(f'normal time: {st1 - st:.2f}')

    multithread()
    st2 = time.time()
    print(f'multithreading time: {st2 - st1:.2f}')

    multiprocess()
    print(f'multiprocess time: {time.time() - st2:.2f}')

从结果可以看出,多进程执行时间通常短于多线程与串行执行,而多线程与串行执行时间相近。原因是 CPython 的 GIL 限制了同一时刻只有一个线程执行 Python 字节码,因此 Python 多线程无法发挥多核优势,而多进程可以绕过这一限制。

3.3 进程与线程的区别

维度进程线程
资源占用独立内存空间,资源消耗大共享进程内存,轻量
通信方式需要队列、管道等特殊机制可直接访问共享数据
并发性可利用多核,实现真正并行受 GIL 限制,无法利用多核
稳定性一个进程崩溃不影响其他进程一个线程错误可能导致整个进程崩溃
适用场景CPU 密集型任务I/O 密集型任务

3.4 选择多进程还是多线程

多任务场景下通常采用 Master-Worker 模式:Master 负责分配任务,Worker 负责执行任务。

  • 多进程:稳定性高,一个子进程崩溃不会影响主进程与其他子进程;但创建与切换开销较大。适合 CPU 密集型任务。
  • 多线程:创建开销小,但受 GIL 限制,且一个线程崩溃可能导致整个进程崩溃。适合 I/O 密集型任务。

现代 Web 服务器常采用多进程 + 多线程的混合模式,以兼顾稳定性与并发效率。

3.4.1 计算密集型 vs. I/O 密集型

  • 计算密集型任务:消耗大量 CPU 资源,如视频编解码、科学计算等。Python 作为解释型语言运行效率有限,可结合 C/C++ 扩展或 NumPy 等库优化。
  • I/O 密集型任务:大部分时间等待网络、磁盘等 I/O 操作,如 Web 应用、文件下载等。Python 配合异步 I/O 或线程池通常足够高效。

3.4.2 异步 I/O

现代操作系统支持异步 I/O,允许在单进程单线程模型下高效处理多任务,这种模型称为事件驱动模型。Nginx 就是典型的异步 I/O Web 服务器。在多核 CPU 上,可以运行与核心数相等的进程,每个进程内部采用异步 I/O,从而兼顾并发与多核利用。

3.5 分布式进程

相比线程,进程更稳定,且可以分布到多台机器上。multiprocessing.managers 子模块支持将队列等对象暴露到网络上,让其他机器的进程访问,从而方便地构建分布式多进程程序。

3.5.1 服务进程(task_master.py)

python
# task_master.py
import random
import queue
import time
from multiprocessing.managers import BaseManager

# 发送任务的队列
task_queue = queue.Queue()
# 接收结果的队列
result_queue = queue.Queue()


class QueueManager(BaseManager):
    pass


QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)

manager = QueueManager(address=('', 5000), authkey=b'abc')
manager.start()

task = manager.get_task_queue()
result = manager.get_result_queue()

for i in range(10):
    n = random.randint(0, 10000)
    print(f'Put task {n}...')
    task.put(n)

print('Try get results...')
for i in range(10):
    r = result.get(timeout=10)
    print(f'Result: {r}')

manager.shutdown()
print('master exit.')

注意:在分布式多进程环境下,必须通过 manager.get_task_queue() 获得的接口添加任务,不能直接操作原始的 task_queue,否则会绕过 QueueManager 的封装。

3.5.2 工作进程(task_worker.py)

python
# task_worker.py
import queue
import time
from multiprocessing.managers import BaseManager


class QueueManager(BaseManager):
    pass


QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

server_addr = '127.0.0.1'
print(f'Connect to server {server_addr}...')

m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()

task = m.get_task_queue()
result = m.get_result_queue()

for i in range(10):
    try:
        n = task.get(timeout=1)
        print(f'Run task {n} * {n}...')
        r = f'{n} * {n} = {n * n}'
        time.sleep(1)
        result.put(r)
    except queue.Empty:
        print('Task queue is empty.')

print('worker exit.')

4. 进程池与异步编程

4.1 Pool 的使用与优化

  • 使用:通过 apply()map()starmap() 等方法提交任务,配合 close()join() 等待完成。
  • 优化建议
    • 根据 CPU 核心数与任务特性设置合适的进程数,避免上下文切换开销。
    • 尽量减少进程间通信;若任务可并行处理,尽量一次性提交大量任务。

下面是一个优化的 multiprocessing.Pool 使用示例:

python
import multiprocessing
import os
import time


def cpu_bound_task(n):
    """模拟一个 CPU 密集型任务。"""
    count = 0
    for i in range(n):
        count += i * i
    return count


def process_chunk(chunk):
    """处理一个数据块。"""
    return [cpu_bound_task(item) for item in chunk]


def main():
    num_cores = os.cpu_count()
    print(f'本机有 {num_cores} 个 CPU 核心')

    num_tasks = 10000
    tasks = list(range(num_tasks))

    chunk_size = len(tasks) // num_cores
    chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)]

    start_time = time.time()

    with multiprocessing.Pool(processes=num_cores) as pool:
        results = pool.map(process_chunk, chunks)

    final_results = [item for sublist in results for item in sublist]

    end_time = time.time()
    print(f'处理 {num_tasks} 个任务耗时: {end_time - start_time:.2f} 秒')
    print(f'结果数量: {len(final_results)}')


if __name__ == '__main__':
    main()

运行结果示例:

text
本机有 8 个 CPU 核心
处理 10000 个任务耗时: 1.82 秒
结果数量: 10000

该示例展示了以下优化点:

  1. 根据 CPU 核心数设置进程池大小,充分利用多核优势。
  2. 将任务分块处理,减少进程间通信开销。
  3. 使用 map() 自动分配任务并收集结果。
  4. 使用 with 语句确保进程池正确关闭,避免资源泄漏。

4.2 concurrent.futures 简介

concurrent.futures 提供了更现代的异步执行接口,主要包含:

  • ThreadPoolExecutor:使用线程池执行异步调用,适合 I/O 密集型任务。
  • ProcessPoolExecutor:使用进程池执行异步调用,适合 CPU 密集型任务。
  • Future:表示异步计算的结果。

常用方法:

  • submit(fn, *args, **kwargs):提交单个任务。
  • map(func, *iterables):并行执行映射。
  • shutdown(wait=True):关闭执行器。

Future 方法:

  • result(timeout=None):获取结果。
  • exception(timeout=None):获取异常。
  • done() / cancelled() / running():查询任务状态。
  • add_done_callback(fn):添加完成回调。

4.2.1 示例一:分别处理 I/O 与 CPU 密集型任务

python
import concurrent.futures
import time

import requests


def download_image(url):
    response = requests.get(url, timeout=10)
    print(f'下载完成: {url}')
    return len(response.content)


def cpu_bound_task(n):
    count = 0
    for i in range(n):
        count += i * i
    return count


if __name__ == '__main__':
    image_urls = [
        'https://example.com/image1.jpg',
        'https://example.com/image2.jpg',
        'https://example.com/image3.jpg',
    ]
    numbers = [1000, 2000, 3000]

    start_time = time.time()

    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        image_sizes = list(executor.map(download_image, image_urls))
    print(f'图片大小: {image_sizes}')

    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(cpu_bound_task, numbers))
    print(f'计算结果: {results}')

    end_time = time.time()
    print(f'总耗时: {end_time - start_time:.2f} 秒')

4.2.2 示例二:使用 submitas_completed

python
import concurrent.futures
import time


def task(n):
    print(f'开始执行任务 {n}')
    time.sleep(n)
    return f'任务 {n} 完成'


if __name__ == '__main__':
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        futures = [executor.submit(task, i) for i in range(5)]
        for future in concurrent.futures.as_completed(futures):
            print(future.result())

submit()map() 更灵活,可以提交不同函数或参数;as_completed() 按任务完成顺序返回结果,适合处理耗时不同的任务。

4.2.3 示例三:结合 asyncioProcessPoolExecutor

python
import asyncio
import concurrent.futures
import time


def io_bound_task(n):
    # 模拟耗时任务
    time.sleep(1)
    return f'Task {n} completed'


async def main():
    loop = asyncio.get_event_loop()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        tasks = [
            loop.run_in_executor(executor, io_bound_task, i)
            for i in range(10)
        ]
        completed, _ = await asyncio.wait(tasks)
        for task in completed:
            print(task.result())


if __name__ == '__main__':
    start = time.time()
    asyncio.run(main())
    end = time.time()
    print(f'Total time: {end - start:.2f} seconds')

4.2.4 示例四:完整对比线程池与进程池

python
import concurrent.futures
import time

import requests


def fetch_url(url):
    print(f'开始下载 {url}')
    response = requests.get(url, timeout=10)
    return (f'{url}: 状态码 {response.status_code}, '
            f'内容长度 {len(response.text)} 字节')


urls = [
    'https://www.python.org',
    'https://www.github.com',
    'https://www.stackoverflow.com',
    'https://www.google.com',
    'https://www.bing.com',
]


def run_with_threadpool():
    print('使用线程池执行:')
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        future_to_url = {executor.submit(fetch_url, url): url for url in urls}
        for future in concurrent.futures.as_completed(future_to_url):
            url = future_to_url[future]
            try:
                print(future.result())
            except Exception as exc:
                print(f'{url} 生成了一个异常: {exc}')


def run_with_processpool():
    print('\n使用进程池执行:')
    with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
        results = executor.map(fetch_url, urls)
        for result in results:
            print(result)


if __name__ == '__main__':
    start_time = time.time()
    run_with_threadpool()
    run_with_processpool()
    end_time = time.time()
    print(f'\n总执行时间: {end_time - start_time:.2f} 秒')

5. 多进程间共享状态

multiprocessing 提供了两种共享状态的方式:共享内存(Shared Memory)与服务进程(Server Process)。

5.1 共享内存

共享内存是高效的进程间通信方式。multiprocessing 中的 ValueArray 类可用于创建共享内存。

5.1.1 Value

python
import multiprocessing


def worker1(n):
    n.value += 1
    print(f'worker1: {n.value}')


def worker2(n):
    n.value += 1
    print(f'worker2: {n.value}')


if __name__ == '__main__':
    n = multiprocessing.Value('i', 0)
    p1 = multiprocessing.Process(target=worker1, args=(n,))
    p2 = multiprocessing.Process(target=worker2, args=(n,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

5.1.2 Array

python
from multiprocessing import Process, Value, Array


def modify_shared(n, arr):
    n.value = 3.1415927
    for i in range(len(arr)):
        arr[i] = -arr[i]


if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=modify_shared, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

运行结果:

text
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

更复杂的共享内存示例:

python
import time
from multiprocessing import Array, Process, Value


def worker1(shared_value, shared_array):
    print(f'工作进程1: 初始共享值 = {shared_value.value}, '
          f'共享数组 = {list(shared_array)}')
    shared_value.value += 1
    for i in range(len(shared_array)):
        shared_array[i] += 1
    print(f'工作进程1: 修改后共享值 = {shared_value.value}, '
          f'共享数组 = {list(shared_array)}')
    time.sleep(1)


def worker2(shared_value, shared_array):
    print(f'工作进程2: 初始共享值 = {shared_value.value}, '
          f'共享数组 = {list(shared_array)}')
    shared_value.value *= 2
    for i in range(len(shared_array)):
        shared_array[i] *= 2
    print(f'工作进程2: 修改后共享值 = {shared_value.value}, '
          f'共享数组 = {list(shared_array)}')
    time.sleep(1)


if __name__ == '__main__':
    shared_value = Value('i', 0)
    shared_array = Array('i', [1, 2, 3, 4, 5])

    p1 = Process(target=worker1, args=(shared_value, shared_array))
    p2 = Process(target=worker2, args=(shared_value, shared_array))

    p1.start()
    p2.start()
    time.sleep(0.5)

    print(f'主进程: 共享值 = {shared_value.value}, '
          f'共享数组 = {list(shared_array)}')

    p1.join()
    p2.join()

    print(f'最终结果: 共享值 = {shared_value.value}, '
          f'共享数组 = {list(shared_array)}')

注意ValueArray 只能存放 ctypes 基础类型,无法直接存放 Python 对象。如需共享复杂对象,可以先用 pickle 序列化为字节数组再放入 Value,但需承担序列化开销与双份内存开销。

5.2 服务进程(Server Process)

服务进程通过 multiprocessing.Manager() 启动一个独立的管理进程,所有共享对象实际存放在该进程中,其他进程通过 Proxy 对象进行操作。

python
import multiprocessing
import time


def worker1(shared_list, shared_dict):
    print(f'工作进程1的 ID: {multiprocessing.current_process().pid}')
    shared_list.append('来自进程1的数据')
    shared_dict[1] = '进程1的值'
    time.sleep(2)


def worker2(shared_list, shared_dict):
    print(f'工作进程2的 ID: {multiprocessing.current_process().pid}')
    shared_list.extend(['来自进程2的数据1', '来自进程2的数据2'])
    shared_dict[2] = '进程2的值'
    time.sleep(1)


if __name__ == '__main__':
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        shared_dict = manager.dict()

        p1 = multiprocessing.Process(target=worker1, args=(shared_list, shared_dict))
        p2 = multiprocessing.Process(target=worker2, args=(shared_list, shared_dict))

        print(f'主进程 ID: {multiprocessing.current_process().pid}')

        p1.start()
        p2.start()
        p1.join()
        p2.join()

        print(f'共享列表: {list(shared_list)}')
        print(f'共享字典: {dict(shared_dict)}')

运行结果:

text
主进程 ID: 33464
工作进程1的 ID: 33468
工作进程2的 ID: 33469
共享列表: ['来自进程1的数据', '来自进程2的数据1', '来自进程2的数据2']
共享字典: {1: '进程1的值', 2: '进程2的值'}

multiprocessing.Queue()manager.Queue() 底层实现不同:前者基于共享内存与管道,后者通过服务进程代理,适用于更复杂的共享需求。

6. 高级并发技巧

6.1 多进程同步与协调

选择合适的同步机制:

  • Semaphore:限制同时访问某资源的进程数量。
  • Lock:确保对共享资源的互斥访问。
  • Event:用于简单的进程间信号通知。
  • Condition:用于基于条件的复杂协调。

6.1.1 信号量(Semaphore)

python
import time
from multiprocessing import Process, Semaphore


def worker(sem, num):
    sem.acquire()
    print(f'工作进程 {num} 开始')
    time.sleep(2)
    print(f'工作进程 {num} 结束')
    sem.release()


if __name__ == '__main__':
    semaphore = Semaphore(3)
    processes = []
    for i in range(5):
        p = Process(target=worker, args=(semaphore, i))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

6.1.2 互斥锁(Lock)

python
import time
from multiprocessing import Lock, Process, Value


def increment(lock, shared_value):
    for _ in range(100):
        time.sleep(0.01)
        with lock:
            shared_value.value += 1


if __name__ == '__main__':
    lock = Lock()
    shared_value = Value('i', 0)
    processes = [
        Process(target=increment, args=(lock, shared_value))
        for _ in range(4)
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

    print(f'最终值: {shared_value.value}')

运行结果:最终值: 400

6.1.3 事件(Event)

python
import time
from multiprocessing import Event, Process


def waiter(event):
    print('等待事件...')
    event.wait()
    print('事件被设置,继续执行')


def setter(event):
    print('准备设置事件')
    time.sleep(3)
    event.set()
    print('事件已设置')


if __name__ == '__main__':
    event = Event()
    processes = [
        Process(target=waiter, args=(event,)),
        Process(target=setter, args=(event,)),
    ]

    for p in processes:
        p.start()
    for p in processes:
        p.join()

6.1.4 条件变量(Condition)

python
import time
from multiprocessing import Condition, Process, Value


def consumer(cond, shared_queue):
    while True:
        with cond:
            while shared_queue.value == 0:
                print('消费者: 队列为空,等待...')
                cond.wait()
            item = shared_queue.value
            shared_queue.value = 0
            print(f'消费者: 消费了项目 {item}')
            cond.notify()
        time.sleep(1)


def producer(cond, shared_queue):
    for i in range(5):
        with cond:
            while shared_queue.value != 0:
                print('生产者: 队列满了,等待...')
                cond.wait()
            shared_queue.value = i + 1
            print(f'生产者: 生产了项目 {i + 1}')
            cond.notify()
        time.sleep(0.5)


if __name__ == '__main__':
    condition = Condition()
    shared_queue = Value('i', 0)

    cons = Process(target=consumer, args=(condition, shared_queue))
    prod = Process(target=producer, args=(condition, shared_queue))

    cons.start()
    prod.start()
    prod.join()
    cons.terminate()
    cons.join()

6.2 避免 GIL 的影响

GIL 是 CPython 中的机制,确保同一时间只有一个线程执行 Python 字节码。绕过 GIL 的常见方法:

  • 使用多进程代替多线程,每个进程拥有独立的 GIL。
  • 使用 Jython 或 IronPython 等没有 GIL 的 Python 实现。
  • 使用 C/C++ 扩展执行计算密集型任务,释放 GIL。

6.3 资源管理与任务调度

  • 资源管理:使用 with 语句管理进程池、执行器、文件与网络连接,确保资源正确释放。
  • 任务调度:使用 multiprocessing.Queue 实现生产者-消费者模型,由生产者将任务放入队列,消费者从队列取出并执行。
python
import multiprocessing
import time


def worker(x):
    print(f'进程 {multiprocessing.current_process().pid} 正在处理 {x}')
    time.sleep(1)
    return x * x


if __name__ == '__main__':
    print(f'主进程 ID: {multiprocessing.current_process().pid}')

    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(worker, range(10))
        print(f'处理结果: {results}')

    print('所有工作已完成,进程池已关闭')

任务调度示例:

python
import multiprocessing
import random
import time


def producer(queue, num_tasks):
    print(f'生产者进程 {multiprocessing.current_process().name} 开始运行')
    for i in range(num_tasks):
        task = f'任务-{i}'
        queue.put(task)
        print(f'生产者添加任务: {task}')
        time.sleep(random.uniform(0.1, 0.3))

    for _ in range(multiprocessing.cpu_count()):
        queue.put(None)
    print('生产者完成所有任务')


def consumer(queue):
    print(f'消费者进程 {multiprocessing.current_process().name} 开始运行')
    while True:
        task = queue.get()
        if task is None:
            break
        print(f'消费者 {multiprocessing.current_process().name} 执行任务: {task}')
        time.sleep(random.uniform(0.5, 1))
    print(f'消费者 {multiprocessing.current_process().name} 完成工作')


if __name__ == '__main__':
    num_tasks = 10
    task_queue = multiprocessing.Queue()

    producer_process = multiprocessing.Process(
        target=producer, args=(task_queue, num_tasks)
    )

    num_consumers = multiprocessing.cpu_count()
    consumer_processes = [
        multiprocessing.Process(target=consumer, args=(task_queue,))
        for _ in range(num_consumers)
    ]

    producer_process.start()
    for p in consumer_processes:
        p.start()

    producer_process.join()
    for p in consumer_processes:
        p.join()

    print('所有进程已完成')

7. 错误处理与调试

7.1 错误处理策略

  • 进程间通信异常处理:在通信代码中使用 try-except 捕获异常,避免单个进程崩溃导致整个程序退出。
  • 进程池异常处理:使用 PoolExecutor 时注意捕获子进程抛出的异常。
  • 日志记录:使用 logging 模块记录运行时信息、警告与错误,便于排查问题。

7.2 捕获子进程异常

python
import multiprocessing
import queue
import traceback


class SafeProcess(multiprocessing.Process):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            super().run()
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception


def worker(queue):
    try:
        queue.put('正常数据')
        raise ValueError('模拟的错误')
    except Exception as e:
        queue.put(('异常', str(e)))


def main():
    queue = multiprocessing.Queue()
    process = SafeProcess(target=worker, args=(queue,))
    process.start()

    while process.is_alive():
        try:
            data = queue.get(timeout=1)
            if isinstance(data, tuple) and data[0] == '异常':
                print(f'工作进程发生异常: {data[1]}')
            else:
                print(f'收到数据: {data}')
        except queue.Empty:
            pass

    process.join()

    if process.exception:
        error, tb = process.exception
        print(f'进程异常: {error}')
        print(f'异常追踪:\n{tb}')


if __name__ == '__main__':
    main()

7.3 使用 loggingtraceback

python
import logging
import traceback

logging.basicConfig(filename='example.log', level=logging.DEBUG)
logging.debug('This is a debug message')
logging.error('This is an error message')

try:
    # 可能会引发异常的代码
    pass
except Exception:
    traceback.print_exc()

总结

  • 多进程适合 CPU 密集型任务,可以绕过 GIL 并充分利用多核 CPU。
  • multiprocessing.ProcessPool 是常用的进程创建与管理工具。
  • QueuePipe、共享内存与 Manager 提供了多种进程间通信与状态共享方式。
  • SemaphoreLockEventCondition 提供了进程间同步机制。
  • 使用 with 语句、logging 与异常处理可以提高多进程程序的稳定性与可维护性。

Maintained by Robin