_extends\graph_kernel\model\graph_parallel.py

branch-yixin
yixin 7 months ago
parent ffdf6162c7
commit cd3f01ab90

@@ -109,100 +109,210 @@ class ScheduleAnalyzer:
        return res

    def _cal_weight(self, ops):
        """
        Calculate the total weight of the given ops.

        Args:
            ops (list): a list of operator objects.

        Returns:
            int: the sum of the weights of all ops.
        """
        weight = 0
        for op in ops:
            # weight of one op = output element count * bytes per output element
            weight += self.prod(op.output.shape) * \
                PrimLib.dtype_bytes(op.output.dtype)
        return weight
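
A minimal standalone sketch of the weight formula above, with a plain dict standing in for PrimLib.dtype_bytes (shapes and dtypes are invented for illustration):

from functools import reduce

def prod(shape):
    # product of all dimensions, e.g. [32, 64] -> 2048
    return reduce(lambda x, y: x * y, shape, 1)

dtype_bytes = {"float16": 2, "float32": 4}  # stub for PrimLib.dtype_bytes

# weight of each op output = element count * bytes per element
outputs = [([32, 64], "float32"), ([32, 64], "float16")]
weight = sum(prod(shape) * dtype_bytes[dt] for shape, dt in outputs)
print(weight)  # 2048 * 4 + 2048 * 2 = 12288
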
    def injective_analyze(self):
        """analyze injective case"""
        const_size = max((self.prod(op.output.shape) for op in self.dom_op))
        # round const_size up to a multiple of MAX_NUM_THREADS
        const_size = (const_size + self.MAX_NUM_THREADS -
                      1) // self.MAX_NUM_THREADS * self.MAX_NUM_THREADS

        total_weight = self._cal_weight(self.ops)
        total_block = (const_size + self.MAX_NUM_THREADS -
                       1) // self.MAX_NUM_THREADS
        need_block_split = const_size > self.MAX_BLOCK * self.MAX_NUM_THREADS
        if need_block_split:
            # cap the launch at MAX_BLOCK blocks and fold the overflow into waves
            self.block_num = self.MAX_BLOCK
            waves = (total_block + self.MAX_BLOCK - 1) // self.MAX_BLOCK
            self.block_weight = total_weight // total_block * waves
        else:
            self.block_num = total_block
            self.block_weight = total_weight // self.block_num
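
For intuition, the ceiling arithmetic above behaves as in this sketch; MAX_NUM_THREADS = 1024 and MAX_BLOCK = 256 are assumed placeholder values, not necessarily the class constants:

MAX_NUM_THREADS, MAX_BLOCK = 1024, 256  # assumed placeholders

const_size = 500_000
# round up to a multiple of MAX_NUM_THREADS
const_size = (const_size + MAX_NUM_THREADS - 1) // MAX_NUM_THREADS * MAX_NUM_THREADS
total_block = const_size // MAX_NUM_THREADS                  # 489
need_block_split = const_size > MAX_BLOCK * MAX_NUM_THREADS  # 500736 > 262144 -> True
waves = (total_block + MAX_BLOCK - 1) // MAX_BLOCK           # 2
print(total_block, need_block_split, waves)  # 489 True 2
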
    def reduce_analyze(self):
        """
        Analyze the reduce case.

        Raises:
            RuntimeError: if parallel fusion meets more than one reduce op,
                or finds no reduce op at all.
        """
        thread_x, thread_y = 32, 32
        reduce_op = None
        for op in self.ops:
            if PrimLib.iter_type(op) == PrimLib.REDUCE:
                if reduce_op:
                    raise RuntimeError("Parallel fusion does not support multiple reduce op now.")
                reduce_op = op
        if not reduce_op:
            raise RuntimeError("Parallel fusion does not find a reduce op.")

        shape = reduce_op.inputs[0].shape
        reduce_axis = reduce_op.attrs['reduce_axis']
        total_space = self.prod(shape)
        # red_space = product of the reduced dimensions
        red_space = shape[reduce_axis[0]]
        for i in range(1, len(reduce_axis)):
            red_space *= shape[reduce_axis[i]]
        dtype_size = PrimLib.dtype_bytes(reduce_op.output.dtype)

        weight = self._cal_weight(self.ops)  # reduce + injective
        block_x = (total_space // red_space + thread_y - 1) // thread_y
        block_w = (weight + block_x - 1) // block_x
        waves = (block_x + self.MAX_BLOCK - 1) // self.MAX_BLOCK
        self.block_num = min(self.MAX_BLOCK, block_x)
        all_reduce = 10  # 1 reduce init + 3 sync + 5 bin + 1 write
        self.block_weight = (block_w + all_reduce *
                             dtype_size * thread_x * thread_y) * waves
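
A worked instance of the launch arithmetic above, under invented sizes (reducing the last axis of a [1024, 768] input with 32x32 threads):

thread_x, thread_y = 32, 32
shape, reduce_axis = [1024, 768], [1]
total_space = 1024 * 768
red_space = shape[reduce_axis[0]]  # 768 elements reduced per output row
# one block column of thread_y threads covers thread_y output rows
block_x = (total_space // red_space + thread_y - 1) // thread_y
print(block_x)  # (1024 + 31) // 32 = 32
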
    def default_analyze(self):
        """analyze default case"""
        def _cal_default_space(op):
            # an op's space is the largest of its output size and input sizes
            space = self.prod(op.output.shape)
            for t in op.inputs:
                size = self.prod(t.shape)
                if size > space:
                    space = size
            return space

        space = max((_cal_default_space(op) for op in self.dom_op))
        # give each SM at least 4 warps of work
        block = (space + (self.WRAP_SIZE * 4) - 1) // (self.WRAP_SIZE * 4)
        self.block_num = min(self.MAX_BLOCK, block)
        self.block_weight = self._cal_weight(self.ops) // self.block_num
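
A small worked example of the default sizing, assuming WRAP_SIZE = 32 and MAX_BLOCK = 256 as placeholder constants:

WRAP_SIZE, MAX_BLOCK = 32, 256  # assumed placeholders

# an op's space is the largest of its output and input sizes
out_size, in_sizes = 4096, [8192, 4096]
space = max([out_size] + in_sizes)                      # 8192
# give each SM at least 4 warps of work
block = (space + WRAP_SIZE * 4 - 1) // (WRAP_SIZE * 4)  # ceil(8192 / 128) = 64
print(min(MAX_BLOCK, block))  # 64
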
    def analyze(self):
        """analyze ops"""
        def _ops_type(ops, dom_op):
            """
            Check whether the op list contains a reduce op.

            Args:
                ops (list): all ops to be checked.
                dom_op (list): the dominant ops.

            Returns:
                True if any op is a reduce op; otherwise the iter type of dom_op[0].
            """
            have_reduce = any(
                (PrimLib.iter_type(op) == PrimLib.REDUCE for op in ops))
            if have_reduce:
                return True
            return PrimLib.iter_type(dom_op[0])

        dom_type = _ops_type(self.ops, self.dom_op)
        if dom_type in (PrimLib.ELEMWISE, PrimLib.BROADCAST):
            self.injective_analyze()
        elif dom_type == PrimLib.REDUCE:
            self.reduce_analyze()
        else:
            self.default_analyze()
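
The dispatch reduces to a three-way switch; a standalone mirror with string tags standing in for PrimLib's iter types:

def dispatch(dom_type):
    # injective cases share one path; reduce gets its own; everything else defaults
    if dom_type in ("ELEMWISE", "BROADCAST"):
        return "injective_analyze"
    if dom_type == "REDUCE":
        return "reduce_analyze"
    return "default_analyze"

print(dispatch("BROADCAST"))  # injective_analyze
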
    def suitable_to_pipeline(self):
        """judge whether it is suitable to be pipeline optimized"""
        # Reduce is not suitable
        def _contain_reduce(ops):
            for op in ops:
                # Reduce may make the tiling bad.
                if PrimLib.primtives.get(op.prim, None) == PrimLib.REDUCE:
                    return True
@@ -210,6 +320,7 @@ class ScheduleAnalyzer:
        suitable = True
        if _contain_reduce(self.ops):
            # a graph that contains a reduce op is not pipelined
            suitable = False
        return suitable
@@ -227,13 +338,16 @@ class ScheduleAnalyzer:
            classes (list[list[int]]): The list of clusters. Each cluster is a list of indices.
        """
        def _cal_mean(classes):
            # mean of each cluster (inf for an empty cluster)
            class_datas = list(list(data[cid] for cid in cls) for cls in classes)
            return list(sum(cls) / len(cls) if cls else float('inf') for cls in class_datas)

        def _cal_distance(a, b):
            return abs(a - b)

        def _check_different(old_classes, new_classes):
            # whether the clustering changed between two iterations
            for o, n in zip(old_classes, new_classes):
                if o != n:
                    return True
@@ -262,31 +376,39 @@ class ScheduleAnalyzer:
                    min_idx = i if min_dis > cur_dis else min_idx
                    min_dis = cur_dis if min_dis > cur_dis else min_dis
                new_classes[min_idx].append(idx)
            changed = _check_different(classes, new_classes)
            classes = new_classes
        return classes
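
One reassignment pass of this 1-D k-means can be exercised standalone; the data values and initial centers below are invented:

data = [3, 4, 20, 22, 5, 21]
centers = [data[0], data[2]]  # naive initial means: 3 and 20
classes = [[], []]
for idx, v in enumerate(data):
    dists = [abs(v - c) for c in centers]         # _cal_distance
    classes[dists.index(min(dists))].append(idx)  # assign to the nearest mean
print(classes)  # [[0, 1, 4], [2, 3, 5]]: a light cluster and a heavy cluster
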
    @staticmethod
    def pipeline_fusion_analyze(blocks, op_sizes, exclude_id):
        """analyze whether the segments can be pipeline optimized"""
        # op size first, block second.
        def _simple_factor(block, op_size):
            return block + 5 * op_size

        def _take_second(elem):
            return elem[1]

        simple_indicators = list(_simple_factor(b, s)
                                 for b, s in zip(blocks, op_sizes))
        # 2 classes, one heavy, the other light
        classes = ScheduleAnalyzer.k_mean(simple_indicators, 2, exclude_id)
        if not classes:
            return []
        means = list(sum([simple_indicators[idx] for idx in cls]) /
                     len(cls) if cls else float('inf') for cls in classes)

        # The target two clusters should be a heavy one and a light one.
        # The light one may be suitable to run pipeline optimized.
        classes_infos = list([cls, m] for cls, m in zip(classes, means))
        classes_infos.sort(key=_take_second)
        pipeline_target = None
@@ -295,6 +417,7 @@ class ScheduleAnalyzer:
                pipeline_target = ci
                break
        pipeline_gids, pipeline_mean = pipeline_target
        # bail out if even the light cluster exceeds the pipeline threshold
        if pipeline_mean > _simple_factor(float(ScheduleAnalyzer.MAX_SM) / len(blocks),
                                          ScheduleAnalyzer.PIPELINE_OP_THREADHOLD):
            return []
@@ -302,6 +425,7 @@ class ScheduleAnalyzer:
        pipeline_blocks = []
        pipeline_weight = len(pipeline_gids)
        # Try to make two paralleled at least.
        if pipeline_weight > 3 and pipeline_weight > len(blocks) / 2:
            if len(pipeline_gids[:pipeline_weight // 2]) > 1:
                pipeline_blocks.append(pipeline_gids[:pipeline_weight // 2])
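
The indicator heuristic weights op count five times as heavily as block count, so a graph with few blocks but many ops still lands in the heavy cluster; for example:

def _simple_factor(block, op_size):
    return block + 5 * op_size

blocks, op_sizes = [8, 2], [3, 10]
print([_simple_factor(b, s) for b, s in zip(blocks, op_sizes)])  # [23, 52]
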
@@ -313,49 +437,114 @@ class ScheduleAnalyzer:
    @staticmethod
    def fusion_consult(blocks, op_sizes, exclude_gid):
        """
        Get a recommendation for parallel fusion.

        Args:
            blocks (list): the block number of each graph.
            op_sizes (list): the op count of each graph.
            exclude_gid (tuple): graph ids excluded from pipeline fusion.

        Returns:
            tuple: the fusion type and its type info.
        """
        # Default is block fusion
        fusion_type = "block_fusion"
        type_info = None

        activate_pipeline_optimization = False  # Disable pipeline optimization for now.
        if activate_pipeline_optimization:
            pipeline_info = ScheduleAnalyzer.pipeline_fusion_analyze(
                blocks, op_sizes, exclude_gid)
            if pipeline_info:
                fusion_type = "block_pipeline_fusion"
                type_info = pipeline_info

        return fusion_type, type_info
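
Because activate_pipeline_optimization is hard-coded to False, the consult currently always returns the default; a runnable mirror of that observable behavior (inputs invented):

def fusion_consult_mirror(blocks, op_sizes, exclude_gid):
    # the pipeline branch is disabled, so the default always wins
    return "block_fusion", None

print(fusion_consult_mirror([4, 8], [3, 6], ()))  # ('block_fusion', None)
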
def block_parallel_estimate(graphs):
    """
    Estimate the block parallel gain.

    Args:
        graphs (list): the graphs to be analyzed.

    Returns:
        ParalGain: the block parallel gain information.
    """
    sum_block, max_weight, sum_weight, blocks, op_sizes, exclude_gid = 0, 0, 0, [], [], []
    for gid, g in enumerate(graphs):
        s = ScheduleAnalyzer(g)
        s.analyze()
        sum_block += s.block_num
        if s.block_weight > max_weight:
            max_weight = s.block_weight
        sum_weight += s.block_weight
        blocks.append(s.block_num)
        op_sizes.append(len(s.ops))
        if not s.suitable_to_pipeline():
            exclude_gid.append(gid)
    if sum_block > ScheduleAnalyzer.MAX_SM * 32:
        return ParalGain("none", sum_weight, 0, list(0 for _ in graphs), None)

    fusion_type, type_info = ScheduleAnalyzer.fusion_consult(blocks, op_sizes, tuple(exclude_gid))
    return ParalGain(fusion_type, max_weight, sum_weight - max_weight, blocks, type_info)
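
A self-contained sketch of the aggregation, with a stub in place of ScheduleAnalyzer (block_num and block_weight values are invented):

class StubAnalyzer:  # stands in for an analyzed ScheduleAnalyzer
    def __init__(self, block_num, block_weight):
        self.block_num, self.block_weight = block_num, block_weight

analyzed = [StubAnalyzer(4, 100), StubAnalyzer(8, 300), StubAnalyzer(2, 50)]
sum_block = sum(a.block_num for a in analyzed)      # 14
max_weight = max(a.block_weight for a in analyzed)  # 300
sum_weight = sum(a.block_weight for a in analyzed)  # 450
# the heaviest graph is the bottleneck; the rest overlap with it as gain
print(max_weight, sum_weight - max_weight)  # 300 150
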
def parallel_estimate(graphs, target):
    """
    Estimate the parallel gain.

    Args:
        graphs (list): the graphs to be estimated.
        target (str): the backend target, e.g. "aicore".

    Returns:
        ParalGain: the parallel gain information.
    """
    if target == "aicore":
        # aicore is not analyzed yet: return a fixed fake estimate,
        # with one block per graph
        fusion_type = "block_fusion"
        type_info = None
        fake_estimate = 1000
        fake_blocks = list(1 for g in graphs)
        return ParalGain(fusion_type, fake_estimate, fake_estimate, fake_blocks, type_info)
    return block_parallel_estimate(graphs)
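
A runnable mirror of the aicore shortcut; the tuple below mimics ParalGain's constructor arguments, while the real function returns a ParalGain object:

def parallel_estimate_mirror(graph_count, target):
    # aicore path: fixed fake estimate, one block per graph
    if target == "aicore":
        return "block_fusion", 1000, 1000, [1] * graph_count, None
    raise NotImplementedError("non-aicore targets run block_parallel_estimate")

print(parallel_estimate_mirror(3, "aicore"))
# ('block_fusion', 1000, 1000, [1, 1, 1], None)
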
