###############################################
stage2 - Initialization
###############################################

As we saw at the entry point, for stages 1 and 2 the optimizer
``DeepSpeedZeroOptimizer`` is created to replace the original optimizer;
the stage 1/2 features are all implemented inside it.

The core function of stages 1 and 2 is to partition the parameter gradients
and the optimizer states: each process (GPU, rank) keeps only one shard,
which reduces memory consumption. Newer versions also support CPU offload
for this part.

The core idea is simple: rework the base (original) optimizer's
``param_groups`` so that only the parameters belonging to the current
process (GPU, rank) remain, and everything else is removed from the
optimizer. Gradients are then only computed for the retained part, and
optimizer state only exists for the retained part.

The partitioning and initialization logic is all written in the class's
``__init__`` method (some grumbling about that omitted), which makes this
``__init__`` very long, so we walk through it in chunks.

Configuration initialization
###############################################

First come the constructor arguments, configuration options, and variable
initialization.

.. code-block:: python

    # the import shows the module path; the class below is defined in
    # deepspeed/runtime/zero/stage_1_and_2.py
    from deepspeed.runtime.zero.stage_1_and_2 import DeepSpeedZeroOptimizer


    class DeepSpeedZeroOptimizer(ZeROOptimizer):
        """
        DeepSpeedZeroOptimizer designed to reduce the memory footprint
        required for training large deep learning models.

        For more details please see ZeRO: Memory Optimization Towards Training A Trillion Parameter Models
        https://arxiv.org/abs/1910.02054

        For usage examples, refer to TODO: DeepSpeed Tutorial
        """

        def __init__(self,
                     init_optimizer,
                     param_names,
                     timers,
                     static_loss_scale=1.0,
                     dynamic_loss_scale=False,
                     dynamic_loss_args=None,
                     verbose=True,
                     contiguous_gradients=True,
                     reduce_bucket_size=500000000,
                     allgather_bucket_size=5000000000,
                     dp_process_group=None,
                     expert_parallel_group=None,
                     expert_data_parallel_group=None,
                     reduce_scatter=True,
                     overlap_comm=False,
                     offload_optimizer_config=None,
                     mpu=None,
                     clip_grad=0.0,
                     gradient_accumulation_dtype=torch.float32,
                     communication_data_type=torch.float16,
                     postscale_gradients=True,
                     gradient_predivide_factor=1.0,
                     gradient_accumulation_steps=1,
                     ignore_unused_parameters=True,
                     partition_grads=True,
                     round_robin_gradients=False,
                     has_moe_layers=False,
                     fp16_master_weights_and_gradients=False,
                     elastic_checkpoint=False):

            # stage 2 also supports CPU offload and pinned memory;
            # the config decides whether they are enabled
            if offload_optimizer_config is not None and offload_optimizer_config.device != OffloadDeviceEnum.none:
                self.cpu_offload = True
                self.cpu_offload_pin_memory = offload_optimizer_config.pin_memory
            else:
                self.cpu_offload = False
                self.cpu_offload_pin_memory = False

            if dist.get_rank() == 0:
                logger.info(f"Reduce bucket size {reduce_bucket_size}")
                logger.info(f"Allgather bucket size {allgather_bucket_size}")
                logger.info(f"CPU Offload: {self.cpu_offload}")
                logger.info(f'Round robin gradient partitioning: {round_robin_gradients}')

            # The fused optimizer does all the work. We need this layer for two reasons:
            # 1. maintain same user API from apex.fp16_utils
            # 2. keep common stuff here in case we need to add new fused optimizer later
            self.elastic_checkpoint = elastic_checkpoint
            self.param_names = param_names
            self.mpu = mpu

            # differences from apex.fp16_utils:
            # - assume all model params in fp16
            # - assume all params requires grad
            # - flat by groups, not keeping state. TODO: remove state explicitly?
            # - master grad and unflat master weight never exist. TODO: a way to save out unflat master?
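The ``offload_optimizer_config`` consumed at the top of this chunk is parsed
from the ``zero_optimization`` section of the DeepSpeed JSON config. As a
minimal sketch, a config along these lines would take the
``cpu_offload = True`` branch above (values are illustrative, not tuned):

.. code-block:: python

    # illustrative DeepSpeed config (usually a ds_config.json passed to
    # deepspeed.initialize); "offload_optimizer" is what ends up in
    # offload_optimizer_config
    ds_config = {
        "train_micro_batch_size_per_gpu": 1,
        "fp16": {"enabled": True},
        "zero_optimization": {
            "stage": 2,                  # partition_grads=True -> "ZeRO-2"
            "reduce_bucket_size": 5e8,
            "allgather_bucket_size": 5e8,
            "round_robin_gradients": True,
            "offload_optimizer": {
                "device": "cpu",         # != OffloadDeviceEnum.none -> cpu_offload=True
                "pin_memory": True       # -> cpu_offload_pin_memory=True
            },
        },
    }

With ``"device": "cpu"`` the optimizer states (and the fp32 master partition
we will meet below) live in host memory; ``pin_memory`` requests page-locked
buffers for faster host-device copies.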
Continuing ``__init__``, the next chunk stores the base optimizer and a batch
of plain configuration attributes:

.. code-block:: python

    if not get_accelerator().is_available():
        raise SystemError("Accelerator is not detected, cannot perform low precision training (e.g., fp16, bf16).")

    # the base (wrapped) optimizer
    self.optimizer = init_optimizer

    # Use torch (un)flatten ops
    # helpers to flatten / unflatten tensors; both are plain torch functions
    self.flatten = _flatten_dense_tensors
    self.unflatten = _unflatten_dense_tensors

    # ZeRO stage 1 (False) or 2 (True)
    # i.e. whether gradient partitioning is enabled
    self.partition_gradients = partition_grads  # type: bool
    self.zero_stage_string = "ZeRO-2" if partition_grads else "ZeRO-1"

    self.timers = timers

    self.reduce_scatter = reduce_scatter

    # config option, defaults to False:
    # attempt to overlap gradient reduction with backward computation
    self.overlap_comm = overlap_comm  # type: bool

    self.deepspeed_adam_offload = self.cpu_offload

    self.device = get_accelerator().current_device_name() if not self.cpu_offload else 'cpu'

    # the data parallel process group this rank belongs to
    self.dp_process_group = dp_process_group

    # expert parallel group
    self.ep_process_group = expert_parallel_group

    # data parallel group for experts
    self.expert_dp_process_group = expert_data_parallel_group

    # data parallel size for non-experts
    dp_size = dist.get_world_size(group=self.dp_process_group)

    # For MoE models this may be different for different param groups.
    # It will be modified during MoE setup later in the init
    self.real_dp_process_group = [dp_process_group for i in range(len(self.optimizer.param_groups))]
    self.partition_count = [dp_size for i in range(len(self.optimizer.param_groups))]

    self.is_gradient_accumulation_boundary = True

    # CPU-Offload requires contiguous gradients:
    # copy gradients into a contiguous buffer as they are produced,
    # which avoids memory fragmentation during the backward pass
    self.contiguous_gradients = contiguous_gradients or self.cpu_offload  # type: bool

    # whether the model contains MoE layers
    self.has_moe_layers = has_moe_layers
    if self.has_moe_layers:
        self._configure_moe_settings()
    self._global_grad_norm = 0.

    if mpu is None:
        self.model_parallel_group = None
        self.model_parallel_world_size = 1
        self.model_parallel_rank = 0
    else:
        self.model_parallel_group = mpu.get_model_parallel_group()
        self.model_parallel_world_size = mpu.get_model_parallel_world_size()
        self.model_parallel_rank = bwc_tensor_model_parallel_rank(mpu)

    self.overflow = False
    self.clip_grad = clip_grad
    self.communication_data_type = communication_data_type
    self.gradient_predivide_factor = gradient_predivide_factor
    self.postscale_gradients = postscale_gradients
    self.gradient_accumulation_steps = gradient_accumulation_steps
    self.micro_step_id = 0
    self.ignore_unused_parameters = ignore_unused_parameters
    self.round_robin_gradients = round_robin_gradients

    self.extra_large_param_to_reduce = None
    self.fp16_master_weights_and_gradients = fp16_master_weights_and_gradients

    if self.fp16_master_weights_and_gradients:
        assert self.cpu_offload and type(self.optimizer) in [DeepSpeedCPUAdam], \
            f"fp16_master_and_gradients requires optimizer to support keeping fp16 master and gradients while keeping the optimizer states in fp32. " \
            f"Currently only supported using ZeRO-Offload with DeepSpeedCPUAdam. But current setting is ZeRO-Offload:{self.cpu_offload} and optimizer type {type(self.optimizer)}. " \
            f"Either disable fp16_master_weights_and_gradients or enable {self.zero_stage_string} Offload with DeepSpeedCPUAdam."

    if self.reduce_scatter:
        valid_reduce_scatter_dtypes = (torch.float16, torch.bfloat16, torch.float32)
        assert self.communication_data_type in valid_reduce_scatter_dtypes, \
            f"{self.zero_stage_string} supports {valid_reduce_scatter_dtypes} communication_data_type with reduce scatter enabled. Got: '{self.communication_data_type}'"
        assert self.gradient_predivide_factor == 1.0, \
            f"gradient_predivide_factor != 1.0 is not yet supported with {self.zero_stage_string} with reduce scatter enabled"
        assert self.postscale_gradients, \
            f"pre-scale gradients is not yet supported with {self.zero_stage_string} with reduce scatter enabled"
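The ``self.flatten`` / ``self.unflatten`` helpers bound above are torch's
``_flatten_dense_tensors`` and ``_unflatten_dense_tensors``. A quick
standalone demo of what they do (they are private torch utilities, so treat
the import path as version-dependent):

.. code-block:: python

    import torch
    from torch._utils import _flatten_dense_tensors, _unflatten_dense_tensors

    # two "parameters" with different shapes
    a = torch.arange(6, dtype=torch.float32).reshape(2, 3)
    b = torch.ones(4)

    # concatenate into one contiguous 1-D buffer (numel = 6 + 4 = 10)
    flat = _flatten_dense_tensors([a, b])
    print(flat.shape)  # torch.Size([10])

    # carve tensors with the original shapes back out of the flat buffer
    a2, b2 = _unflatten_dense_tensors(flat, [a, b])
    assert a2.shape == a.shape and b2.shape == b.shape
    assert torch.equal(a2, a) and torch.equal(b2, b)

ZeRO uses this pair to fuse each parameter group into one flat buffer and
then point the model's parameters at slices of it
(``_update_model_bit16_weights`` in the loop below).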
The last part of this chunk prepares the bookkeeping containers that the
partitioning loop below will fill in:

.. code-block:: python

    # param flattened by groups
    self.bit16_groups = []
    self.bit16_groups_flat = []

    # param partitioned by data parallel degree
    # this will contain a list of equal sized tensors
    # each of which will be updated by a different process
    self.parallel_partitioned_bit16_groups = []

    # a single 32-bit partition of the parallel partitioned parameters
    # that this process will update
    self.single_partition_of_fp32_groups = []

    # param partition info

    # These are the parameters in each group that will not be updated by this process directly
    self.params_not_in_partition = []

    # These are the parameters that will be updated by this process directly
    self.params_in_partition = []

    # Offset from the first parameter in the self.params_in_partition
    # the parameter boundaries may not align with partition boundaries
    # so we need to keep track of the offset
    self.first_offset = []

    # number of elements per partition in each group
    self.partition_size = []

    # align nccl all-gather send buffers to 4-byte boundary
    self.nccl_start_alignment_factor = 2  # 4-byte alignment / sizeof(fp16) = 2

    assert (
        allgather_bucket_size % self.nccl_start_alignment_factor == 0
    ), f"allgather_bucket_size must be a multiple of nccl_start_alignment_factor, {self.nccl_start_alignment_factor} "

    self.all_reduce_print = False
    self.dtype = self.optimizer.param_groups[0]['params'][0].dtype
    self.gradient_accumulation_dtype = gradient_accumulation_dtype

    if self.dtype != self.gradient_accumulation_dtype:
        self.use_separate_grad_accum = True
    else:
        self.use_separate_grad_accum = False
    if self.use_separate_grad_accum and not self.partition_gradients:
        self.use_grad_accum_for_reduction = True
    else:
        self.use_grad_accum_for_reduction = False

    self.round_robin_bit16_groups = []
    self.round_robin_bit16_indices = []

    # Use different parallel to do all_to_all_reduce related things
    # padding on each partition for alignment purposes
    self.groups_padding = []

Parameter partitioning
###############################################

Next comes a big loop that processes the parameter groups in
``self.optimizer.param_groups`` one by one, so let's first recap what
``optimizer.param_groups`` is.

``self.optimizer`` is the original base optimizer, a (compatible) instance
of ``torch.optim.Optimizer``. When creating a ``torch.optim.Optimizer`` you
may split the model parameters into groups, each with its own learning rate
and update hyperparameters; ``optimizer.param_groups: List[Dict]`` stores
these groups. It is a list whose elements are dicts; for Adam, each dict has
the keys ``dict_keys(['params', 'lr', 'betas', 'eps', 'weight_decay',
'amsgrad'])`` (see the short demo below):

- ``'params'``: the model parameters that this group updates.
- ``'lr'``, ``'betas'``, ``'eps'``, ``'weight_decay'``, ``'amsgrad'``: the
  group's learning-rate-related settings; we can ignore them here.
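A tiny runnable demo of that structure (the two-group split and the
hyperparameter values are arbitrary):

.. code-block:: python

    import torch
    import torch.nn as nn

    model = nn.Sequential(nn.Linear(8, 16), nn.Linear(16, 4))

    # two parameter groups with different learning rates
    optimizer = torch.optim.Adam([
        {"params": model[0].parameters(), "lr": 1e-3},
        {"params": model[1].parameters(), "lr": 1e-4},
    ])

    print(len(optimizer.param_groups))  # 2
    print(optimizer.param_groups[0].keys())
    # dict_keys(['params', 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad', ...])
    print(len(optimizer.param_groups[0]["params"]))  # 2 tensors: weight and bias

Keep in mind that the loop below assigns to ``param_group['params']`` in
place, which is exactly how the base optimizer ends up seeing only the
partition owned by the current rank.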
Now the loop itself; one iteration handles one parameter group:

.. code-block:: python

    # loop to deal with groups
    for i, param_group in enumerate(self.optimizer.param_groups):
        # each parameter group is processed separately
        partition_id = dist.get_rank(group=self.real_dp_process_group[i])

        # push this group to list before modify
        # TODO: Explore simplification that avoids the extra book-keeping by pushing the reordered group
        trainable_parameters = []
        for param in param_group['params']:
            if param.requires_grad:
                param.grad_accum = None
                trainable_parameters.append(param)
        # the parameters of this param_group that require gradient updates;
        # all of the partitioning below applies to them
        self.bit16_groups.append(trainable_parameters)

        # not sure why apex was cloning the weights before flattening
        # removing cloning here

        see_memory_usage(f"Before moving param group {i} to CPU")
        # move all the parameters to cpu to free up GPU space for creating flat buffer
        # i.e. do the processing in CPU memory first
        move_to_cpu(self.bit16_groups[i])
        empty_cache()
        see_memory_usage(f"After moving param group {i} to CPU", force=False)

        # Reorder group parameters for load balancing of gradient partitioning during backward among ranks.
        # This ensures that gradients are reduced in a fashion such that ownership round robins among the ranks.
        # For example, rather than 3 gradients (g_n+2, g_n+1, g_n) that are reduced consecutively belonging
        # to the same rank, instead they will belong to 3 ranks (r_m+2, r_m+1, r_m).
        # We are assigning parameters to ranks so that each rank owns the gradient
        # reduction for its share; the details of the assignment do not matter much
        # here, parameters are simply divided among the processes (GPUs) of the group.
        if self.round_robin_gradients:
            # to spread the load as evenly as possible, assign cyclically (round robin)
            round_robin_tensors, round_robin_indices = self._round_robin_reorder(
                self.bit16_groups[i], dist.get_world_size(group=self.real_dp_process_group[i]))
        else:
            round_robin_tensors = self.bit16_groups[i]
            round_robin_indices = list(range(len(self.bit16_groups[i])))

        self.round_robin_bit16_groups.append(round_robin_tensors)
        self.round_robin_bit16_indices.append(round_robin_indices)

        # create flat buffer in CPU and move to GPU
        # flatten the parameter list into one contiguous 1-D buffer
        self.bit16_groups_flat.append(
            self.flatten_dense_tensors_aligned(
                self.round_robin_bit16_groups[i],
                self.nccl_start_alignment_factor * dist.get_world_size(group=self.real_dp_process_group[i])).to(
                    get_accelerator().current_device_name()))
        see_memory_usage(f"After flattening and moving param group {i} to GPU", force=False)

        # Record padding required for alignment
        # flattening may have appended padding at the tail of the buffer; record
        # how many elements it is (only the last rank's partition contains any)
        if partition_id == dist.get_world_size(group=self.real_dp_process_group[i]) - 1:
            padding = self.bit16_groups_flat[i].numel() - sum(
                [t.numel() for t in self.round_robin_bit16_groups[i]])
        else:
            padding = 0
        self.groups_padding.append(padding)

        if dist.get_rank(group=self.real_dp_process_group[i]) == 0:
            see_memory_usage(f"After Flattening and after emptying param group {i} cache", force=False)

        # set model bit16 weight to slices of flattened buffer
        self._update_model_bit16_weights(i)

        # divide the flat weights into near equal partition equal to the data parallel degree
        # each process will compute on a different part of the partition
        # data_parallel_partitions holds the split result: a list indexed by
        # rank, whose entries are each rank's share of the flat buffer
        data_parallel_partitions = self.get_data_parallel_partitions(self.bit16_groups_flat[i], i)
        self.parallel_partitioned_bit16_groups.append(data_parallel_partitions)
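``get_data_parallel_partitions`` is not reproduced in this section. As a
mental model, here is a minimal sketch under our own naming (a hypothetical
simplified function, not DeepSpeed's actual code): cut the flat buffer into
``dp_size`` near-equal, aligned views without copying anything:

.. code-block:: python

    import torch

    def get_data_parallel_partitions_sketch(flat, dp_size, alignment=2):
        """Split a flat 1-D buffer into dp_size near-equal views (no copies).

        Hypothetical, simplified version: the real code also handles buffers
        whose length is not an exact multiple of dp_size * alignment.
        """
        total = flat.numel()
        base = (total // (dp_size * alignment)) * alignment  # aligned chunk size
        partitions = []
        offset = 0
        for rank in range(dp_size):
            # the last rank absorbs the remainder (including alignment padding)
            size = base if rank != dp_size - 1 else total - offset
            partitions.append(flat.narrow(0, offset, size))
            offset += size
        return partitions

    flat = torch.arange(24, dtype=torch.float16)
    parts = get_data_parallel_partitions_sketch(flat, dp_size=4)
    print([p.numel() for p in parts])  # [6, 6, 6, 6]

Because each partition is a view of ``flat`` (and the model weights alias
slices of the same buffer after ``_update_model_bit16_weights``), a later
all-gather of updated partitions writes straight into the model's weights.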
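The tail of the loop (next chunk) then keeps an fp32 master copy of just
this rank's partition via ``clone().float().detach()`` and re-enables
``requires_grad`` on the copy. A toy illustration of that pattern (tensor
names are ours):

.. code-block:: python

    import torch

    bit16_partition = torch.randn(8, dtype=torch.float16)

    # an independent fp32 copy, cut off from the autograd graph:
    # clone() -> own storage, float() -> fp32, detach() -> requires_grad=False
    master = bit16_partition.clone().float().detach()

    # re-enable grad on the copy: it is a fresh *leaf* tensor, so this does
    # not reattach it to the fp16 tensor's graph
    master.requires_grad = True

    print(master.dtype, master.requires_grad, master.is_leaf)
    # torch.float32 True True

``detach()`` only severs the copy from the fp16 graph: autograd will never
populate its ``.grad`` from the model's backward pass. ZeRO instead copies
the reduced gradients into it manually before calling the inner optimizer's
``step()``; ``requires_grad = True`` is kept in case the inner optimizer
checks the flag.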
The tail of the loop:

.. code-block:: python

        # (still inside the per-group loop)
        # verify that data partition start locations are 4-byte aligned
        for partitioned_data in data_parallel_partitions:
            assert (partitioned_data.data_ptr() % (2 * self.nccl_start_alignment_factor) == 0)

        # A partition of the fp32 master weights that will be updated by this process.
        # Note that the params in single_partition_of_fp32_groups is cloned and detached
        # from the origin params of the model.
        # move the partition belonging to the current process (rank) to the target
        # device and create a copy there; this copy is what accumulates gradients
        # and gets updated by the optimizer -- depending on the config it is
        # either fp32 or fp16 (fp16_master_weights_and_gradients).
        # Note the detach(): it returns a new tensor, separated from the current
        # computation graph, with requires_grad=False; no gradient ever flows
        # into it from the fp16 graph.
        if not fp16_master_weights_and_gradients:
            self.single_partition_of_fp32_groups.append(self.parallel_partitioned_bit16_groups[i][partition_id].to(
                self.device).clone().float().detach())
        else:
            self.single_partition_of_fp32_groups.append(self.parallel_partitioned_bit16_groups[i][partition_id].to(
                self.device).clone().half().detach())
        # self.single_partition_of_fp32_groups now holds only the partition
        # that belongs to the current process (rank)

        # Set local optimizer to have flat params of its own partition.
        # After this, the local optimizer will only contain its own partition of params.
        # In that case, the local optimizer only saves the states (momentum, variance, etc.)
        # related to its partition's params (zero stage 1).
        # As explained above: detach() makes the copy a fresh leaf tensor, so
        # requires_grad=True here does not recompute gradients through the graph;
        # it only lets the copy carry a manually-filled .grad for the optimizer.
        self.single_partition_of_fp32_groups[
            i].requires_grad = True  # keep this in case internal optimizer uses it

        # reset the optimizer's param_group so that it contains only the
        # parameters assigned to the current process (rank)
        param_group['params'] = [self.single_partition_of_fp32_groups[i]]

        partition_size = len(self.bit16_groups_flat[i]) / dist.get_world_size(group=self.real_dp_process_group[i])
        # which parameters fall (partly) inside this rank's partition, which do
        # not, and the offset of the partition start within its first parameter
        params_in_partition, params_not_in_partition, first_offset = self.get_partition_info(
            self.round_robin_bit16_groups[i], partition_size, partition_id)

        self.partition_size.append(partition_size)
        self.params_in_partition.append(params_in_partition)
        self.params_not_in_partition.append(params_not_in_partition)
        self.first_offset.append(first_offset)

cpu offload
###############################################

To be added.