Skip to content

veRL 单控制器设计详解

前言

本文档旨在为参与开发verl项目的开发者提供关于verl.single_controller模块的深入理解。它特别适用于希望了解或参与到该模块开发中的开源贡献者,而非终端用户。文档的核心目标是阐明架构原理及其内部工作机制。

1. 起源

single_controller模块的设计初衷是为了将单进程强化学习(RLHF)实验脚本转化为分布式系统,同时尽可能减少代码改动并保持调试的便捷性。

传统解决方案如使用 PyTorch 的分布式数据并行(DDP)通常需要封装nn.Module并在多进程中执行相同的函数。然而,在分布式 RLHF 环境中,这种方法面临两大挑战:难以表达 PPO 算法所需的复杂 DAG 结构,以及在训练过程中难以检查中间张量。

为了维持良好的可调试性,我们采取了不同的策略——将训练循环划分为明确的阶段,例如generate_sequencescompute_advantages等。

选择Ray作为verl的初始后端,主要是因为它能够将 Python 类方法暴露为 RPC 端点。尽管 Ray 默认支持单方法调用对应单次 RPC 的模型,但大型语言模型(LLMs)的训练通常要求多进程协作。为此,我们引入了以下组件以隐藏这种复杂性:

  • WorkerGroup:管理远程工作节点组,提供统一的多进程分布式计算接口;
  • ResourcePool:将计算资源绑定到工作进程;
  • ClassWithArgs:支持带初始化参数的延迟远程实例化。

2. 运行示例:generate_sequences

我们将通过generate_sequences阶段中ActorRolloutRefWorker类的方法演示如何在分布式工作节点间完成注册与调用。

2.1 第一步:使用装饰器注册

首先定义generate_sequences方法,并使用@register装饰器进行标记,以便在驱动脚本中被调用。

源码文件:fsdp_workers.py

python
class ActorRolloutRefWorker(Worker):
    ...
    @register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)
    def generate_sequences(self, prompts: DataProto):
        prompts = prompts.to(torch.cuda.current_device())
        ...

@register装饰器为generate_sequences方法添加元数据,虽然当前实现不改变其功能逻辑,但它会通过特定键值(MAGIC_ATTR)附加属性字段。

来源:decorator.py

python
def register(dispatch_mode=Dispatch.ALL_TO_ALL, execute_mode=Execute.ALL, blocking=True, materialize_futures=True):
    ...
    def decorator(func):
        @wraps(func)
        def inner(*args, **kwargs):
            if materialize_futures:
                args, kwargs = _materialize_futures(*args, **kwargs)
            return func(*args, **kwargs)

        attrs = {"dispatch_mode": dispatch_mode, "execute_mode": execute_mode, "blocking": blocking}
        setattr(inner, MAGIC_ATTR, attrs)
        return inner

    return decorator

上述代码展示了如何将dispatch_modeexecute_modeblocking等参数值附加到generate_sequences方法上。

register 函数是一个装饰器,用于为分布式计算方法添加元数据配置。 它允许开发者指定方法在分布式环境中的执行模式、数据分发策略和阻塞行为。

  • dispatch_mode: 数据分发模式,默认为 Dispatch.ALL_TO_ALL,控制数据如何分发到各个 Worker
  • execute_mode: 执行模式,默认为 Execute.ALL,控制方法在哪些 Worker 上执行
  • blocking: 是否阻塞执行,默认为 True,控制是否等待远程执行完成
  • materialize_futures: 是否物化 Future 对象,默认为 True,在执行前解析异步对象

2.1.1 装饰器实现 decorator.py:509-527

装饰器的核心逻辑包括:

  1. 同步函数包装器(第 510-514 行): 创建 inner 函数处理同步方法调用
  2. 异步函数包装器(第 516-520 行): 创建 async_inner 函数处理异步方法调用
  3. 函数类型检测(第 522 行): 使用 inspect.iscoroutinefunction() 判断原函数是否为协程
  4. 元数据附加(第 523-524 行): 将配置参数作为属性附加到包装函数上

2.1.2 魔法属性机制 decorator.py:22-23

使用 MAGIC_ATTR = "attrs_3141562937" 作为特殊属性名,避免与用户定义的属性冲突。装饰器将配置信息存储在这个属性中。

2.1.3 技术要点

2.1.4 装饰器模式和元编程

register 函数采用装饰器模式,通过元编程技术在运行时为方法添加分布式执行能力。它不改变原函数的核心逻辑,而是添加元数据供后续的方法绑定过程使用。

