diff --git a/src/mindspore2022/mindspore/python/mindspore/_extends/graph_kernel/model/graph_parallel.py b/src/mindspore2022/mindspore/python/mindspore/_extends/graph_kernel/model/graph_parallel.py
index fdf90238..f910b8fe 100644
--- a/src/mindspore2022/mindspore/python/mindspore/_extends/graph_kernel/model/graph_parallel.py
+++ b/src/mindspore2022/mindspore/python/mindspore/_extends/graph_kernel/model/graph_parallel.py
@@ -109,100 +109,210 @@ class ScheduleAnalyzer:
         return res
 
     def _cal_weight(self, ops):
-        # initialize the weight to 0
+        """
+        Compute the total weight of a list of operations.
+
+        Args:
+            ops (list): The operations to weigh.
+
+        Returns:
+            int: The summed weight of all operations.
+        """
         weight = 0
         for op in ops:
-            # weight of the current op
             weight += self.prod(op.output.shape) * \
-                # bytes per element of the output dtype
-                PrimLib.dtype_bytes(op.output.dtype)
-            # return the computed weight
+                PrimLib.dtype_bytes(op.output.dtype)  # output elements times bytes per element
         return weight
 
     def injective_analyze(self):
-        """analyze injective case"""
+        """
+        Analyze the injective (elementwise/broadcast) case.
+
+        Sets self.block_num and self.block_weight.
+        """
+        # size of the largest output among the dominant ops
         const_size = max((self.prod(op.output.shape) for op in self.dom_op))
+        # round it up to a multiple of MAX_NUM_THREADS
         const_size = (const_size + self.MAX_NUM_THREADS - 1) // self.MAX_NUM_THREADS * self.MAX_NUM_THREADS
+        # total weight of the whole group
         total_weight = self._cal_weight(self.ops)
+        # number of thread blocks the group needs
         total_block = (const_size + self.MAX_NUM_THREADS - 1) // self.MAX_NUM_THREADS
+        # does the grid exceed the hardware block limit?
         need_block_split = const_size > self.MAX_BLOCK * self.MAX_NUM_THREADS
         if need_block_split:
+            # cap the grid at MAX_BLOCK blocks
             self.block_num = self.MAX_BLOCK
+            # number of waves the capped grid must run
             waves = (total_block + self.MAX_BLOCK - 1) // self.MAX_BLOCK
+            # per-block weight, scaled by the wave count
             self.block_weight = total_weight // total_block * waves
         else:
+            # the whole grid fits in a single wave
             self.block_num = total_block
+            # per-block weight
             self.block_weight = total_weight // self.block_num
 
     def reduce_analyze(self):
-        """analyze reduce case"""
+        """
+        Analyze the reduce case.
+
+        Raises:
+            RuntimeError: If the group contains more than one reduce op,
+                or contains no reduce op at all.
+        """
+        # thread block layout
         thread_x, thread_y = 32, 32
         reduce_op = None
+
         for op in self.ops:
+            # look for the single reduce op of the group
             if PrimLib.iter_type(op) == PrimLib.REDUCE:
+                # more than one reduce op is not supported
                 if reduce_op:
                     raise RuntimeError("Parallel fusion does not support multiple reduce op now.")
                 reduce_op = op
+
+        # a reduce group must contain a reduce op
         if not reduce_op:
             raise RuntimeError("Parallel fusion does not find a reduce op.")
+
+        # input shape of the reduce op
         shape = reduce_op.inputs[0].shape
+        # axes being reduced
         reduce_axis = reduce_op.attrs['reduce_axis']
+        # total number of input elements
         total_space = self.prod(shape)
+        # number of elements along the reduced axes
         red_space = shape[reduce_axis[0]]
         for i in range(1, len(reduce_axis)):
             red_space *= shape[reduce_axis[i]]
+
+        # bytes per element of the output dtype
         dtype_size = PrimLib.dtype_bytes(reduce_op.output.dtype)
+        # weight of the whole group
         weight = self._cal_weight(self.ops)  # reduce + injective
+        # blocks along x cover the non-reduced space
         block_x = (total_space // red_space + thread_y - 1) // thread_y
+        # weight per block
         block_w = (weight + block_x - 1) // block_x
+        # waves needed once the grid is capped at MAX_BLOCK
         waves = (block_x + self.MAX_BLOCK - 1) // self.MAX_BLOCK
+        # cap the grid at MAX_BLOCK blocks
         self.block_num = min(self.MAX_BLOCK, block_x)
+
+        # fixed per-block cost of the reduction itself
         all_reduce = 10  # 1 reduce init + 3 sync + 5 bin + 1 write
+        # per-block weight including the reduction overhead
         self.block_weight = (block_w + all_reduce * dtype_size * thread_x * thread_y) * waves
 
     def default_analyze(self):
-        """analyze default case"""
+        """
+        Analyze the default case (neither injective nor reduce).
+        """
+        # estimate the iteration space of a single op
         def _cal_default_space(op):
+            # start from the output size
             space = self.prod(op.output.shape)
             for t in op.inputs:
                 size = self.prod(t.shape)
+                # an input larger than the output dominates the space
                 if size > space:
                     space = size
             return space
+
+        # largest space over all dominant ops
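+        # (hypothetical illustration: with WRAP_SIZE = 32, a largest space of
+        # 100000 elements gives ceil(100000 / 128) = 782 candidate blocks,
+        # which min(MAX_BLOCK, block) below then caps)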
         space = max((_cal_default_space(op) for op in self.dom_op))
-        # each sm least 4 wrap
+        # give each SM at least 4 warps
         block = (space + (self.WRAP_SIZE * 4) - 1) // (self.WRAP_SIZE * 4)
+        # cap the grid at MAX_BLOCK blocks
         self.block_num = min(self.MAX_BLOCK, block)
+        # per-block weight
         self.block_weight = self._cal_weight(self.ops) // self.block_num
 
     def analyze(self):
         """analyze ops"""
 
         def _ops_type(ops, dom_op):
+            """
+            Determine the dominant iteration type of an op list.
+
+            Args:
+                ops (list): All ops of the group.
+                dom_op (list): The dominant ops of the group.
+
+            Returns:
+                True if ops contains a reduce op; otherwise the iteration
+                type of dom_op[0].
+            """
+            # check whether any op in the list is a reduce
             have_reduce = any(
                 (PrimLib.iter_type(op) == PrimLib.REDUCE for op in ops))
             if have_reduce:
+                # a reduce op dominates the whole group
                 return True
             return PrimLib.iter_type(dom_op[0])
 
+        # classify the group by its dominant iteration type
         dom_type = _ops_type(self.ops, self.dom_op)
+        # elementwise and broadcast groups share the injective analysis
         if dom_type in (PrimLib.ELEMWISE, PrimLib.BROADCAST):
             self.injective_analyze()
+        # a group dominated by a single reduce
         elif dom_type == PrimLib.REDUCE:
             self.reduce_analyze()
+        # everything else falls back to the default analysis
         else:
             self.default_analyze()
 
     def suitable_to_pipeline(self):
         """judge whether is suitable to be pipeline optimized"""
         # Reduce is not suitable
         def _contain_reduce(ops):
             for op in ops:
                 # Reduce may make the tiling bad.
                 if PrimLib.primtives.get(op.prim, None) == PrimLib.REDUCE:
                     return True
@@ -210,6 +320,7 @@ class ScheduleAnalyzer:
 
         suitable = True
         if _contain_reduce(self.ops):
+            # a group containing reduce is not pipeline optimized
             suitable = False
         return suitable
@@ -227,13 +338,16 @@ class ScheduleAnalyzer:
             classes (list[list[int]]): The list of clusters. Each cluster is a list of indices.
         """
         def _cal_mean(classes):
+            # mean value of the data points in each cluster
             class_datas = list(list(data[cid] for cid in cls) for cls in classes)
             return list(sum(cls) / len(cls) if cls else float('inf') for cls in class_datas)
 
         def _cal_distance(a, b):
+            # distance between two 1-D points
             return abs(a - b)
 
         def _check_different(old_classes, new_classes):
+            # whether the clustering changed between two iterations
             for o, n in zip(old_classes, new_classes):
                 if o != n:
                     return True
@@ -262,31 +376,39 @@ class ScheduleAnalyzer:
                     min_idx = i if min_dis > cur_dis else min_idx
                     min_dis = cur_dis if min_dis > cur_dis else min_dis
                 new_classes[min_idx].append(idx)
+            # check whether the clustering changed
             changed = _check_different(classes, new_classes)
+            # adopt the new clustering
             classes = new_classes
         return classes
 
     @staticmethod
     def pipeline_fusion_analyze(blocks, op_sizes, exclude_id):
         """analyze whether the segments can be pipeline optimized"""
         # op size first, block second.
         def _simple_factor(block, op_size):
             return block + 5 * op_size
 
         def _take_second(elem):
             return elem[1]
 
+        # score every segment: op size weighs five times the block count
         simple_indicators = list(_simple_factor(b, s)
                                  for b, s in zip(blocks, op_sizes))
         # 2 classes, one heavy, the other light
         classes = ScheduleAnalyzer.k_mean(simple_indicators, 2, exclude_id)
         if not classes:
             return []
+        # mean indicator of each cluster
         means = list(sum([simple_indicators[idx] for idx in cls]) / len(cls) if cls else float('inf')
                      for cls in classes)
 
         # The target two clusters should be a heavy one and a light one.
         # The light one maybe suitable to run with pipeline optimized.
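+        # (hypothetical illustration: indicators [14, 21, 400, 445, 15] split
+        # into a light cluster {14, 15, 21} and a heavy cluster {400, 445};
+        # only the light cluster is considered for pipelining)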
         classes_infos = list([cls, m] for cls, m in zip(classes, means))
         classes_infos.sort(key=_take_second)
         pipeline_target = None
@@ -295,6 +417,7 @@ class ScheduleAnalyzer:
                 pipeline_target = ci
                 break
         pipeline_gids, pipeline_mean = pipeline_target
+        # give up if even the light cluster is heavier than the threshold
         if pipeline_mean > _simple_factor(float(ScheduleAnalyzer.MAX_SM) / len(blocks),
                                           ScheduleAnalyzer.PIPELINE_OP_THREADHOLD):
             return []
@@ -302,6 +425,7 @@ class ScheduleAnalyzer:
         pipeline_blocks = []
         pipeline_weight = len(pipeline_gids)
         # Try to make two paralleled at least.
+        # split the candidates so that at least two pieces run in parallel
         if pipeline_weight > 3 and pipeline_weight > len(blocks) / 2:
             if len(pipeline_gids[:pipeline_weight // 2]) > 1:
                 pipeline_blocks.append(pipeline_gids[:pipeline_weight // 2])
@@ -313,49 +437,114 @@ class ScheduleAnalyzer:
 
     @staticmethod
     def fusion_consult(blocks, op_sizes, exclude_gid):
-        """get a recommendation for parallel fusion"""
+        """
+        Get a recommendation for parallel fusion.
+
+        Args:
+            blocks (list): Block number of every group.
+            op_sizes (list): Op count of every group.
+            exclude_gid (tuple): Group ids excluded from pipeline optimization.
+
+        Returns:
+            tuple: The fusion type and its type info.
+        """
         # Default is block fusion
         fusion_type = "block_fusion"
         type_info = None
 
         activate_pipeline_optimization = False  # Disable pipeline optimization for now.
         if activate_pipeline_optimization:
+            # check whether some segments can be pipelined
             pipeline_info = ScheduleAnalyzer.pipeline_fusion_analyze(
                 blocks, op_sizes, exclude_gid)
             if pipeline_info:
+                # pipeline candidates were found; switch the fusion type
                 fusion_type = "block_pipeline_fusion"
                 type_info = pipeline_info
 
         return fusion_type, type_info
 
 
 def block_parallel_estimate(graphs):
-    """estimate block parallel gain"""
+    """
+    Estimate the gain of block parallel fusion.
+
+    Args:
+        graphs (list): The graphs to analyze.
+
+    Returns:
+        ParalGain: The estimated parallel gain.
+    """
+    # per-graph block counts, op counts, weights, and excluded group ids
     sum_block, max_weight, sum_weight, blocks, op_sizes, exclude_gid = 0, 0, 0, [], [], []
+
     for gid, g in enumerate(graphs):
+        # analyze the schedule of every graph
         s = ScheduleAnalyzer(g)
         s.analyze()
         sum_block += s.block_num
+        # track the heaviest block weight
         if s.block_weight > max_weight:
             max_weight = s.block_weight
         sum_weight += s.block_weight
         blocks.append(s.block_num)
         op_sizes.append(len(s.ops))
+        # graphs unsuitable for pipelining are excluded from it
         if not s.suitable_to_pipeline():
             exclude_gid.append(gid)
+
+    # too many blocks in total: parallel fusion brings no gain
     if sum_block > ScheduleAnalyzer.MAX_SM * 32:
         return ParalGain("none", sum_weight, 0, list(0 for _ in graphs), None)
+    # otherwise ask for a fusion recommendation
     fusion_type, type_info = ScheduleAnalyzer.fusion_consult(blocks, op_sizes, tuple(exclude_gid))
     return ParalGain(fusion_type, max_weight, sum_weight - max_weight, blocks, type_info)
 
 
 def parallel_estimate(graphs, target):
-    """Estimate parallel gain"""
+    """
+    Estimate the parallel gain of the given graphs.
+
+    Args:
+        graphs (list): The graphs to analyze.
+        target (str): The target device, e.g. "aicore".
+
+    Returns:
+        ParalGain: The estimated parallel gain.
+    """
     if target == "aicore":
+        # no cost model for aicore here: return a fixed fake estimate
         fusion_type = "block_fusion"
         type_info = None
         fake_estimate = 1000
+        # assume one block per graph
        fake_blocks = list(1 for g in graphs)
        return ParalGain(fusion_type, fake_estimate, fake_estimate, fake_blocks, type_info)
+    # run the real block-parallel estimation
    return block_parallel_estimate(graphs)
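To make the injective-case arithmetic above easier to follow, here is a minimal standalone sketch of the same block/weight estimate. It is not part of the patch or of the MindSpore API: `injective_estimate`, the float32 element size, and the constants `MAX_NUM_THREADS = 1024` and `MAX_BLOCK = 256` are illustrative assumptions that mirror `ScheduleAnalyzer`'s logic.

```python
# Standalone sketch of the injective-case estimate (assumption-laden:
# injective_estimate is hypothetical, and MAX_NUM_THREADS/MAX_BLOCK mirror
# ScheduleAnalyzer's constants but may differ on other targets).
MAX_NUM_THREADS = 1024
MAX_BLOCK = 256


def prod(shape):
    """Product of all dimensions of a shape."""
    res = 1
    for dim in shape:
        res *= dim
    return res


def injective_estimate(output_shapes, dtype_bytes=4):
    """Return (block_num, block_weight) for an elementwise/broadcast group.

    output_shapes holds the output shape of every op in the group; the
    largest one plays the role of the dominant op.
    """
    # the largest output decides how many threads are needed
    const_size = max(prod(s) for s in output_shapes)
    # round up to a multiple of the thread-block size
    const_size = (const_size + MAX_NUM_THREADS - 1) // MAX_NUM_THREADS * MAX_NUM_THREADS
    # total bytes written by the whole group
    total_weight = sum(prod(s) * dtype_bytes for s in output_shapes)
    total_block = (const_size + MAX_NUM_THREADS - 1) // MAX_NUM_THREADS
    if const_size > MAX_BLOCK * MAX_NUM_THREADS:
        # the grid is capped, so the work runs in waves of MAX_BLOCK blocks
        waves = (total_block + MAX_BLOCK - 1) // MAX_BLOCK
        return MAX_BLOCK, total_weight // total_block * waves
    return total_block, total_weight // total_block


# A single 1024x1024 float32 op needs 1024 blocks but only 256 can be
# scheduled at once, so it runs in 4 waves.
print(injective_estimate([(1024, 1024)]))  # (256, 16384)
```

The `(x + n - 1) // n` pattern used throughout the patch is integer ceiling division; the sketch keeps it so its numbers line up with `injective_analyze`.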