Python 多进程编程指南
1. 概览
1.1 为什么选择多进程
- 充分利用多核处理器:多进程可以同时利用多个 CPU 核心,实现并行处理,加快任务执行速度。
- 避免 GIL 的影响:Python 的全局解释器锁(GIL)限制了多线程并发执行时的效率,而多进程可以绕过这一限制。
- 提高程序稳定性:由于多进程拥有独立的内存空间,进程之间互不影响,因此在需要隔离环境的任务中更加稳定可靠。
- 适用于 CPU 密集型任务:对于需要大量计算的任务,多进程能够更好地利用计算资源,提高程序执行效率。
1.2 操作系统基础
Unix/Linux 提供了 fork() 系统调用。与普通函数“调用一次、返回一次”不同,fork() 调用一次会返回两次:在父进程中返回子进程的 PID,在子进程中返回 0。子进程可以通过 getppid() 获取父进程 ID。
Python 的 os 模块封装了该系统调用,允许在程序中直接创建子进程:
import os
print(f'Process ({os.getpid()}) start...')
# 仅在 Unix/Linux/macOS 上有效
pid = os.fork()
if pid == 0:
print(f'I am child process ({os.getpid()}), parent is {os.getppid()}.')
else:
print(f'I ({os.getpid()}) just created a child process ({pid}).')运行结果:
Process (876) start...
I (876) just created a child process (877).
I am child process (877), parent is 876.由于 Windows 没有 fork() 调用,上述代码无法在 Windows 上运行。为了使 Python 具备跨平台的多进程能力,标准库提供了 multiprocessing 模块。
2. Multiprocessing 模块
multiprocessing 是 Python 内置的多进程编程模块。通过 Process 类可以创建新进程,通过 Pool 类可以创建进程池实现并行处理,通过 Queue、Pipe 等机制可以实现进程间通信。
2.1 子进程启动方式
Python 3 支持三种启动子进程的方式:spawn、fork、forkserver。
spawn:启动一个全新的 Python 解释器进程,不继承父进程不必要的文件描述符与资源。跨平台兼容性最好,但启动较慢。fork:使用os.fork()创建子进程,继承父进程资源。在 Linux 下通过写时复制(Copy-on-Write)实现,创建成本低。但如果父进程是多线程程序,使用fork可能导致死锁,详见 Python Multiprocessing。forkserver:启动一个独立的Fork Server单线程进程,由它负责调用os.fork()产生子进程,从而避免多线程父进程 fork 导致的问题。
不同操作系统默认启动方式不同。Unix/Linux 通常默认使用 fork,Windows 与 macOS 默认使用 spawn。
可以通过以下两种方式显式指定启动方式:
# 方式一:全局设置
import multiprocessing as mp
if __name__ == '__main__':
mp.set_start_method('spawn')# 方式二:使用上下文对象
import multiprocessing as mp
def foo(queue):
queue.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
queue = ctx.Queue()
process = ctx.Process(target=foo, args=(queue,))
process.start()2.2 创建进程
multiprocessing.Process 类用于创建新进程。每个 Process 实例拥有独立的内存空间。调用 start() 启动进程,调用 join() 等待进程结束。
2.2.1 方式一:传入目标函数
from multiprocessing import Process
import os
def run_proc(name):
print(f'Run child process: {name} ({os.getpid()})...')
if __name__ == '__main__':
print(f'Parent process {os.getpid()}.')
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')运行结果:
Parent process 928.
Child process will start.
Run child process: test (929)...
Child process end.2.2.2 方式二:继承 Process 类
from multiprocessing import Process
class MyProcess(Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f'Test Python Process: {self.name}')
if __name__ == '__main__':
process_list = []
for i in range(5):
p = MyProcess(f'P_{i}')
p.start()
process_list.append(p)
for p in process_list:
p.join()
print('Finished')运行结果:
Test Python Process: P_0
Test Python Process: P_1
Test Python Process: P_2
Test Python Process: P_3
Test Python Process: P_4
Finished2.3 创建进程池
multiprocessing.Pool 用于创建进程池,可方便地管理多个进程。通过 map()、apply()、apply_async() 等方法将任务分配给进程池并行执行。
Pool 默认大小为 CPU 核心数,也可通过 processes 参数自定义。与 Process 不同,提交到 Pool 的任务可以有返回值。
常用方法说明:
map(func, iterable):将可迭代对象中的每个元素传给func,按输入顺序返回结果。apply_async(func, args):异步提交单个任务,返回AsyncResult对象,通过get()获取结果。close():关闭进程池,不再接受新任务。join():等待所有子进程执行完毕,调用前必须先调用close()。
创建 Pool 时还可以使用以下参数:
initializer/initargs:对每个工作进程执行初始化函数。maxtasksperchild:每个工作进程最多处理的任务数,超过后会被新进程替换,适用于存在内存泄漏风险的场景。
2.3.1 apply_async 示例
from multiprocessing import Pool
import os
import time
import random
def worker(name):
print(f'Run task {name} ({os.getpid()})...')
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print(f'Task {name} runs {end - start:.2f} seconds.')
if __name__ == '__main__':
print(f'Parent process {os.getpid()}.')
p = Pool(processes=4)
for i in range(5):
p.apply_async(worker, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')运行结果:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.由于 Pool 大小为 4,最多同时运行 4 个进程;task 4 需要等待某个任务完成后才能开始执行。
2.3.2 map 示例
import multiprocessing as mp
def square(x):
return x ** 2
if __name__ == '__main__':
with mp.Pool(5) as pool:
results = pool.map(square, range(1, 11))
print(results)运行结果:
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100]下面是一个更实用的例子:批量生成图片缩略图。
import os
from concurrent.futures import ProcessPoolExecutor, as_completed
from PIL import Image
def get_image_paths(folder):
return [
os.path.join(folder, filename)
for filename in os.listdir(folder)
if filename.lower().endswith(('.jpeg', '.jpg', '.png'))
]
def create_thumbnail(image_path, save_dir, size=(75, 75)):
with Image.open(image_path) as im:
im.thumbnail(size, Image.LANCZOS)
base, filename = os.path.split(image_path)
save_path = os.path.join(save_dir, filename)
os.makedirs(save_dir, exist_ok=True)
im.save(save_path)
return save_path
def process_images(image_dir, save_dir, size=(75, 75), max_workers=None):
image_paths = get_image_paths(image_dir)
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(create_thumbnail, path, save_dir, size)
for path in image_paths
]
for future in as_completed(futures):
try:
result = future.result()
print(f'Thumbnail created: {result}')
except Exception as e:
print(f'Error processing image: {e}')
if __name__ == '__main__':
root_dir = 'work_dir'
image_dir = os.path.join(root_dir, 'images')
save_dir = os.path.join(root_dir, 'thumb')
image_size = (75, 75)
process_images(image_dir, save_dir, image_size)2.4 进程间通信
multiprocessing 提供了 Queue、Pipe 等方式实现进程间数据交换。
Queue:线程/进程安全的队列,可在多个进程之间安全地传递数据。Pipe:提供一对连接对象,实现两个进程之间的双向通信。pickle:用于序列化 Python 对象,在进程间传输复杂数据结构。
2.4.1 队列(Queue)
Queue 将每个进程的运算结果暂存,待进程结束后再统一取出。由于多进程调用的函数无法直接 return,通常使用 Queue 收集结果。
常用方法:
put(item, block=True, timeout=None):向队列中放入数据。get(block=True, timeout=None):从队列中读取并移除一个元素。
from multiprocessing import Process, Queue
import time
import random
def producer(queue):
print('生产者进程开始')
for i in range(5):
item = random.randint(1, 100)
queue.put(item)
print(f'生产者放入: {item}')
time.sleep(random.random())
queue.put(None) # 发送结束信号
print('生产者进程结束')
def consumer(queue):
print('消费者进程开始')
while True:
item = queue.get()
if item is None:
break
print(f'消费者取出: {item}')
time.sleep(random.random())
print('消费者进程结束')
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
print('所有进程结束')运行结果:
生产者进程开始
生产者放入: 72
消费者进程开始
消费者取出: 72
生产者放入: 77
消费者取出: 77
生产者放入: 20
消费者取出: 20
生产者放入: 94
生产者放入: 8
生产者进程结束
消费者取出: 94
消费者取出: 8
消费者进程结束
所有进程结束2.4.2 管道(Pipe)
Pipe() 返回一对连接对象,默认全双工模式,两端均可收发。
from multiprocessing import Process, Pipe
def worker(conn):
print('子进程开始工作')
msg = conn.recv()
print(f'子进程收到消息: {msg}')
result = f'处理结果: {msg.upper()}'
conn.send(result)
print('子进程完成工作')
conn.close()
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
print('主进程发送消息: hello world')
parent_conn.send('hello world')
result = parent_conn.recv()
print(f'主进程收到结果: {result}')
p.join()
print('所有进程结束')运行结果:
子进程开始工作
主进程发送消息: hello world
子进程收到消息: hello world
子进程完成工作
主进程收到结果: 处理结果: HELLO WORLD
所有进程结束2.4.3 序列化(pickle)
复杂对象可以通过 pickle 序列化后在进程间传递。注意:pickle 不适合处理不可信来源的数据。
import multiprocessing
import pickle
import random
import time
def producer(queue):
print('生产者进程开始')
for i in range(5):
item = {'id': i, 'value': random.randint(1, 100)}
queue.put(pickle.dumps(item))
print(f'生产者放入: {item}')
time.sleep(random.random())
queue.put(pickle.dumps(None))
print('生产者进程结束')
def consumer(queue):
print('消费者进程开始')
while True:
serialized_item = queue.get()
item = pickle.loads(serialized_item)
if item is None:
break
print(f'消费者取出: {item}')
time.sleep(random.random())
print('消费者进程结束')
if __name__ == '__main__':
ctx = multiprocessing.get_context('spawn')
q = ctx.Queue()
p1 = ctx.Process(target=producer, args=(q,))
p2 = ctx.Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
print('所有进程结束')3. 多进程与多线程
3.1 进程与线程概念
- 进程:程序的一次执行过程,是系统资源分配的基本单位。每个进程拥有独立的内存空间,包括代码段、数据段、堆栈等。进程之间相互独立,通信需要特殊手段。
- 线程:进程中的一个执行流,是 CPU 调度的基本单位。同一进程内的线程共享内存空间,通信更方便,但也更容易相互影响。
Python 的标准库提供了 _thread(低级)与 threading(高级)两个模块,绝大多数情况下使用 threading 即可。
启动一个线程只需将函数传入 Thread 实例并调用 start():
import time
import threading
def loop():
print(f'Thread {threading.current_thread().name} is running...')
n = 0
while n < 5:
n = n + 1
print(f'Thread {threading.current_thread().name} >>> {n}')
time.sleep(1)
print(f'Thread {threading.current_thread().name} ended.')
if __name__ == '__main__':
print(f'Thread {threading.current_thread().name} is running...')
thread = threading.Thread(target=loop, name='LoopThread')
thread.start()
thread.join()
print(f'Thread {threading.current_thread().name} ended.')运行结果:
Thread MainThread is running...
Thread LoopThread is running...
Thread LoopThread >>> 1
Thread LoopThread >>> 2
Thread LoopThread >>> 3
Thread LoopThread >>> 4
Thread LoopThread >>> 5
Thread LoopThread ended.
Thread MainThread ended.3.2 多进程 vs. 多线程性能对比
import multiprocessing as mp
import threading
import time
MAX = 10_000_000
def job(queue):
res = 0
for i in range(MAX):
res += i + i ** 2 + i ** 3
queue.put(res)
def multiprocess():
queue = mp.Queue()
p1 = mp.Process(target=job, args=(queue,))
p2 = mp.Process(target=job, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
res1 = queue.get()
res2 = queue.get()
print('multiprocess:', res1 + res2)
def normal():
res = 0
for _ in range(2):
for i in range(MAX):
res += i + i ** 2 + i ** 3
print('normal:', res)
def multithread():
queue = mp.Queue()
t1 = threading.Thread(target=job, args=(queue,))
t2 = threading.Thread(target=job, args=(queue,))
t1.start()
t2.start()
t1.join()
t2.join()
res1 = queue.get()
res2 = queue.get()
print('multithreading:', res1 + res2)
if __name__ == '__main__':
st = time.time()
normal()
st1 = time.time()
print(f'normal time: {st1 - st:.2f}')
multithread()
st2 = time.time()
print(f'multithreading time: {st2 - st1:.2f}')
multiprocess()
print(f'multiprocess time: {time.time() - st2:.2f}')从结果可以看出,多进程执行时间通常短于多线程与串行执行,而多线程与串行执行时间相近。原因是 CPython 的 GIL 限制了同一时刻只有一个线程执行 Python 字节码,因此 Python 多线程无法发挥多核优势,而多进程可以绕过这一限制。
3.3 进程与线程的区别
| 维度 | 进程 | 线程 |
|---|---|---|
| 资源占用 | 独立内存空间,资源消耗大 | 共享进程内存,轻量 |
| 通信方式 | 需要队列、管道等特殊机制 | 可直接访问共享数据 |
| 并发性 | 可利用多核,实现真正并行 | 受 GIL 限制,无法利用多核 |
| 稳定性 | 一个进程崩溃不影响其他进程 | 一个线程错误可能导致整个进程崩溃 |
| 适用场景 | CPU 密集型任务 | I/O 密集型任务 |
3.4 选择多进程还是多线程
多任务场景下通常采用 Master-Worker 模式:Master 负责分配任务,Worker 负责执行任务。
- 多进程:稳定性高,一个子进程崩溃不会影响主进程与其他子进程;但创建与切换开销较大。适合 CPU 密集型任务。
- 多线程:创建开销小,但受 GIL 限制,且一个线程崩溃可能导致整个进程崩溃。适合 I/O 密集型任务。
现代 Web 服务器常采用多进程 + 多线程的混合模式,以兼顾稳定性与并发效率。
3.4.1 计算密集型 vs. I/O 密集型
- 计算密集型任务:消耗大量 CPU 资源,如视频编解码、科学计算等。Python 作为解释型语言运行效率有限,可结合 C/C++ 扩展或 NumPy 等库优化。
- I/O 密集型任务:大部分时间等待网络、磁盘等 I/O 操作,如 Web 应用、文件下载等。Python 配合异步 I/O 或线程池通常足够高效。
3.4.2 异步 I/O
现代操作系统支持异步 I/O,允许在单进程单线程模型下高效处理多任务,这种模型称为事件驱动模型。Nginx 就是典型的异步 I/O Web 服务器。在多核 CPU 上,可以运行与核心数相等的进程,每个进程内部采用异步 I/O,从而兼顾并发与多核利用。
3.5 分布式进程
相比线程,进程更稳定,且可以分布到多台机器上。multiprocessing.managers 子模块支持将队列等对象暴露到网络上,让其他机器的进程访问,从而方便地构建分布式多进程程序。
3.5.1 服务进程(task_master.py)
# task_master.py
import random
import queue
import time
from multiprocessing.managers import BaseManager
# 发送任务的队列
task_queue = queue.Queue()
# 接收结果的队列
result_queue = queue.Queue()
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
manager = QueueManager(address=('', 5000), authkey=b'abc')
manager.start()
task = manager.get_task_queue()
result = manager.get_result_queue()
for i in range(10):
n = random.randint(0, 10000)
print(f'Put task {n}...')
task.put(n)
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print(f'Result: {r}')
manager.shutdown()
print('master exit.')注意:在分布式多进程环境下,必须通过
manager.get_task_queue()获得的接口添加任务,不能直接操作原始的task_queue,否则会绕过QueueManager的封装。
3.5.2 工作进程(task_worker.py)
# task_worker.py
import queue
import time
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
server_addr = '127.0.0.1'
print(f'Connect to server {server_addr}...')
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
task = m.get_task_queue()
result = m.get_result_queue()
for i in range(10):
try:
n = task.get(timeout=1)
print(f'Run task {n} * {n}...')
r = f'{n} * {n} = {n * n}'
time.sleep(1)
result.put(r)
except queue.Empty:
print('Task queue is empty.')
print('worker exit.')4. 进程池与异步编程
4.1 Pool 的使用与优化
- 使用:通过
apply()、map()、starmap()等方法提交任务,配合close()与join()等待完成。 - 优化建议:
- 根据 CPU 核心数与任务特性设置合适的进程数,避免上下文切换开销。
- 尽量减少进程间通信;若任务可并行处理,尽量一次性提交大量任务。
下面是一个优化的 multiprocessing.Pool 使用示例:
import multiprocessing
import os
import time
def cpu_bound_task(n):
"""模拟一个 CPU 密集型任务。"""
count = 0
for i in range(n):
count += i * i
return count
def process_chunk(chunk):
"""处理一个数据块。"""
return [cpu_bound_task(item) for item in chunk]
def main():
num_cores = os.cpu_count()
print(f'本机有 {num_cores} 个 CPU 核心')
num_tasks = 10000
tasks = list(range(num_tasks))
chunk_size = len(tasks) // num_cores
chunks = [tasks[i:i + chunk_size] for i in range(0, len(tasks), chunk_size)]
start_time = time.time()
with multiprocessing.Pool(processes=num_cores) as pool:
results = pool.map(process_chunk, chunks)
final_results = [item for sublist in results for item in sublist]
end_time = time.time()
print(f'处理 {num_tasks} 个任务耗时: {end_time - start_time:.2f} 秒')
print(f'结果数量: {len(final_results)}')
if __name__ == '__main__':
main()运行结果示例:
本机有 8 个 CPU 核心
处理 10000 个任务耗时: 1.82 秒
结果数量: 10000该示例展示了以下优化点:
- 根据 CPU 核心数设置进程池大小,充分利用多核优势。
- 将任务分块处理,减少进程间通信开销。
- 使用
map()自动分配任务并收集结果。 - 使用
with语句确保进程池正确关闭,避免资源泄漏。
4.2 concurrent.futures 简介
concurrent.futures 提供了更现代的异步执行接口,主要包含:
ThreadPoolExecutor:使用线程池执行异步调用,适合 I/O 密集型任务。ProcessPoolExecutor:使用进程池执行异步调用,适合 CPU 密集型任务。Future:表示异步计算的结果。
常用方法:
submit(fn, *args, **kwargs):提交单个任务。map(func, *iterables):并行执行映射。shutdown(wait=True):关闭执行器。
Future 方法:
result(timeout=None):获取结果。exception(timeout=None):获取异常。done()/cancelled()/running():查询任务状态。add_done_callback(fn):添加完成回调。
4.2.1 示例一:分别处理 I/O 与 CPU 密集型任务
import concurrent.futures
import time
import requests
def download_image(url):
response = requests.get(url, timeout=10)
print(f'下载完成: {url}')
return len(response.content)
def cpu_bound_task(n):
count = 0
for i in range(n):
count += i * i
return count
if __name__ == '__main__':
image_urls = [
'https://example.com/image1.jpg',
'https://example.com/image2.jpg',
'https://example.com/image3.jpg',
]
numbers = [1000, 2000, 3000]
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
image_sizes = list(executor.map(download_image, image_urls))
print(f'图片大小: {image_sizes}')
with concurrent.futures.ProcessPoolExecutor() as executor:
results = list(executor.map(cpu_bound_task, numbers))
print(f'计算结果: {results}')
end_time = time.time()
print(f'总耗时: {end_time - start_time:.2f} 秒')4.2.2 示例二:使用 submit 与 as_completed
import concurrent.futures
import time
def task(n):
print(f'开始执行任务 {n}')
time.sleep(n)
return f'任务 {n} 完成'
if __name__ == '__main__':
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(task, i) for i in range(5)]
for future in concurrent.futures.as_completed(futures):
print(future.result())submit() 比 map() 更灵活,可以提交不同函数或参数;as_completed() 按任务完成顺序返回结果,适合处理耗时不同的任务。
4.2.3 示例三:结合 asyncio 与 ProcessPoolExecutor
import asyncio
import concurrent.futures
import time
def io_bound_task(n):
# 模拟耗时任务
time.sleep(1)
return f'Task {n} completed'
async def main():
loop = asyncio.get_event_loop()
with concurrent.futures.ProcessPoolExecutor() as executor:
tasks = [
loop.run_in_executor(executor, io_bound_task, i)
for i in range(10)
]
completed, _ = await asyncio.wait(tasks)
for task in completed:
print(task.result())
if __name__ == '__main__':
start = time.time()
asyncio.run(main())
end = time.time()
print(f'Total time: {end - start:.2f} seconds')4.2.4 示例四:完整对比线程池与进程池
import concurrent.futures
import time
import requests
def fetch_url(url):
print(f'开始下载 {url}')
response = requests.get(url, timeout=10)
return (f'{url}: 状态码 {response.status_code}, '
f'内容长度 {len(response.text)} 字节')
urls = [
'https://www.python.org',
'https://www.github.com',
'https://www.stackoverflow.com',
'https://www.google.com',
'https://www.bing.com',
]
def run_with_threadpool():
print('使用线程池执行:')
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
future_to_url = {executor.submit(fetch_url, url): url for url in urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
print(future.result())
except Exception as exc:
print(f'{url} 生成了一个异常: {exc}')
def run_with_processpool():
print('\n使用进程池执行:')
with concurrent.futures.ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(fetch_url, urls)
for result in results:
print(result)
if __name__ == '__main__':
start_time = time.time()
run_with_threadpool()
run_with_processpool()
end_time = time.time()
print(f'\n总执行时间: {end_time - start_time:.2f} 秒')5. 多进程间共享状态
multiprocessing 提供了两种共享状态的方式:共享内存(Shared Memory)与服务进程(Server Process)。
5.1 共享内存
共享内存是高效的进程间通信方式。multiprocessing 中的 Value 与 Array 类可用于创建共享内存。
5.1.1 Value 类
import multiprocessing
def worker1(n):
n.value += 1
print(f'worker1: {n.value}')
def worker2(n):
n.value += 1
print(f'worker2: {n.value}')
if __name__ == '__main__':
n = multiprocessing.Value('i', 0)
p1 = multiprocessing.Process(target=worker1, args=(n,))
p2 = multiprocessing.Process(target=worker2, args=(n,))
p1.start()
p2.start()
p1.join()
p2.join()5.1.2 Array 类
from multiprocessing import Process, Value, Array
def modify_shared(n, arr):
n.value = 3.1415927
for i in range(len(arr)):
arr[i] = -arr[i]
if __name__ == '__main__':
num = Value('d', 0.0)
arr = Array('i', range(10))
p = Process(target=modify_shared, args=(num, arr))
p.start()
p.join()
print(num.value)
print(arr[:])运行结果:
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]更复杂的共享内存示例:
import time
from multiprocessing import Array, Process, Value
def worker1(shared_value, shared_array):
print(f'工作进程1: 初始共享值 = {shared_value.value}, '
f'共享数组 = {list(shared_array)}')
shared_value.value += 1
for i in range(len(shared_array)):
shared_array[i] += 1
print(f'工作进程1: 修改后共享值 = {shared_value.value}, '
f'共享数组 = {list(shared_array)}')
time.sleep(1)
def worker2(shared_value, shared_array):
print(f'工作进程2: 初始共享值 = {shared_value.value}, '
f'共享数组 = {list(shared_array)}')
shared_value.value *= 2
for i in range(len(shared_array)):
shared_array[i] *= 2
print(f'工作进程2: 修改后共享值 = {shared_value.value}, '
f'共享数组 = {list(shared_array)}')
time.sleep(1)
if __name__ == '__main__':
shared_value = Value('i', 0)
shared_array = Array('i', [1, 2, 3, 4, 5])
p1 = Process(target=worker1, args=(shared_value, shared_array))
p2 = Process(target=worker2, args=(shared_value, shared_array))
p1.start()
p2.start()
time.sleep(0.5)
print(f'主进程: 共享值 = {shared_value.value}, '
f'共享数组 = {list(shared_array)}')
p1.join()
p2.join()
print(f'最终结果: 共享值 = {shared_value.value}, '
f'共享数组 = {list(shared_array)}')注意:
Value与Array只能存放ctypes基础类型,无法直接存放 Python 对象。如需共享复杂对象,可以先用pickle序列化为字节数组再放入Value,但需承担序列化开销与双份内存开销。
5.2 服务进程(Server Process)
服务进程通过 multiprocessing.Manager() 启动一个独立的管理进程,所有共享对象实际存放在该进程中,其他进程通过 Proxy 对象进行操作。
import multiprocessing
import time
def worker1(shared_list, shared_dict):
print(f'工作进程1的 ID: {multiprocessing.current_process().pid}')
shared_list.append('来自进程1的数据')
shared_dict[1] = '进程1的值'
time.sleep(2)
def worker2(shared_list, shared_dict):
print(f'工作进程2的 ID: {multiprocessing.current_process().pid}')
shared_list.extend(['来自进程2的数据1', '来自进程2的数据2'])
shared_dict[2] = '进程2的值'
time.sleep(1)
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
shared_list = manager.list()
shared_dict = manager.dict()
p1 = multiprocessing.Process(target=worker1, args=(shared_list, shared_dict))
p2 = multiprocessing.Process(target=worker2, args=(shared_list, shared_dict))
print(f'主进程 ID: {multiprocessing.current_process().pid}')
p1.start()
p2.start()
p1.join()
p2.join()
print(f'共享列表: {list(shared_list)}')
print(f'共享字典: {dict(shared_dict)}')运行结果:
主进程 ID: 33464
工作进程1的 ID: 33468
工作进程2的 ID: 33469
共享列表: ['来自进程1的数据', '来自进程2的数据1', '来自进程2的数据2']
共享字典: {1: '进程1的值', 2: '进程2的值'}multiprocessing.Queue() 与 manager.Queue() 底层实现不同:前者基于共享内存与管道,后者通过服务进程代理,适用于更复杂的共享需求。
6. 高级并发技巧
6.1 多进程同步与协调
选择合适的同步机制:
Semaphore:限制同时访问某资源的进程数量。Lock:确保对共享资源的互斥访问。Event:用于简单的进程间信号通知。Condition:用于基于条件的复杂协调。
6.1.1 信号量(Semaphore)
import time
from multiprocessing import Process, Semaphore
def worker(sem, num):
sem.acquire()
print(f'工作进程 {num} 开始')
time.sleep(2)
print(f'工作进程 {num} 结束')
sem.release()
if __name__ == '__main__':
semaphore = Semaphore(3)
processes = []
for i in range(5):
p = Process(target=worker, args=(semaphore, i))
processes.append(p)
p.start()
for p in processes:
p.join()6.1.2 互斥锁(Lock)
import time
from multiprocessing import Lock, Process, Value
def increment(lock, shared_value):
for _ in range(100):
time.sleep(0.01)
with lock:
shared_value.value += 1
if __name__ == '__main__':
lock = Lock()
shared_value = Value('i', 0)
processes = [
Process(target=increment, args=(lock, shared_value))
for _ in range(4)
]
for p in processes:
p.start()
for p in processes:
p.join()
print(f'最终值: {shared_value.value}')运行结果:最终值: 400
6.1.3 事件(Event)
import time
from multiprocessing import Event, Process
def waiter(event):
print('等待事件...')
event.wait()
print('事件被设置,继续执行')
def setter(event):
print('准备设置事件')
time.sleep(3)
event.set()
print('事件已设置')
if __name__ == '__main__':
event = Event()
processes = [
Process(target=waiter, args=(event,)),
Process(target=setter, args=(event,)),
]
for p in processes:
p.start()
for p in processes:
p.join()6.1.4 条件变量(Condition)
import time
from multiprocessing import Condition, Process, Value
def consumer(cond, shared_queue):
while True:
with cond:
while shared_queue.value == 0:
print('消费者: 队列为空,等待...')
cond.wait()
item = shared_queue.value
shared_queue.value = 0
print(f'消费者: 消费了项目 {item}')
cond.notify()
time.sleep(1)
def producer(cond, shared_queue):
for i in range(5):
with cond:
while shared_queue.value != 0:
print('生产者: 队列满了,等待...')
cond.wait()
shared_queue.value = i + 1
print(f'生产者: 生产了项目 {i + 1}')
cond.notify()
time.sleep(0.5)
if __name__ == '__main__':
condition = Condition()
shared_queue = Value('i', 0)
cons = Process(target=consumer, args=(condition, shared_queue))
prod = Process(target=producer, args=(condition, shared_queue))
cons.start()
prod.start()
prod.join()
cons.terminate()
cons.join()6.2 避免 GIL 的影响
GIL 是 CPython 中的机制,确保同一时间只有一个线程执行 Python 字节码。绕过 GIL 的常见方法:
- 使用多进程代替多线程,每个进程拥有独立的 GIL。
- 使用 Jython 或 IronPython 等没有 GIL 的 Python 实现。
- 使用 C/C++ 扩展执行计算密集型任务,释放 GIL。
6.3 资源管理与任务调度
- 资源管理:使用
with语句管理进程池、执行器、文件与网络连接,确保资源正确释放。 - 任务调度:使用
multiprocessing.Queue实现生产者-消费者模型,由生产者将任务放入队列,消费者从队列取出并执行。
import multiprocessing
import time
def worker(x):
print(f'进程 {multiprocessing.current_process().pid} 正在处理 {x}')
time.sleep(1)
return x * x
if __name__ == '__main__':
print(f'主进程 ID: {multiprocessing.current_process().pid}')
with multiprocessing.Pool(processes=4) as pool:
results = pool.map(worker, range(10))
print(f'处理结果: {results}')
print('所有工作已完成,进程池已关闭')任务调度示例:
import multiprocessing
import random
import time
def producer(queue, num_tasks):
print(f'生产者进程 {multiprocessing.current_process().name} 开始运行')
for i in range(num_tasks):
task = f'任务-{i}'
queue.put(task)
print(f'生产者添加任务: {task}')
time.sleep(random.uniform(0.1, 0.3))
for _ in range(multiprocessing.cpu_count()):
queue.put(None)
print('生产者完成所有任务')
def consumer(queue):
print(f'消费者进程 {multiprocessing.current_process().name} 开始运行')
while True:
task = queue.get()
if task is None:
break
print(f'消费者 {multiprocessing.current_process().name} 执行任务: {task}')
time.sleep(random.uniform(0.5, 1))
print(f'消费者 {multiprocessing.current_process().name} 完成工作')
if __name__ == '__main__':
num_tasks = 10
task_queue = multiprocessing.Queue()
producer_process = multiprocessing.Process(
target=producer, args=(task_queue, num_tasks)
)
num_consumers = multiprocessing.cpu_count()
consumer_processes = [
multiprocessing.Process(target=consumer, args=(task_queue,))
for _ in range(num_consumers)
]
producer_process.start()
for p in consumer_processes:
p.start()
producer_process.join()
for p in consumer_processes:
p.join()
print('所有进程已完成')7. 错误处理与调试
7.1 错误处理策略
- 进程间通信异常处理:在通信代码中使用
try-except捕获异常,避免单个进程崩溃导致整个程序退出。 - 进程池异常处理:使用
Pool或Executor时注意捕获子进程抛出的异常。 - 日志记录:使用
logging模块记录运行时信息、警告与错误,便于排查问题。
7.2 捕获子进程异常
import multiprocessing
import queue
import traceback
class SafeProcess(multiprocessing.Process):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._pconn, self._cconn = multiprocessing.Pipe()
self._exception = None
def run(self):
try:
super().run()
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
def worker(queue):
try:
queue.put('正常数据')
raise ValueError('模拟的错误')
except Exception as e:
queue.put(('异常', str(e)))
def main():
queue = multiprocessing.Queue()
process = SafeProcess(target=worker, args=(queue,))
process.start()
while process.is_alive():
try:
data = queue.get(timeout=1)
if isinstance(data, tuple) and data[0] == '异常':
print(f'工作进程发生异常: {data[1]}')
else:
print(f'收到数据: {data}')
except queue.Empty:
pass
process.join()
if process.exception:
error, tb = process.exception
print(f'进程异常: {error}')
print(f'异常追踪:\n{tb}')
if __name__ == '__main__':
main()7.3 使用 logging 与 traceback
import logging
import traceback
logging.basicConfig(filename='example.log', level=logging.DEBUG)
logging.debug('This is a debug message')
logging.error('This is an error message')
try:
# 可能会引发异常的代码
pass
except Exception:
traceback.print_exc()总结
- 多进程适合 CPU 密集型任务,可以绕过 GIL 并充分利用多核 CPU。
multiprocessing.Process与Pool是常用的进程创建与管理工具。Queue、Pipe、共享内存与Manager提供了多种进程间通信与状态共享方式。Semaphore、Lock、Event、Condition提供了进程间同步机制。- 使用
with语句、logging与异常处理可以提高多进程程序的稳定性与可维护性。