2.1.5 Future 对象物化机制 decorator.py:470-482

_materialize_futures 函数处理 DataProtoFuture 对象的物化,确保在分发数据前所有异步对象都已解析完成。

2.1.6 分发模式系统 decorator.py:26-53

支持多种预定义的分发模式,如 DP_COMPUTE_PROTO(数据并行计算)、MEGATRON_COMPUTE(Megatron 3D 并行)等,每种模式对应不同的数据分发和收集策略。

2.1.7 方法绑定集成

装饰器的元数据会在 WorkerGroup 初始化时被提取和使用。 decorator.py:421-422 通过 get_predefined_dispatch_fn 函数获取对应的分发和收集函数。

2.2 第二步:初始化时绑定

当封装在 RayClassWithInitArgs 中的 ActorRolloutRefWorker 被传递给 RayWorkerGroup 时,这些附加属性会被提取并利用。

源码文件:main_generation.py

python
ray_cls_with_init = RayClassWithInitArgs(cls=ray.remote(ActorRolloutRefWorker), config=config, role="rollout")

resource_pool = RayResourcePool(process_on_nodes=[config.trainer.n_gpus_per_node] * config.trainer.nnodes)

wg = RayWorkerGroup(resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init)

RayWorkerGroup初始化过程 中,会执行两个关键步骤:

  1. 创建工作节点实例(Ray actors): RayWorkerGroup._init_with_resource_pool
  2. 将带有 @register 装饰器的方法绑定到 RayWorkerGroupRayWorkerGroup._bind_worker_method

initialization_and_binding_of_worker_group

initialization_and_binding_of_worker_group

WorkerGroup 的初始化与绑定

绑定过程是 verl.single_controller 的核心所在。

关键函数:WorkerGroup._bind_worker_method

python
def _bind_worker_method(self, user_defined_cls, func_generator):
    ...
    for method_name in dir(user_defined_cls):
        try:
            method = getattr(user_defined_cls, method_name)
            assert callable(method)
        except Exception:
            continue  # Skip properties

当方法具有 MAGIC_ATTR 属性时,@register 装饰器设置的属性将被提取:

python
if hasattr(method, MAGIC_ATTR):
    attribute = getattr(method, MAGIC_ATTR)
    dispatch_mode = attribute["dispatch_mode"]
    execute_mode = attribute["execute_mode"]
    blocking = attribute["blocking"]

如上流程图所示,这些属性会被输入到 func_generator 中。但 func_generator 需要接收 method_namedispatch_fncollect_fnexecute_fnblocking 参数。我们需要从 DISPATCH_MODE_FN_REGISTRY 中根据 dispatch_modeDP_COMPUTE_PROTO)查找对应的 dispatch_fncollect_fn

python
DISPATCH_MODE_FN_REGISTRY = {
    Dispatch.ONE_TO_ALL: {
        "dispatch_fn": dispatch_one_to_all,
        "collect_fn": collect_all_to_all,
    },
    ...
    Dispatch.DP_COMPUTE_PROTO: {
        "dispatch_fn": dispatch_dp_compute_data_proto,
        "collect_fn": collect_dp_compute_data_proto,
    },
    ...
}

同理,execute_fnexecute_mode 选择并通过以下方式提取:

python
# get execute_fn_name
execute_mode = get_predefined_execute_fn(execute_mode=execute_mode)
wg_execute_fn_name = execute_mode["execute_fn_name"]

# get execute_fn from string
try:
    execute_fn = getattr(self, wg_execute_fn_name)
    assert callable(execute_fn), "execute_fn must be callable"
except Exception:
    print(f"execute_fn {wg_execute_fn_name} is invalid")
    raise

在此 generate_sequences 案例中:

  • dispatch_mode = Dispatch.DP_COMPUTE_PROTO
  • dispatch_fn = dispatch_dp_compute_data_proto
  • collect_fn = collect_dp_compute_data_proto
  • execute_fn = RayWorkerGroup.execute_all

2.2.1 ONE_TO_ALL 对比 DP_COMPUTE_PROTO

dispatch_mode 关联着一个 dispatch_fncollect_fn。顾名思义,dispatch_fn 处理 WorkerGroup 中的输入参数并生成批量(列表)输入参数,每个参数将被传递给附加的工作线程 WorkerGroup(工作群组)。

