3. stage2 - Initialization
As we saw at the entry point, when stage is 1 or 2 the engine creates a DeepSpeedZeroOptimizer to replace the original optimizer; the stage 1/2 features are all implemented inside this optimizer.
The core feature of stage 1/2 is to partition the parameters' gradients and the optimizer states: each process (GPU, rank) keeps only a slice of them, which reduces GPU memory consumption. Newer versions also support CPU offload for this part.
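To get a rough sense of the savings, here is a back-of-the-envelope sketch based on the accounting in the ZeRO paper, assuming fp16 mixed-precision training with Adam (bytes_per_param is a made-up helper; the numbers are illustrative, not measured):

# rough per-parameter memory accounting for fp16 mixed-precision Adam,
# following the ZeRO paper's bookkeeping (illustrative sketch only)
def bytes_per_param(world_size, stage):
    fp16_weights = 2            # model weights kept in fp16
    fp16_grads = 2              # gradients kept in fp16
    optim_states = 4 + 4 + 4    # fp32 master weights + Adam momentum + variance
    if stage >= 1:              # ZeRO-1 partitions the optimizer states
        optim_states /= world_size
    if stage >= 2:              # ZeRO-2 additionally partitions the gradients
        fp16_grads /= world_size
    return fp16_weights + fp16_grads + optim_states

for stage in (0, 1, 2):
    print(stage, bytes_per_param(world_size=8, stage=stage))
# 0 16, 1 5.5, 2 3.75   (bytes per parameter with 8 ranks)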
The core idea is simple: rework the param_groups of the base (original) optimizer so that only the parameters belonging to the current process (GPU, rank) are kept and everything else is removed from the optimizer. The optimizer then only maintains gradients and optimizer states for the retained slice.
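Before diving into the code, here is a minimal self-contained sketch of that idea. The helper name shard_param_groups and everything inside it are made up for illustration; the real implementation below does much more (alignment, round-robin reordering, gradient hooks, offload, ...):

import torch

def shard_param_groups(optimizer, rank, world_size):
    """Toy sketch of the ZeRO-1/2 idea (hypothetical helper, not DeepSpeed code):
    the wrapped optimizer keeps, and allocates state for, only this rank's slice
    of every parameter group. Assumes every group has trainable parameters."""
    shards = []
    for group in optimizer.param_groups:
        # flatten all trainable params of the group into one 1-D buffer
        flat = torch.cat([p.detach().reshape(-1) for p in group["params"] if p.requires_grad])
        # pad so the buffer divides evenly across ranks
        pad = (-flat.numel()) % world_size
        if pad:
            flat = torch.cat([flat, flat.new_zeros(pad)])
        part = flat.numel() // world_size
        # keep only the slice owned by this rank, as an fp32 leaf tensor
        shard = flat[rank * part:(rank + 1) * part].clone().float().requires_grad_(True)
        group["params"] = [shard]  # the inner optimizer now only ever sees this shard
        shards.append(shard)
    return shards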
The partitioning and initialization logic is all written in the class's __init__ method (grumbling omitted), which makes __init__ very long, so we split it up and walk through it piece by piece.
3.1. Configuration initialization
First come some arguments, configuration items, and variable initialization.
from deepspeed.runtime.zero.stage_1_and_2 import DeepSpeedZeroOptimizer
class DeepSpeedZeroOptimizer(ZeROOptimizer):
"""
DeepSpeedZeroOptimizer designed to reduce the memory footprint
required for training large deep learning models.
For more details please see ZeRO: Memory Optimization Towards Training A Trillion Parameter Models
https://arxiv.org/abs/1910.02054
For usage examples, refer to TODO: DeepSpeed Tutorial
"""
def __init__(self,
init_optimizer,
param_names,
timers,
static_loss_scale=1.0,
dynamic_loss_scale=False,
dynamic_loss_args=None,
verbose=True,
contiguous_gradients=True,
reduce_bucket_size=500000000,
allgather_bucket_size=5000000000,
dp_process_group=None,
expert_parallel_group=None,
expert_data_parallel_group=None,
reduce_scatter=True,
overlap_comm=False,
offload_optimizer_config=None,
mpu=None,
clip_grad=0.0,
gradient_accumulation_dtype=torch.float32,
communication_data_type=torch.float16,
postscale_gradients=True,
gradient_predivide_factor=1.0,
gradient_accumulation_steps=1,
ignore_unused_parameters=True,
partition_grads=True,
round_robin_gradients=False,
has_moe_layers=False,
fp16_master_weights_and_gradients=False,
elastic_checkpoint=False):
# stage 1/2 also supports CPU offload and pinned memory; enable them according to the config
if offload_optimizer_config is not None and offload_optimizer_config.device != OffloadDeviceEnum.none:
self.cpu_offload = True
self.cpu_offload_pin_memory = offload_optimizer_config.pin_memory
else:
self.cpu_offload = False
self.cpu_offload_pin_memory = False
if dist.get_rank() == 0:
logger.info(f"Reduce bucket size {reduce_bucket_size}")
logger.info(f"Allgather bucket size {allgather_bucket_size}")
logger.info(f"CPU Offload: {self.cpu_offload}")
logger.info(f'Round robin gradient partitioning: {round_robin_gradients}')
# The fused optimizer does all the work. We need this layer for two reason:
# 1. maintain same user API from apex.fp16_utils
# 2. keep common stuff here in case we need to add new fused optimizer later
self.elastic_checkpoint = elastic_checkpoint
self.param_names = param_names
self.mpu = mpu
# differences from apex.fp16_utils:
# - assume all model params in fp16
# - assume all params requires grad
# - flat by groups, not keeping state. TODO: remove state explicitly?
# - master grad and unflat master weight never exist. TODO: a way to save out unflat master?
if not get_accelerator().is_available():
raise SystemError("Accelerator is not detected, cannot perform low precision training (e.g., fp16, bf16).")
# the base (wrapped) optimizer
self.optimizer = init_optimizer
# Use torch (un)flatten ops
# helpers to flatten/unflatten tensors; both just call into torch utilities
self.flatten = _flatten_dense_tensors
self.unflatten = _unflatten_dense_tensors
# ZeRO stage 1 (False) or 2 (True)
# whether gradient partitioning is enabled
self.partition_gradients = partition_grads # type: bool
# which ZeRO stage, as a display string
self.zero_stage_string = "ZeRO-2" if partition_grads else "ZeRO-1"
self.timers = timers
self.reduce_scatter = reduce_scatter
# config option, defaults to False
# try to overlap gradient reduction with the backward computation
self.overlap_comm = overlap_comm # type: bool
self.deepspeed_adam_offload = self.cpu_offload
self.device = get_accelerator().current_device_name() if not self.cpu_offload else 'cpu'
# the data-parallel process group this rank belongs to
self.dp_process_group = dp_process_group
# expert parallel group (for MoE)
self.ep_process_group = expert_parallel_group
# data parallel group for experts (MoE)
self.expert_dp_process_group = expert_data_parallel_group
# data parallel size for non-experts
dp_size = dist.get_world_size(group=self.dp_process_group)
#For MoE models this may be different for different param group
#It will be modified during MoE setup later in the init
self.real_dp_process_group = [dp_process_group for i in range(len(self.optimizer.param_groups))]
self.partition_count = [dp_size for i in range(len(self.optimizer.param_groups))]
self.is_gradient_accumulation_boundary = True
# CPU-Offload requires contiguous gradients
# gradients are copied into a contiguous buffer as they are produced, avoiding memory fragmentation during the backward pass
self.contiguous_gradients = contiguous_gradients or self.cpu_offload # type: bool
# whether the model contains MoE layers
self.has_moe_layers = has_moe_layers
if self.has_moe_layers:
self._configure_moe_settings()
self._global_grad_norm = 0.
if mpu is None:
self.model_parallel_group = None
self.model_parallel_world_size = 1
self.model_parallel_rank = 0
else:
self.model_parallel_group = mpu.get_model_parallel_group()
self.model_parallel_world_size = mpu.get_model_parallel_world_size()
self.model_parallel_rank = bwc_tensor_model_parallel_rank(mpu)
self.overflow = False
self.clip_grad = clip_grad
self.communication_data_type = communication_data_type
self.gradient_predivide_factor = gradient_predivide_factor
self.postscale_gradients = postscale_gradients
self.gradient_accumulation_steps = gradient_accumulation_steps
self.micro_step_id = 0
self.ignore_unused_parameters = ignore_unused_parameters
self.round_robin_gradients = round_robin_gradients
self.extra_large_param_to_reduce = None
self.fp16_master_weights_and_gradients = fp16_master_weights_and_gradients
if self.fp16_master_weights_and_gradients:
assert self.cpu_offload and type(self.optimizer) in [DeepSpeedCPUAdam], \
f"fp16_master_and_gradients requires optimizer to support keeping fp16 master and gradients while keeping the optimizer states in fp32."\
f"Currently only supported using ZeRO-Offload with DeepSpeedCPUAdam. But current setting is ZeRO-Offload:{self.cpu_offload} and optimizer type {type(self.optimizer)}." \
f"Either disable fp16_master_weights_and_gradients or enable {self.zero_stage_string} Offload with DeepSpeedCPUAdam."
if self.reduce_scatter:
valid_reduce_scatter_dtypes = (torch.float16, torch.bfloat16, torch.float32)
assert self.communication_data_type in valid_reduce_scatter_dtypes, f"{self.zero_stage_string} supports {valid_reduce_scatter_dtypes} communication_data_type with reduce scatter enabled. Got: '{self.communication_data_type}'"
assert self.gradient_predivide_factor == 1.0, f"gradient_predivide_factor != 1.0 is not yet supported with {self.zero_stage_string} with reduce scatter enabled"
assert self.postscale_gradients, f"pre-scale gradients is not yet supported with {self.zero_stage_string} with reduce scatter enabled"
# param flattened by groups
self.bit16_groups = []
self.bit16_groups_flat = []
# param partitioned by data parallel degree
# this will contain a list of equal sized tensors
# each of which will be updated by a different process
self.parallel_partitioned_bit16_groups = []
# a single 32-bit partition of the parallel partitioned parameters
# that this process will update
self.single_partition_of_fp32_groups = []
# param partition info
# These are the parameters in each group that will not be updated by this process directly
self.params_not_in_partition = []
# These are the parameters that will be updated by this process directly
self.params_in_partition = []
# Offset from the first parameter in the self.params_in_partition
# the parameter boundaries may not align with partition boundaries
# so we need to keep track of the offset
self.first_offset = []
# number of elements per partition in each group
self.partition_size = []
# align nccl all-gather send buffers to 4-byte boundary
self.nccl_start_alignment_factor = 2 # 4-byte alignment/sizeof(fp16) = 2
assert (
allgather_bucket_size % self.nccl_start_alignment_factor == 0
), f"allgather_bucket_size must be a multiple of nccl_start_alignment_factor, {self.nccl_start_alignment_factor} "
self.all_reduce_print = False
self.dtype = self.optimizer.param_groups[0]['params'][0].dtype
self.gradient_accumulation_dtype = gradient_accumulation_dtype
if self.dtype != self.gradient_accumulation_dtype:
self.use_separate_grad_accum = True
else:
self.use_separate_grad_accum = False
if self.use_separate_grad_accum and not self.partition_gradients:
self.use_grad_accum_for_reduction = True
else:
self.use_grad_accum_for_reduction = False
self.round_robin_bit16_groups = []
self.round_robin_bit16_indices = []
# Use different parallel to do all_to_all_reduce related things
# padding on each partition for alignment purposes
self.groups_padding = []
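A side note on self.flatten / self.unflatten above: they are plain aliases of torch's _flatten_dense_tensors / _unflatten_dense_tensors. A quick standalone illustration of what these two helpers do:

import torch
from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors

a = torch.randn(2, 3)
b = torch.randn(4)
flat = _flatten_dense_tensors([a, b])            # one contiguous 1-D buffer with 6 + 4 elements
views = _unflatten_dense_tensors(flat, [a, b])   # tensors carved out of flat, restored to the original shapes
print(flat.shape, [v.shape for v in views])      # torch.Size([10]) [torch.Size([2, 3]), torch.Size([4])]

This is what _update_model_bit16_weights (called later in the big loop) relies on: after flattening, the model's bit16 parameters are pointed back at such slices, so the parameters and the flat buffer share storage.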
3.2. Parameter partitioning
Next comes a big loop that processes each parameter group in self.optimizer.param_groups. Let's first recap what optimizer.param_groups is.
self.optimizer is the original base optimizer, a (compatible) instance of torch.optim.Optimizer. When creating a torch.optim.Optimizer, the model parameters can be split into groups, each with its own learning rate and update hyperparameters; optimizer.param_groups: List[Dict] is where these groups are stored. It is a list in which each element is a dict whose keys are dict_keys(['params', 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad']):
'params': the model parameters to be updated with gradients.
'lr', 'betas', 'eps', 'weight_decay', 'amsgrad': the per-group learning-rate and update hyperparameters; we can ignore them here.
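For example, a minimal torch-only illustration (not DeepSpeed code; newer torch versions add a few more keys such as 'maximize'):

import torch

model = torch.nn.Sequential(torch.nn.Linear(4, 4), torch.nn.Linear(4, 2))
optimizer = torch.optim.Adam(
    [
        {"params": model[0].parameters(), "lr": 1e-3},   # group 0
        {"params": model[1].parameters(), "lr": 1e-4},   # group 1, different learning rate
    ],
    betas=(0.9, 0.999),
)
print(len(optimizer.param_groups))      # 2
print(optimizer.param_groups[0].keys())
# dict_keys(['params', 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad', ...])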
# loop to deal with groups
# when creating the optimizer, model parameters can be split into groups,
# each with its own learning rate and update hyperparameters
# self.optimizer.param_groups stores these groups
# it is a list in which each element is a dict
# self.optimizer.param_groups : List[Dict]
# each dict has keys dict_keys(['params', 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad'])
# 'params' : the model parameters that need gradient updates
# 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad' : the per-group learning-rate-related settings
for i, param_group in enumerate(self.optimizer.param_groups):
# each parameter group is handled separately
partition_id = dist.get_rank(group=self.real_dp_process_group[i])
# push this group to list before modify
# TODO: Explore simplification that avoids the extra book-keeping by pushing the reordered group
trainable_parameters = []
for param in param_group['params']:
if param.requires_grad:
param.grad_accum = None
trainable_parameters.append(param)
# the trainable (requires_grad) parameters of the current param_group
# all of the partitioning below operates on them
self.bit16_groups.append(trainable_parameters)
# not sure why apex was cloning the weights before flattening
# removing cloning here
see_memory_usage(f"Before moving param group {i} to CPU")
# move all the parameters to cpu to free up GPU space for creating flat buffer
# move to CPU memory first and do the processing there
move_to_cpu(self.bit16_groups[i])
empty_cache()
see_memory_usage(f"After moving param group {i} to CPU", force=False)
# Reorder group parameters for load balancing of gradient partitioning during backward among ranks.
# This ensures that gradients are reduced in a fashion such that ownership round robins among the ranks.
# For example, rather than 3 gradients (g_n+2, g_n+1, g_n) that are reduced consecutively belonging
# to the same rank, instead they will belong to 3 ranks (r_m+2, r_m+1, r_m).
# parameters are distributed across the ranks, and each rank then owns the gradient handling for its share
# don't worry about the exact split yet; it simply partitions across the processes (GPUs) in the group
if self.round_robin_gradients:
# to spread the load as evenly as possible, a round-robin assignment is used here
round_robin_tensors, round_robin_indices = self._round_robin_reorder(
self.bit16_groups[i], dist.get_world_size(group=self.real_dp_process_group[i]))
else:
round_robin_tensors = self.bit16_groups[i]
round_robin_indices = list(range(len(self.bit16_groups[i])))
self.round_robin_bit16_groups.append(round_robin_tensors)
self.round_robin_bit16_indices.append(round_robin_indices)
# create flat buffer in CPU and move to GPU
# flatten the parameter list into a single contiguous 1-D buffer
self.bit16_groups_flat.append(
self.flatten_dense_tensors_aligned(
self.round_robin_bit16_groups[i],
self.nccl_start_alignment_factor * dist.get_world_size(group=self.real_dp_process_group[i])).to(
get_accelerator().current_device_name()))
see_memory_usage(f"After flattening and moving param group {i} to GPU", force=False)
# Record padding required for alignment
# flattening may have appended padding at the tail; record how many padded elements were added
if partition_id == dist.get_world_size(group=self.real_dp_process_group[i]) - 1:
padding = self.bit16_groups_flat[i].numel() - sum(
[t.numel() for t in self.round_robin_bit16_groups[i]])
else:
padding = 0
self.groups_padding.append(padding)
if dist.get_rank(group=self.real_dp_process_group[i]) == 0:
see_memory_usage(f"After Flattening and after emptying param group {i} cache", force=False)
# set model bit16 weight to slices of flattened buffer
self._update_model_bit16_weights(i)
# divide the flat weights into near equal partition equal to the data parallel degree
# each process will compute on a different part of the partition
# data_parallel_partitions holds the partitioned result
# it is a list indexed by rank: element i is the slice (a view of the flat buffer) owned by rank i
data_parallel_partitions = self.get_data_parallel_partitions(self.bit16_groups_flat[i], i)
self.parallel_partitioned_bit16_groups.append(data_parallel_partitions)
# verify that data partition start locations are 4-byte aligned
for partitioned_data in data_parallel_partitions:
assert (partitioned_data.data_ptr() % (2 * self.nccl_start_alignment_factor) == 0)
# A partition of the fp32 master weights that will be updated by this process.
# Note that the params in single_partition_of_fp32_groups is cloned and detached
# from the origin params of the model.
# move the slice that belongs to the current process (rank) to the target device and make a copy of it
# this copy is the one the optimizer accumulates state on and updates; depending on the config it is
# single precision (float32) or half precision (float16)
# note the detach(): it returns a new leaf tensor cut off from the computation graph
# (clone() was called first, so it also has its own storage), with requires_grad=False by default
if not fp16_master_weights_and_gradients:
self.single_partition_of_fp32_groups.append(self.parallel_partitioned_bit16_groups[i][partition_id].to(
self.device).clone().float().detach())
else:
self.single_partition_of_fp32_groups.append(self.parallel_partitioned_bit16_groups[i][partition_id].to(
self.device).clone().half().detach())
# self.single_partition_of_fp32_groups only contains the partition owned by the current process (rank)
# Set local optimizer to have flat params of its own partition.
# After this, the local optimizer will only contain its own partition of params.
# In that case, the local optimizer only saves the states(momentum, variance, etc.) related to its partition's params(zero stage1).
# note: this fp32 partition never takes part in the forward pass, so autograd will not fill its .grad;
# DeepSpeed copies the reduced gradients into it manually, and requires_grad=True is kept only in case
# the wrapped optimizer checks that flag
self.single_partition_of_fp32_groups[
i].requires_grad = True # keep this in case internal optimizer uses it
# re-point the optimizer's param_group at just the flat fp32 partition assigned to the current process (rank)
param_group['params'] = [self.single_partition_of_fp32_groups[i]]
partition_size = len(self.bit16_groups_flat[i]) / dist.get_world_size(group=self.real_dp_process_group[i])
params_in_partition, params_not_in_partition, first_offset = self.get_partition_info(
self.round_robin_bit16_groups[i], partition_size, partition_id)
self.partition_size.append(partition_size)
self.params_in_partition.append(params_in_partition)
self.params_not_in_partition.append(params_not_in_partition)
self.first_offset.append(first_offset)
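To make the partition bookkeeping concrete, here is a small standalone sketch (partition_info and partition_flat_buffer are made-up names and a simplification of what get_data_parallel_partitions / get_partition_info do) showing how a flat buffer is cut into world_size equal slices and why first_offset is needed when a partition boundary lands in the middle of a parameter:

import torch

def partition_flat_buffer(flat, world_size):
    # cut a flat 1-D buffer into world_size equal views (assumes numel is already padded)
    part = flat.numel() // world_size
    return [flat.narrow(0, rank * part, part) for rank in range(world_size)]

def partition_info(param_numels, partition_size, partition_id):
    # for one rank: which params overlap its partition, and the offset into the first of them
    start, end = partition_id * partition_size, (partition_id + 1) * partition_size
    in_partition, first_offset, cursor = [], 0, 0
    for idx, numel in enumerate(param_numels):
        if cursor < end and cursor + numel > start:     # this param overlaps the partition
            if not in_partition:
                first_offset = max(0, start - cursor)   # boundary may cut into the param
            in_partition.append(idx)
        cursor += numel
    return in_partition, first_offset

# two params of 4 and 6 elements, flattened into a 10-element buffer, split across 2 ranks
for pid in (0, 1):
    print(pid, partition_info([4, 6], partition_size=5, partition_id=pid))
# 0 ([0, 1], 0)  -> rank 0 owns all of param 0 plus the first element of param 1
# 1 ([1], 1)     -> rank 1's slice starts 1 element into param 1, hence first_offset == 1

In the real code, params_not_in_partition is simply the complement of this set within the group.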
3.3. cpu offload
To be added.