dispatch_fn(分发函数)在 ONE_TO_ALL(一对多)模式下的实现是 dispatch_one_to_all(一对多分发),该函数简单地将所有输入参数复制为 N 份副本,其中 N 等于附加到 worker_group(工作群组)的 Worker 数量:

python
def dispatch_one_to_all(worker_group, *args, **kwargs):
    args = tuple([arg] * worker_group.world_size for arg in args)
    kwargs = {k: [v] * worker_group.world_size for k, v in kwargs.items()}
    return args, kwargs

dispatch_fnDP_COMPUTE_PROTOdispatch_dp_compute_data_proto,它使用 DataProto.chunk 将大型 DataProto 分割为 N 个较小的 DataProto,其中 N 等于 worker_groupworld_size(工作节点数量):

python
def dispatch_dp_compute_data_proto(worker_group, *args, **kwargs):
    from verl.single_controller.base.worker_group import WorkerGroup

    assert isinstance(worker_group, WorkerGroup)
    # Note: enable auto padding for dp compute DatapProto
    splitted_args, splitted_kwargs = _split_args_kwargs_data_proto_with_auto_padding(
        worker_group.world_size,
        *args,
        **kwargs,
    )
    return splitted_args, splitted_kwargs

collect_fn 遵循相同模式,处理来自 WorkerGroup 所有工作节点返回值的批次(列表),并将其合并为一个列表(如 collect_all_to_all 所做)或一个大型 DataProto 数据原型,如同 collect_dp_compute_data_proto 的处理方式。

最终,通过 func_generator 动态生成一个新方法并将其添加到 WorkerGroup 实例中:

python
# bind a new method to the RayWorkerGroup
func = func_generator(
    self,
    method_name,
    dispatch_fn=dispatch_fn,
    collect_fn=collect_fn,
    execute_fn=execute_fn,
    blocking=blocking,
)

try:
    setattr(self, method_name, func)
    method_names.append(method_name)
except Exception as e:
    raise ValueError(f"Fail to set method_name {method_name}") from e

这使得该方法可通过 WorkerGroup 接口调用。

2.3 步骤 3:调用链

所有上述机制共同作用,确保了分布式调用与单进程调用体验的一致性。原本的单进程脚本如下所示:

python
rollout = Rollout()
rollout.generate_sequences(batch)

而在使用verl之后,多进程程序变为:

python
rollout = RayWorkerGroup(resource_pool=[4], RayClassWithArgs(Rollout))
rollout.generate_sequences(batch)

call_chain_of_generate_sequences

在这个简单调用背后,

  • dispatch_fn负责将输入分发给各个工作节点,
  • execute_fn执行实际的远程调用,
  • collect_fn则负责收集结果。

这一切都被抽象封装起来,使得开发者只需对现有逻辑做最小改动即可编写高效的分布式代码。

3. 泛化性

verl.single_controller模块的应用范围远不止于强化学习领域。它提供了批处理远程方法调用的清晰抽象层,并自动处理输入输出。通过缩小单进程与多进程脚本之间的差异,verl.single_controller为更广泛领域的分布式计算开辟了新的可能性。我们期待这个设计能激发社区贡献更多应用案例和扩展方案。


4. 附录: single_controller 源码逐段解析

5. 整体概述

verl/single_controller/base/decorator.py 文件是 veRL 的 HybridFlow 架构中的核心组件,主要负责实现分布式计算的方法注册和调度机制。它实现了控制流(单进程)和计算流(分布式进程)的分离。通过这种设计,开发者可以编写看起来像单进程的代码,但实际上会在多个分布式 worker 上执行,大大简化了分布式 RLHF 训练的复杂性。该系统的灵活性还体现在支持多种不同的数据分发策略,可以适应不同的并行化需求。

核心能力:

  • 在调用前把输入数据 dispatch 给不同进程;
  • 在调用后把各进程的输出 collect 回来;
  • 支持多种分发策略(Megatron、Data-Parallel、All-to-All 等);
  • 支持同步 / 异步、支持自动 padding、支持 DataProto/DataProtoFuture。

6. 逐行 / 逐段解析

  1. import 与常量

    • MAGIC_ATTR 是一个极长字符串,用来在装饰后的函数上保存配置,避免与用户属性冲突。
  2. 两个 Enum 类

    • DispatchExecute 继承自内部类 DynamicEnum(可以动态注册新值)。
    • 初始化函数把预设常量注册进去,如 RANK_ZEROMEGATRON_COMPUTE
  3. 数据分割工具

    • _split_args_kwargs_data_proto:把 DataProtoDataProtoFuture 按 chunk 数切分。
    • _split_args_kwargs_data_proto_with_auto_padding:同上,但遇到长度不能被 chunk 整除时自动 pad。
      • 使用 nonlocal 在内部函数 _padding_and_split_data 里共享 data_proto_lenpadding_size
  4. 具体分发 / 收集策略 每个策略都是一对函数:dispatch_* 负责把输入数据映射到不同 rank;collect_* 负责把输出收集回来。

    • dispatch_one_to_all: rank0 的数据复制给所有 rank。
    • dispatch_all_to_all:什么都不做,每个 rank 拿自己那份。
    • dispatch_megatron_compute:根据 Megatron 的 dp_rank 把数据重新映射到 TP/PP rank。
    • collect_megatron_compute:只拿 TP=0、PP=last、CP=0 的 rank 的结果,避免冗余。
    • dispatch_megatron_pp_as_dp:把 pipeline-stage 看成额外的 DP 维度。
    • dispatch_dp_compute/collect_dp_compute:纯数据并行,每个 rank 拿自己那份,收集时直接返回列表。
    • DataProto 版本:在普通版本之前先对 DataProto 做 chunk 或 auto-padding。
    • DIRECT_ROLLOUT_METHOD:占位符,触发 NotImplementedError
  5. 全局注册表

    • DISPATCH_MODE_FN_REGISTRY 把上面每一对函数映射到对应的 Dispatch 枚举值。
    • register_dispatch_mode / update_dispatch_mode 供用户动态增改策略。
  6. 执行模式

  • Execute.ALLExecute.RANK_ZERO 仅记录一个名字,真正执行逻辑留给 WorkerGroup 的实现。
  1. 装饰器 @register(...)

    • 参数:

      • dispatch_mode:指定分发策略;
      • execute_mode:指定执行策略;
      • blocking:是否阻塞;
      • materialize_futures:是否在分发前把 DataProtoFuture 转成真实数据。
    • 内部使用 inspect.iscoroutinefunction 区分同步 / 异步函数,生成对应的包装器 inner / async_inner

    • 把配置塞进 MAGIC_ATTR,供后续 WorkerGroup 读取。

7. 核心功能

7.1 @register 装饰器

这个文件的核心是 @register 装饰器,它用于标记 Worker 类中需要进行分布式执行的方法。 1

装饰器通过 MAGIC_ATTR 这个魔术属性将元数据附加到被装饰的方法上,包括:

  • dispatch_mode: 数据分发模式
  • execute_mode: 执行模式
  • blocking: 是否阻塞执行 2

函数装饰器@wraps 保留原函数元数据,支持同步 / 异步双形态。

7.2 Dispatch 枚举类

文件中定义了 Dispatch 枚举,它是一个动态枚举类,包含多种数据分发模式:

  • ONE_TO_ALL: 将相同的数据广播到所有 worker,主要用于初始化操作
  • DP_COMPUTE_PROTO: 用于数据并行计算,将 DataProto 对象分割到不同的 worker
  • MEGATRON_COMPUTE_PROTO: 处理 Megatron-LM 的 3D 并行(张量、流水线、数据并行) 3

7.3 分发函数注册表

DISPATCH_MODE_FN_REGISTRY 是一个核心的注册表,将每个分发模式映射到对应的分发函数(dispatch_fn)和收集函数(collect_fn): 4

7.3.1 ONE_TO_ALL 模式的实现

  • dispatch_one_to_all: 将输入参数复制 N 份,其中 N 等于 worker 的数量
  • collect_all_to_all: 收集所有 worker 的返回值 5

7.3.2 DP_COMPUTE_PROTO 模式的实现

  • dispatch_dp_compute_data_proto: 使用 DataProto.chunk 将大的 DataProto 分割成 N 个小的 DataProto
  • collect_dp_compute_data_proto: 将多个 worker 的 DataProto 结果合并 6

7.4 动态方法绑定

当 WorkerGroup 初始化时,会扫描所有带有 MAGIC_ATTR 的方法,提取装饰器设置的属性,并从注册表中获取对应的分发和收集函数。 7

然后使用 func_generator 动态生成新的方法并绑定到 WorkerGroup 实例上,使得分布式调用看起来就像单进程调用一样。 8

7.5 扩展性支持

该文件还提供了注册自定义分发模式的功能:

  • register_dispatch_mode: 注册新的分发模式
  • update_dispatch_mode: 更新现有的分发模式 9

8. worker 类代码分析

8.1 功能概述

verl/single_controller/base/worker.py 中的 Worker 类是 veRL 框架中分布式训练系统的核心基础组件。它作为 HybridFlow 架构中计算流(computation flow)的基础构建块,它管理分布式训练中每个 Worker 的初始化、环境配置、设备管理和节点间通信。该类通过 Ray 框架实现分布式部署,并提供了统一的接口来处理多节点训练场景。

该类的主要功能包括:

  1. 分布式环境初始化:设置分布式训练所需的环境变量,包括 rank、local_rank、world_size 等参数
  2. 设备配置管理:处理 GPU 可见性环境变量(CUDA_VISIBLE_DEVICES、HIP_VISIBLE_DEVICES 等)
  3. Worker 协调:为不同类型的 Worker (Actor、Critic、RewardModel 等)提供统一的基础框架
  4. 方法注册与分发:通过 @register 装饰器系统支持方法的自动分发和结果收集

8.2 逐行/逐段解析

8.2.1 导入和数据类定义 1

这部分定义了两个重要的数据类:

  • DistRankInfo: 存储分布式训练中的各种并行维度的 rank 信息(张量并行、数据并行、流水线并行、上下文并行)
  • DistGlobalInfo: 存储对应的全局大小信息

8.2.2 WorkerHelper 辅助类 2

WorkerHelper 提供了获取节点信息的静态方法:

  • _get_node_ip(): 通过 Ray 获取当前节点的 IP 地址
  • _get_free_port(): 使用 socket 获取一个可用的端口号
  • get_availale_master_addr_port(): 组合上述两个方法,为主节点通信提供地址和端口

8.2.3 Worker 类的 new 方法 3

这是一个关键的初始化控制机制:

  • 检查 DISABLE_WORKER_INIT 环境变量,如果设置为 1 则跳过初始化
  • 获取 RANKWG_PREFIX 环境变量
  • 避免在 Ray 装饰器应用时执行配置(通过检查类名是否包含 "ActorClass(")
  • 如果条件满足,调用 _configure_before_init 进行预配置

8.2.4 预配置方法 4

_configure_before_init 方法处理分布式训练的协调设置:

8.2.5 Rank 0 节点(主节点)

  • 获取可用的主地址和端口
  • 创建包含 MASTER_ADDRMASTER_PORT 的信息字典
  • 如果使用 Ray 后端,创建注册中心 actor 用于节点间协调
  • 将主节点信息设置到环境变量中

8.2.6 其他节点

  • 通过 Ray 获取已存在的注册中心 actor
  • 向注册中心注册自己的节点信息,包括 rank 和节点 ID

8.2.7 环境变量配置 5

env_keys 类方法定义了 Worker 需要的所有环境变量:

  • WORLD_SIZE: 总的节点数量
  • RANK: 当前节点的全局排名
  • LOCAL_WORLD_SIZE: 本地节点数量
  • LOCAL_RANK: 本地排名
  • MASTER_ADDRMASTER_PORT: 主节点通信地址
  • 设备可见性变量(CUDA/HIP/ROCR_VISIBLE_DEVICES)

8.2.8 Worker 初始化 6

__init__ 方法完成实际的初始化工作:

  • 调用 _setup_env_cuda_visible_devices() 配置 GPU 设备
  • 从环境变量中读取分布式训练参数
  • 创建存储字典保存所有配置信息
  • 调用 _configure_with_store 应用配置
  • 初始化 fused_worker_dict 用于存储融合的 Worker

8.2.9 GPU 设备配置 7

_setup_env_cuda_visible_devices 方法处理复杂的 GPU 设备配置:

设备环境变量统一

  • 检查 HIP_VISIBLE_DEVICESCUDA_VISIBLE_DEVICES 的一致性
  • HIP_VISIBLE_DEVICES 转换为 CUDA_VISIBLE_DEVICES 以保持一致性
  • 处理 ROCR_VISIBLE_DEVICES,避免与其他设备变量冲突

Ray 设备管理

  • 检查 Ray 是否设置了 RAY_EXPERIMENTAL_NOSET_*_VISIBLE_DEVICES 标志
  • 如果设置了该标志,从 RAY_LOCAL_RANK 设置本地排名
  • 调用 get_torch_device().set_device() 设置当前设备

8.2.10 配置应用和属性访问 8

_configure_with_store 方法将配置字典应用到实例和环境变量中:

  • 更新实例的 __dict__ 属性
  • 将配置值设置到对应的环境变量中
  • 设置 Redis 服务器主机地址

属性访问方法提供了便捷的接口:

  • world_sizerank 属性
  • get_master_addr_port()get_cuda_visible_devices() 方法

8.2.11 分布式执行方法 9

Worker 类提供了两个使用 @register 装饰器的分布式执行方法:

  • execute_with_func_generator: 使用 DP_COMPUTE_PROTO_WITH_FUNC 模式执行函数
  • execute_func_rank_zero: 使用 ALL_TO_ALL 分发模式但只在 rank 0 执行

8.3 技术要点

  1. 元类编程:通过重写 __new__ 方法控制实例创建过程
  2. 环境变量管理:统一处理多种 GPU 后端的设备可见性配置
  3. 分布式协调:使用 Ray actor 作为注册中心实现节点间信息共享
  4. 装饰器模式:通过 @register 装饰器实现方法的分布式执行
  5. 配置注入:通过字典更新实例属性的动态配置机制

9. worker_group 代码分析

9.1 整体概述

verl/single_controller/base/worker_group.py 文件的WorkerGroup 类是 veRL Single-Controller 架构的核心抽象,负责管理一组分布式 Worker 并提供统一的接口来执行分布式计算。它实现了控制流(单进程)与计算流(多进程)分离的设计理念,使得开发者可以像调用本地方法一样调用分布式方法。

9.2 逐行/逐段解析

9.2.1 辅助类和工具函数 1

ClassWithInitArgs 是一个包装类,用于存储类构造函数的参数,支持延迟实例化。这在远程类实例化场景中特别有用,因为实际的构造需要在不同的时间或位置发生。

  • cls: 要实例化的类
  • argskwargs: 类构造函数的参数
  • fused_worker_used: 标记是否使用融合 Worker
  • __call__ 方法: 使用存储的参数实例化类 2

check_workers_alive 函数持续监控工作进程的存活状态,如果任何 Worker 死亡,会发送 SIGABRT 信号给主线程。这是一个重要的容错机制。

9.2.2 WorkerGroup 核心类 3

WorkerGroup 类的初始化方法设置了基本的属性:

  • _is_init_with_detached_workers: 标记是否使用分离的 Worker
  • fused_worker_used: 是否使用融合 Worker
  • _procecss_dispatch_config: 进程分发配置
  • _workers_worker_names: 存储 Worker 实例和名称
  • _master_addr_master_port: 主节点通信地址
  • _checker_thread: Worker 存活检查线程

9.2.3 Worker 管理方法 4

这些方法提供了 Worker 的生命周期管理:

  • _is_worker_alive: 抽象方法,需要在派生类中实现
  • _block_until_all_workers_alive: 阻塞直到所有 Worker 都存活
  • start_worker_aliveness_check: 启动后台线程监控 Worker 存活状态

9.2.4 方法绑定机制 5

_bind_worker_method 是 WorkerGroup 的核心方法,它实现了方法绑定机制:

  1. 方法扫描: 遍历用户定义类的所有方法
  2. 装饰器检测: 检查方法是否有 MAGIC_ATTR 属性(由 @register 装饰器添加)
  3. 属性提取: 从装饰器中提取 dispatch_modeexecute_modeblocking 属性
  4. 函数映射: 根据 dispatch_mode 获取对应的分发函数和收集函数
  5. 方法生成: 使用 func_generator 动态生成新方法
  6. 方法绑定: 将生成的方法绑定到 WorkerGroup 实例

9.3 技术要点

9.3.1 装饰器模式与元编程

WorkerGroup 使用装饰器模式来标记需要分布式执行的方法。通过 MAGIC_ATTR 属性,系统可以在运行时识别和处理这些方法。

9.3.2 动态方法绑定

使用 setattr 动态地将生成的方法绑定到 WorkerGroup 实例上,这使得分布式调用看起来就像本地方法调用一样。

9.3.3 策略模式

不同的 dispatch_mode 对应不同的数据分发策略:

  • ONE_TO_ALL: 广播模式
  • DP_COMPUTE_PROTO: 数据并行模式
  • ALL_TO_ALL: 直接传递模式

9.3.4 线程安全与并发

使用后台线程监控 Worker 存活状态,确保系统的健壮性。

9.3.5 抽象基类设计

WorkerGroup 作为抽象基类,定义了通用接口,具体实现(如 RayWorkerGroup)提供特定后端的实现。

Maintained by Robin