From 569be2fa4efca930ea11d2b4b01acccf2b8f54bc Mon Sep 17 00:00:00 2001 From: yixin <2050485123@qq.com> Date: Wed, 25 Dec 2024 17:31:48 +0800 Subject: [PATCH] _extends\parallel_compile\tbe_compiler --- .../tbe_compiler/tbe_adapter.py | 480 ++++++++++++++---- .../tbe_compiler/tbe_helper.py | 97 ++++ .../parallel_compile/tbe_compiler/tbe_job.py | 81 +++ .../tbe_compiler/tbe_job_manager.py | 176 +++++++ 4 files changed, 746 insertions(+), 88 deletions(-) diff --git a/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_adapter.py b/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_adapter.py index 926aab76..b55a2f1b 100644 --- a/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_adapter.py +++ b/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_adapter.py @@ -13,6 +13,7 @@ # limitations under the License. # ============================================================================ """tbe adapter to adapt te/topi/auto-tune python api """ +# 导入必要的库和模块 import json import os import shutil @@ -20,33 +21,62 @@ import sys import traceback from datetime import datetime +# 导入TBE相关的库和模块 from tbe.common.rl_bank.bank_manager import set_current_op_name from tbe.common.repository_manager.interface import cann_kb_unload, cann_kb_load from tbe.common.rl_bank.bank_cfg import LocalLock from te.platform.cce_conf import te_set_version from te.platform.cce_policy import set_L1_info -from te_fusion.compile_task_manager import dispatch_prebuild_task, dispatch_single_op_compile_task, import_py_module, \ - dispatch_fusion_op_compile_task, dispatch_autotune_task, sync_op_tune_params -from te_fusion.compile_task_manager import sync_syspath -from te_fusion.fusion_manager import call_op_func, clear_fusion_params, check_op_impl_mode, \ - save_op_params, build_single_op_from_c, op_params_to_json +from te_fusion.compile_task_manager import ( + dispatch_prebuild_task, + dispatch_single_op_compile_task, + import_py_module, + dispatch_fusion_op_compile_task, + dispatch_autotune_task, + sync_op_tune_params, + sync_syspath +) +from te_fusion.fusion_manager import ( + call_op_func, + clear_fusion_params, + check_op_impl_mode, + save_op_params, + build_single_op_from_c, + op_params_to_json +) from te_fusion.fusion_util import dump_fusion_json -from te_fusion.parallel_compilation import init_multi_process_env, start_ga_multi_process, deinit_multi_process_env, \ +from te_fusion.parallel_compilation import ( + init_multi_process_env, + start_ga_multi_process, + deinit_multi_process_env, get_finished_compilation_task - -from .tbe_helper import get_soc_info, assemble_op_args, get_compute_op_list, get_options_info, get_fuzz_build_info, \ - adjust_custom_op_info, pack_op_args, get_module_name, get_real_op_debug_level +) +from .tbe_helper import ( + get_soc_info, + assemble_op_args, + get_compute_op_list, + get_options_info, + get_fuzz_build_info, + adjust_custom_op_info, + pack_op_args, + get_module_name, + get_real_op_debug_level +) from .tbe_job import TbeJob, JobStatus -PLATFORM_FLAG = ["Ascend310", "Ascend910", "Hi3796CV300ES", "Ascend710", "Ascend610", "Hi3796CV300CS", "SD3403"] - +# 定义支持的平台标志 +PLATFORM_FLAG = [ + "Ascend310", "Ascend910", "Hi3796CV300ES", "Ascend710", "Ascend610", "Hi3796CV300CS", "SD3403" +] +# 定义Tune初始化函数 def _tune_init(job: TbeJob): """ - Tune Initialize - :param job: - :return: + Tune初始化 + :param job: TbeJob对象,包含任务信息 + :return: 初始化是否成功 """ + # 提取Soc信息和Tune信息 auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"] offline_tune = job.content["SocInfo"]["offlineTune"] op_bank_update = job.content["SocInfo"]["op_bank_update"] @@ -54,11 +84,14 @@ def _tune_init(job: TbeJob): tune_bank_path = job.content["TuneInfo"]["tune_bank_path"] need_ga = bool("GA" in auto_tiling_mode) need_rl = bool("RL" in auto_tiling_mode) + + # 设置环境变量 if offline_tune: os.environ["ENABLE_TUNE_DUMP"] = "TRUE" if op_bank_update: sync_op_tune_params("tbe.common.tiling.tiling_api", "reset_repository", False, "") + # 初始化Tune环境 if need_ga or need_rl or offline_tune: res = __init_tune_env(job, need_ga) if not res: @@ -66,6 +99,7 @@ def _tune_init(job: TbeJob): else: return True + # 设置Tune路径 if tune_dump_path: os.environ["TUNE_DUMP_PATH"] = str(tune_dump_path) if tune_bank_path: @@ -73,12 +107,12 @@ def _tune_init(job: TbeJob): res = _creating_custom_path(job) return res - +# 定义CANN知识库加载函数 def _cann_kb_load(job: TbeJob): """ - database load - :param job: - :return: + 加载CANN知识库 + :param job: TbeJob对象,包含任务信息 + :return: 加载是否成功 """ soc_version = job.soc_version core_num = job.core_num @@ -87,12 +121,12 @@ def _cann_kb_load(job: TbeJob): res = cann_kb_load(soc_version, core_num, op_bank_path, kb_type) return res - +# 定义CANN知识库卸载函数 def _cann_kb_unload(job: TbeJob): """ - database unload - :param job: - :return: + 卸载CANN知识库 + :param job: TbeJob对象,包含任务信息 + :return: 卸载是否成功 """ if job is None: return 0 @@ -102,12 +136,12 @@ def _cann_kb_unload(job: TbeJob): res = cann_kb_unload(soc_version, core_num, kb_type) return res - +# 定义移除缓存文件函数 def _remove_cache(job: TbeJob): """ - :param job: remove cache file:[*.json, *.o, *.info, *.cce] when "op_debug_level" is "0" - op_debug_level: representation the env MS_COMPILER_OP_LEVEL - :return: + 移除缓存文件 + :param job: TbeJob对象,包含任务信息 + :return: 无 """ op_debug_level = job.content["SocInfo"]["op_debug_level"] op_debug_dir = job.content["SocInfo"]["op_debug_dir"] @@ -118,24 +152,30 @@ def _remove_cache(job: TbeJob): real_path = os.path.join(root_path, "kernel_meta/") shutil.rmtree(real_path) - +# 定义创建目录函数 def __directory_creation(path, concat_path): """ - Create directory + 创建目录 + :param path: 基础路径 + :param concat_path: 需要连接的路径 + :return: 创建后的完整路径 """ path = os.path.join(path, concat_path) if not os.path.isdir(path): os.makedirs(path, 0o750) return path - +# 定义初始化Tune环境函数 def __init_tune_env(job, need_ga): """ - Initialize tune env + 初始化Tune环境 + :param job: TbeJob对象,包含任务信息 + :param need_ga: 是否需要GA + :return: 初始化是否成功 """ try: import auto_tune.auto_tune_main as at_atm - from schedule_search.rl_online_tune import rl_tune_init # pylint: disable=unused-import + from schedule_search.rl_online_tune import rl_tune_init if need_ga: res = at_atm.ga_tune_init() if not res: @@ -157,10 +197,13 @@ def __init_tune_env(job, need_ga): finally: pass - +# 定义创建默认自定义路径函数 def __creating_default_custom_path(auto_tiling_mode, base_custom_path): """ - Create default custom path + 创建默认自定义路径 + :param auto_tiling_mode: 自动平铺模式 + :param base_custom_path: 基础自定义路径 + :return: 无 """ base_custom_path = __directory_creation(base_custom_path, "data") tune_flag = [] @@ -179,27 +222,40 @@ def __creating_default_custom_path(auto_tiling_mode, base_custom_path): def _creating_custom_path(job): """ - Create custom path + 创建自定义路径,用于存储和检索自定义算子的调优参数。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 自定义路径创建是否成功。 """ + # 获取自动平铺模式 auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"] + # 如果模式中包含"NO_TUNE",则不需要创建自定义路径 if "NO_TUNE" in auto_tiling_mode: return True + # 获取调优参数的基础路径 base_custom_path = job.content["TuneInfo"]["tune_bank_path"] tune_bank_flag = True + # 如果基础路径不存在,则尝试从auto_tune模块获取 if not base_custom_path: import auto_tune base_custom_path = os.path.dirname(os.path.realpath(auto_tune.__file__)) base_custom_path = os.path.realpath(os.path.join(base_custom_path, "../../../")) tune_bank_flag = False + # 检查基础路径是否存在 if not os.path.isdir(base_custom_path): job.error("Check whether the tuning path [{}] exists.".format(base_custom_path)) return False + # 检查基础路径的权限 if not os.access(base_custom_path, os.R_OK | os.W_OK | os.X_OK): job.error("Check whether the permission on the tuning path [{}] is correct.".format(base_custom_path)) return False + # 如果不需要创建调优参数库,则直接返回成功 if not tune_bank_flag: return __creating_default_custom_path(auto_tiling_mode, base_custom_path) return True @@ -207,22 +263,34 @@ def _creating_custom_path(job): def _parallel_compilation_init(initialize: TbeJob): """ - Tbe parallel compilation initialize - :param initialize: - :return: + 初始化TBE并行编译环境。 + + Args: + initialize (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 并行编译环境初始化是否成功。 """ + # 设置并行编译器的环境变量 os.environ["TE_PARALLEL_COMPILER"] = str(initialize.content["process_num"]) + # 获取SoC信息 soc_info = get_soc_info(initialize.content) + # 获取实际的调试级别 real_debug_level = get_real_op_debug_level(initialize.content) + # 获取自动平铺模式 auto_tiling_mode = initialize.content["SocInfo"]["autoTilingMode"] + # 获取是否需要离线调优 offline_tune = initialize.content["SocInfo"]["offlineTune"] + # 生成进程ID和时间戳的组合字符串 pid_ts = "{}_pid{}".format(datetime.now().strftime('%Y%m%d_%H%M%S%f')[:-3], os.getpid()) + # 初始化多进程环境 ret = init_multi_process_env(False, soc_info, auto_tiling_mode, real_debug_level, None, 1, pid_ts) if ret is None: initialize.error("Init multiprocess env failed") return False initialize.info("Init multiprocess env success with {} process".format(ret[0])) + # 如果需要RL或离线调优,则初始化RL环境 if "RL" in auto_tiling_mode or offline_tune: res_queue = ret[1] live_checker = ret[2] @@ -234,6 +302,7 @@ def _parallel_compilation_init(initialize: TbeJob): initialize.error("RL env init failed!") return False initialize.info("RL Tune init success.") + # 如果需要GA,则启动GA多进程 if "GA" in auto_tiling_mode: start_ga_multi_process(auto_tiling_mode) initialize.info("GA Tune init success.") @@ -242,31 +311,44 @@ def _parallel_compilation_init(initialize: TbeJob): def tbe_initialize(job: TbeJob): """ - Tbe Initialize - :param job: - :return: + 初始化TBE环境。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: TBE环境初始化是否成功。 """ + # 设置上下文模型编译环境变量 os.environ["CONTEXT_MODELCOMPILING"] = "TRUE" + # 获取SoC信息 soc_info = get_soc_info(job.content) + # 设置版本 res = te_set_version(*soc_info) if not res: job.error("Set version failed") + # 初始化调优环境 res = _tune_init(job) if not res: job.error("Tune init failed") + # 创建锁文件 lock_file = os.path.join(job.content["SocInfo"]["op_debug_dir"], "kernel_meta", "file.lock") local_lock = LocalLock(lock_file) try: + # 加锁 local_lock.lock() + # 加载CANN知识库 res = _cann_kb_load(job) if res == 1: job.error("Cann kb load failed") + # 初始化并行编译 res = _parallel_compilation_init(job) if not res: job.error("Parallel compilation failed") except RuntimeError: job.error("Initialize failed with RuntimeError") finally: + # 解锁 local_lock.unlock() job.result = "Success" return res @@ -274,9 +356,13 @@ def tbe_initialize(job: TbeJob): def get_auto_tune_support_op_list(job: TbeJob): """ - Get GA tune supported op list - :param job: - :return: + 获取支持自动调优的算子列表。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + list: 支持自动调优的算子列表。 """ from auto_tune_main import enable_auto_tune_support auto_tune_op_list = enable_auto_tune_support() @@ -286,10 +372,14 @@ def get_auto_tune_support_op_list(job: TbeJob): def _normalize_module_name(module_name, py_module_path): """ - Normalize module name - :param module_name: - :param py_module_path: - :return: + 规范化模块名称。 + + Args: + module_name (str): 模块名称。 + py_module_path (str): Python模块路径。 + + Returns: + None """ if py_module_path not in sys.path: sys.path.insert(0, py_module_path) @@ -298,9 +388,13 @@ def _normalize_module_name(module_name, py_module_path): def check_support(job: TbeJob): """ - Check support - :param job: - :return: + 检查算子是否受支持。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 算子是否受支持。 """ op_compute_info_list = get_compute_op_list(job.content) if len(op_compute_info_list) != 1: @@ -341,21 +435,37 @@ def check_support(job: TbeJob): def select_op_format(job: TbeJob): """ Select op format - :param job: - :return: + 根据计算操作信息选择操作的格式。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 操作格式选择是否成功。 """ + # 获取计算操作列表 compute_op_info_list = get_compute_op_list(job.content) + # 检查计算操作数量是否为1 if len(compute_op_info_list) != 1: job.error("Invalid op compute num ({}) in check_support".format(len(compute_op_info_list))) return False + # 获取第一个计算操作信息 compute_op_info = compute_op_info_list[0] + # 调整自定义操作信息 adjust_custom_op_info(compute_op_info) + # 组装操作参数 inputs, outputs, attrs = assemble_op_args(compute_op_info) + # 获取操作模块名称 op_module_name = get_module_name(compute_op_info) + # 获取Python模块路径 py_module_path = compute_op_info["py_module_path"] + # 规范化模块名称 _normalize_module_name(op_module_name, py_module_path) + # 设置操作选择格式的函数名称 op_func_name = "op_select_format" + # 调用操作函数选择格式 res = call_op_func((inputs, outputs, attrs), op_module_name, op_func_name) + # 设置操作格式选择结果 job.result = str(res) return True @@ -363,15 +473,25 @@ def select_op_format(job: TbeJob): def parallel_pre_compile_op(job: TbeJob): """ Parallel pre compile op - :param job: - :return: + 并行预编译操作。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 预编译操作是否成功。 """ + # 获取计算操作列表 compute_op_info_list = get_compute_op_list(job.content) + # 检查计算操作数量是否为1 if len(compute_op_info_list) != 1: job.error("Invalid op compute num ({}) in pre compile op".format(len(compute_op_info_list))) return False + # 获取第一个计算操作信息 compute_op_info = compute_op_info_list[0] + # 调整自定义操作信息 adjust_custom_op_info(compute_op_info) + # 预构建计算操作信息 _pre_build_compute_op_info(compute_op_info, job) return True @@ -379,35 +499,60 @@ def parallel_pre_compile_op(job: TbeJob): def _pre_build_compute_op_info(compute_op, job): """ Prebuild by compute op info - :param compute_op: - :param job: - :return: + 根据计算操作信息预构建操作。 + + Args: + compute_op (dict): 计算操作信息。 + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + None """ + # 获取L1缓存大小 l1_size = job.content["l1_size"] + # 如果L1缓存大小不为-1,则设置L1缓存信息 if l1_size != -1: set_L1_info("op_L1_space", -1) + # 组装操作参数 inputs, outputs, attrs = assemble_op_args(compute_op, is_single_op_build=True) + # 获取操作模块名称 op_module_name = get_module_name(compute_op) + # 获取Python模块路径 py_module_path = compute_op["py_module_path"] + # 获取操作函数名称 op_func_name = compute_op["func_name"] + # 获取操作类型 op_type = compute_op["type"] + # 获取操作名称 op_name = compute_op["op_name"] + # 保存操作参数 save_op_params(op_name, "prebuild", (outputs, attrs)) - l1_size = job.content["l1_size"] + # 设置L1缓存信息 set_L1_info("op_L1_space", l1_size) + # 规范化模块名称 _normalize_module_name(op_module_name, py_module_path) + # 获取未知形状信息 unknown_shape = compute_op["unknown_shape"] + # 获取int64模式信息 int64_mode = compute_op["int64mode"] + # 检查操作实现模式 res = check_op_impl_mode(op_module_name, op_func_name) + # 获取操作实现模式 op_impl_mode = job.content["SocInfo"]["op_impl_mode"] + # 获取操作实现模式列表 op_impl_mode_list = job.content["SocInfo"]["op_impl_mode_list"] + # 获取完整操作名称 op_full_name = job.content["full_name"] + # 如果操作不支持实现模式,则发出警告 if not res: if op_impl_mode_list: job.warning("The op {} do NOT support op_impl_mode, current op_impl_mode:{}".format(op_type, op_impl_mode)) else: + # 否则,记录操作支持实现模式的信息 job.info("OpType {} support op_impl_mode, current op_impl_mode:{}".format(op_type, op_impl_mode)) + # 获取选项信息 options = get_options_info(job.content) + # 分派预构建任务 dispatch_prebuild_task(job.source_id, job.id, l1_size, op_module_name, op_full_name, op_type, op_func_name, unknown_shape, (inputs, outputs, attrs, options), int64_mode, unknown_shape, @@ -416,13 +561,22 @@ def _pre_build_compute_op_info(compute_op, job): def get_prebuild_output(op_name): """ - get prebuild output - :param op_name: + Get prebuild output + 获取预构建输出。 + + Args: + op_name (str): 操作名称。 + + Returns: + dict: 预构建输出。 """ + # 将操作参数转换为JSON字符串 params_str = op_params_to_json(op_name) try: + # 尝试解析JSON字符串 res = json.loads(params_str) except ValueError: + # 如果解析失败,则返回空字典 res = {} finally: pass @@ -432,9 +586,15 @@ def get_prebuild_output(op_name): def do_fuzz_build_tbe_op(job: TbeJob): """ Fuzzy build op - :param job: - :return: + 模糊构建操作。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 模糊构建操作是否成功。 """ + # 设置操作结果为"NOT_CHANGED" job.result = "NOT_CHANGED" return True @@ -442,9 +602,15 @@ def do_fuzz_build_tbe_op(job: TbeJob): def _dump_fusion_op_info_to_json_file(job: TbeJob): """ Dump fusion op info to json file - :param job: - :return: + 将融合操作信息转储到JSON文件。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + None """ + # 如果系统参数调试路径不为空,则转储融合操作信息 if not job.sys_para_debug_path or job.sys_para_debug_path == "\0": return dump_fusion_json(json.dumps(job.content), job.sys_para_debug_path) @@ -453,30 +619,55 @@ def _dump_fusion_op_info_to_json_file(job: TbeJob): def build_single_pre_op(job: TbeJob): """ Build single op - :param job: - :return: + 构建单个操作的预处理过程。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 构建过程是否成功。 """ + # 执行构建前的处理工作 before_build_process(job) + # 获取计算操作列表 compute_op_info_list = get_compute_op_list(job.content) + # 确保只有一个计算操作 if len(compute_op_info_list) != 1: job.error("Invalid op compute num ({}) in build single op".format(len(compute_op_info_list))) return False + # 获取单个计算操作信息 compute_op_info = compute_op_info_list[0] + # 调整自定义操作信息 adjust_custom_op_info(compute_op_info) + # 组装操作的输入、输出和属性 inputs, outputs, attrs = assemble_op_args(compute_op_info, is_single_op_build=True) + # 获取操作类型 op_type = compute_op_info["type"] + # 获取L1缓存大小 l1_size = job.content["l1_size"] + # 获取操作模块名称 op_module_name = get_module_name(compute_op_info) + # 获取操作内核名称 op_kernel_name = compute_op_info["op_name"] + # 获取Python模块路径 py_module_path = compute_op_info["py_module_path"] + # 获取完整操作名称 op_name = job.content["full_name"] + # 获取操作函数名称 op_func_name = compute_op_info["func_name"] + # 规范化模块名称 _normalize_module_name(op_module_name, py_module_path) + # 获取未知形状信息 unknown_shape = compute_op_info["unknown_shape"] + # 获取int64模式信息 int64_mode = compute_op_info["int64mode"] + # 获取操作模式 op_pattern = compute_op_info["pattern"] + # 获取选项信息 options = get_options_info(job.content) + # 获取模糊构建信息 fuzz_build_info = get_fuzz_build_info(job.content) + # 分派单个操作编译任务 dispatch_single_op_compile_task(job.source_id, job.id, l1_size, op_module_name, op_name, op_type, op_func_name, op_kernel_name, unknown_shape, (inputs, outputs, attrs, options), int64_mode, None, None, unknown_shape, op_pattern, @@ -487,13 +678,22 @@ def build_single_pre_op(job: TbeJob): def before_build_process(job: TbeJob): """ Processing before build - :param job: - :return: + 在构建前进行处理。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + None """ + # 获取L1缓存大小并设置 l1_size = job.content["l1_size"] set_L1_info("op_L1_space", l1_size) + # 将融合操作信息转储到JSON文件 _dump_fusion_op_info_to_json_file(job) + # 获取是否需要离线调优 offline_tune = job.sys_offline_tune + # 如果需要离线调优,则将融合操作信息转储到JSON文件 if offline_tune: dump_fusion_json(json.dumps(job.content), job.sys_tune_dump_path) @@ -501,20 +701,29 @@ def before_build_process(job: TbeJob): def sync_fusion_env(fusion_need_sync, module_list): """ Sync fusion env - :param fusion_need_sync: - :param module_list: - :return: + 同步融合环境。 + + Args: + fusion_need_sync (int): 是否需要同步融合环境。 + module_list (dict): 模块列表。 + + Returns: + bool: 同步是否成功。 """ + # 如果不需要同步,则直接返回成功 if fusion_need_sync == 0: return True + # 准备使用的模块列表 module_using = [] for key, value in module_list.items(): if value > 0: module_using.append(str(key)) module_list[key] = 0 + # 将使用的模块列表转换为字符串 module_str = ",".join(module_using) + # 导入使用的模块 import_py_module(module_str) return True @@ -522,13 +731,23 @@ def sync_fusion_env(fusion_need_sync, module_list): def parallel_compile_fusion_op(job: TbeJob): """ Compile fusion op in parallel compiler - :param job: - :return: + 在并行编译器中编译融合操作。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 编译过程是否成功。 """ + # 获取L1缓存大小 l1_size = job.content["l1_size"] + # 获取选项信息 options = get_options_info(job.content) + # 获取融合操作内核名称 op_kernel_name = job.content["fusion_op_name"] + # 获取完整操作名称 op_name = job.content["full_name"] + # 分派融合操作编译任务 dispatch_fusion_op_compile_task(job.source_id, job.id, l1_size, json.dumps(job.content), op_kernel_name, None, None, options, None, job.pass_list, op_name) return True @@ -537,112 +756,185 @@ def parallel_compile_fusion_op(job: TbeJob): def ga_tune(job: TbeJob): """ GA tune - :param job: - :return: + 使用遗传算法进行调优。 + + Args: + job (TbeJob): 包含任务信息的TbeJob对象。 + + Returns: + bool: 调优过程是否成功。 """ + # 获取L1缓存大小 l1_size = job.content["l1_size"] + # 获取融合操作内核名称 op_kernel_name = job.content["fusion_op_name"] + # 获取完整操作名称 op_name = job.content["full_name"] + # 分派自动调优任务 dispatch_autotune_task(job.source_id, job.id, l1_size, json.dumps(job.content), {}, op_kernel_name, op_name) + # 设置任务状态为运行中 job.status = JobStatus.JOB_RUNNING return True def rl_tune_single_op(job: TbeJob): """ - RL tune single op - :param job: - :return: + Perform RL (Reinforcement Learning) tuning for a single operation. + + This function is responsible for tuning a single operation using RL techniques. + It retrieves the operation's information, performs the tuning, and handles any exceptions that may occur during the process. + + Args: + job (TbeJob): An object containing job information, including the operation to be tuned. + + Returns: + bool: True if the RL tuning is successful, False otherwise. """ + # Retrieve the list of compute operations from the job content compute_op_info_list = get_compute_op_list(job.content) + # Check if there is exactly one compute operation if len(compute_op_info_list) != 1: job.error("Invalid op compute num ({}) in rl tune single op".format(len(compute_op_info_list))) return False + # Get the first (and only) compute operation info compute_op_info = compute_op_info_list[0] + # Assemble the operation's input, output, and attributes inputs, outputs, attrs = assemble_op_args(compute_op_info) + # Get the operation type op_type = compute_op_info["type"] + # Get the L1 size from the job content l1_size = job.content["l1_size"] + # Get the operation module name op_module_name = get_module_name(compute_op_info) + # Get the operation kernel name op_kernel_name = compute_op_info["op_name"] + # Get the full name of the operation full_name = compute_op_info["name"] + # Get the Python module path py_module_path = compute_op_info["py_module_path"] + # Get the operation function name op_func_name = compute_op_info["func_name"] + # Normalize the module name _normalize_module_name(op_module_name, py_module_path) + # Set the current operation name set_current_op_name(op_kernel_name) + # Get the unknown shape information unknown_shape = compute_op_info["unknown_shape"] + # Get the int64 mode information int64_mode = compute_op_info["int64mode"] + # Get the operation pattern op_pattern = compute_op_info["pattern"] + # Get the fuzz build information fuzz_build_info = get_fuzz_build_info(job.content) + # Get the auto tiling mode auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"] + # Get the device ID device_id = job.content["SocInfo"]["deviceId"] + # Get the options information options = get_options_info(job.content) try: + # Build the single operation from C code build_single_op_from_c(op_module_name, op_func_name, op_type, "build", unknown_shape, (inputs, outputs, attrs), int64_mode, unknown_shape, options, op_pattern, auto_tiling_mode, device_id, json.dumps(fuzz_build_info)) - # pylint: disable=broad-except except Exception: + # If an exception occurs, log the error and return False job.error( "Single op {} build failed, no need to do rl tune, json string:{}".format(op_kernel_name, job.json_string)) exc_type, exc_value, _ = sys.exc_info() job.error( "exc_type:{}, exc_value:{}, exc_traceback:{}".format(exc_type, exc_value, traceback.format_exc())) return False - finally: - pass + # Prepare the tuning operation module name tune_op_module_name = op_module_name + "@" + py_module_path + # Get the base kernel path base_kernel = job.content["SocInfo"]["op_debug_dir"] + "/kernel_meta/" + op_kernel_name + ".o" + # Dispatch the single tune task from schedule_search.rl_online_tune import dispatch_single_tune_task pack_args = pack_op_args(inputs, outputs, attrs) res = dispatch_single_tune_task(job.source_id, job.id, l1_size, base_kernel, op_kernel_name, full_name, tune_op_module_name, op_func_name, op_type, pack_args) + # Process the RL tune result return _process_rl_tune_result(job, op_type, res) def rl_tune_fusion_op(job: TbeJob): """ - rl tune fusion op - :param job: - :return: + Perform RL tuning for a fusion operation. + + This function is responsible for tuning a fusion operation using RL techniques. + It compiles the operation using multiprocessing and handles any exceptions that may occur during the process. + + Args: + job (TbeJob): An object containing job information, including the fusion operation to be tuned. + + Returns: + bool: True if the RL tuning is successful, False otherwise. """ + # Get the fusion operation kernel name op_kernel_name = job.content["fusion_op_name"] + # Set the current operation name set_current_op_name(op_kernel_name) try: + # Compile the operation using multiprocessing from schedule_search.rl_online_tune import compile_op_by_mp compile_op_by_mp(json.dumps(job.content)) # pylint: disable=broad-except except Exception: + # If an exception occurs, log the error and return False job.error( "Fusion op {} build failed, no need to do rl tune, json string:{}".format(op_kernel_name, job.json_string)) exc_type, exc_value, _ = sys.exc_info() job.error( "exc_type:{}, exc_value:{}, exc_traceback:{}".format(exc_type, exc_value, traceback.format_exc())) return False - finally: - pass + # Get the L1 size l1_size = job.content["l1_size"] + # Get the base kernel path base_kernel = job.content["SocInfo"]["op_debug_dir"] + "/kernel_meta/" + op_kernel_name + ".o" + # Get the list of compute operations compute_op_list = get_compute_op_list(job.content) + # Prepare the operation module names string op_module_names_str = "" op_type_set = set() for op in compute_op_list: op_module_names_str = ','.join([op_module_names_str, get_module_name(op)]) op_type_set.add(op["type"]) + # Remove the leading comma from the operation module names string op_module_names_str = op_module_names_str[1:] + # Join the operation types with double underscore op_type = "__".join(list(op_type_set)) + # Dispatch the fusion tune task from schedule_search.rl_online_tune import dispatch_fusion_tune_task res = dispatch_fusion_tune_task(job.source_id, job.id, l1_size, base_kernel, op_kernel_name, op_module_names_str, json.dumps(job.content)) + # Process the RL tune result return _process_rl_tune_result(job, op_type, res) def _process_rl_tune_result(job, op_type, res): + """ + Process the result of RL tuning. + + If the tuning result is False, it checks if the operation type is in the black list or if the job is set to offline tune. + If the tuning result is True, it sets the job status to running. + + Args: + job (TbeJob): An object containing job information. + op_type (str): The type of the operation. + res (bool): The result of RL tuning. + + Returns: + bool: The processed result of RL tuning. + """ if not res: + # Check if the operation type is in the black list or if the job is set to offline tune from schedule_search.tune_util import filter_black_op_type res = bool(job.sys_offline_tune or os.getenv("REPEAT_TUNE", "False").lower() != "true" or filter_black_op_type( op_type)) else: + # Set the job status to running job.status = JobStatus.JOB_RUNNING res = True return res @@ -650,8 +942,13 @@ def _process_rl_tune_result(job, op_type, res): def get_finish_tasks(source_id): """ - Get finish task from parallel compilation framework - :return task info list + Get the list of finished tasks from the parallel compilation framework. + + Args: + source_id (int): The source ID of the tasks. + + Returns: + list: A list of finished task information. """ return get_finished_compilation_task(source_id) @@ -664,14 +961,21 @@ def tbe_finalize(auto_tiling_mode, offline_tune, job: TbeJob): :param job: TbeJob :return: None """ + # 释放多进程环境 deinit_multi_process_env() + # 如果自动切分模式为RL或者离线调优,则释放RL调优 if "RL" in auto_tiling_mode or offline_tune: from schedule_search.rl_online_tune import rl_tune_deinit rl_tune_deinit() + # 卸载Cann kb res = _cann_kb_unload(job) + # 如果卸载失败,则返回False if res == 1: job.error("Cann kb unload failed") return False + # 清除融合参数 clear_fusion_params() + # 删除缓存 _remove_cache(job) + # 返回True return True diff --git a/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_helper.py b/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_helper.py index 6b48a037..7af39764 100644 --- a/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_helper.py +++ b/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_helper.py @@ -26,6 +26,7 @@ class BuildType(Enum): ACCURATELY = "accurately" +# 获取JobType枚举类中的所有值 job_type_list = [job_type.value for _, job_type in JobType.__members__.items()] @@ -35,14 +36,19 @@ def check_job_json(job_info): :param job_info:tne compilation job json :return: raise value error if wrong """ + # 检查job_info中是否包含source_id if 'source_id' not in job_info: raise ValueError("Json string Errors, key:source_id not found.") + # 检查job_info中是否包含job_id if 'job_id' not in job_info: raise ValueError("Json string Errors, key:job_id not found.") + # 检查job_info中是否包含job_type if 'job_type' not in job_info or not job_info['job_type']: raise ValueError("Json string Errors, key:job_type not found.") + # 检查job_info中job_type是否在job_type_list中 if job_info['job_type'] not in job_type_list: raise ValueError("Invalid job type: {}.".format(job_info['job_type'])) + # 检查job_info中是否包含job_content if 'job_content' not in job_info: raise ValueError("Json string Errors, key:job_content not found.") @@ -52,6 +58,7 @@ def reset_op_debug_level_in_soc_info(level): :param level: op_debug_level, if level is 3 or 4, replace it with 0 :return: op_debug_level """ + # 如果level为3或4,则将其替换为0 if level in ("3", "4"): level = "0" return level @@ -62,6 +69,7 @@ def get_real_op_debug_level(initialize_job_info): :param initialize_job_info: initialize_job_info :return: origin op_debug_level for init_multi_process_env """ + # 返回initialize_job_info中op_debug_level的值 return initialize_job_info["SocInfo"]["op_debug_level"] @@ -72,21 +80,35 @@ def get_soc_info(initialize_job_info): :return: soc info """ soc_param = dict() + # 获取soc_info中的op_impl_mode soc_param["op_impl_mode"] = initialize_job_info["SocInfo"]["op_impl_mode"] + # 获取soc_info中的op_debug_level,并调用reset_op_debug_level_in_soc_info函数进行处理 soc_param["op_debug_level"] = reset_op_debug_level_in_soc_info(initialize_job_info["SocInfo"]["op_debug_level"]) + # 获取soc_info中的op_impl_mode_list soc_param["op_impl_mode_list"] = initialize_job_info["SocInfo"]["op_impl_mode_list"] + # 获取soc_info中的op_debug_dir soc_param["op_debug_dir"] = initialize_job_info["SocInfo"]["op_debug_dir"] + # 获取soc_info中的vector_fp_ceiling soc_param["vector_fp_ceiling"] = initialize_job_info["SocInfo"]["vector_fp_ceiling"] + # 获取soc_info中的mdl_bank_path soc_param['mdl_bank_path'] = initialize_job_info["SocInfo"]["mdl_bank_path"] + # 获取soc_info中的op_bank_path soc_param['op_bank_path'] = initialize_job_info["SocInfo"]["op_bank_path"] soc_info = list() + # 获取soc_info中的socVersion soc_info.append(initialize_job_info["SocInfo"]["socVersion"]) + # 获取soc_info中的coreType soc_info.append(initialize_job_info["SocInfo"]["coreType"]) + # 获取soc_info中的coreNum soc_info.append(initialize_job_info["SocInfo"]["coreNum"]) + # 获取soc_info中的l1Fusion soc_info.append(initialize_job_info["SocInfo"]["l1Fusion"]) + # 获取soc_info中的l2Mode soc_info.append(initialize_job_info["SocInfo"]["l2Mode"]) + # 获取soc_info中的l2Fusion soc_info.append(initialize_job_info["SocInfo"]["l2Fusion"]) + # 将soc_param添加到soc_info中 soc_info.append(soc_param) return soc_info @@ -98,16 +120,22 @@ def check_arg_info(io_info): :param io_info:A dict, to be checked. :return: Exception: If specific keyword is not found. """ + # 检查io_info中是否包含shape if 'shape' not in io_info: raise ValueError("Json string Errors, key:shape not found.") + # 检查io_info中是否包含ori_shape if 'ori_shape' not in io_info: raise ValueError("Json string Errors, key:ori_shape not found.") + # 检查io_info中是否包含format if 'format' not in io_info or not io_info['format']: raise ValueError("Json string Errors, key:format not found.") + # 检查io_info中是否包含ori_format if 'ori_format' not in io_info or not io_info['ori_format']: raise ValueError("Json string Errors, key:ori_format not found.") + # 检查io_info中是否包含dtype if 'dtype' not in io_info or not io_info['dtype']: raise ValueError("Json string Errors, key:dtype not found.") + # 检查io_info中是否包含param_type if 'param_type' not in io_info or not io_info['param_type']: raise ValueError("Json string Errors, key:param_type not found.") @@ -119,18 +147,28 @@ def get_input_output_args(io_info): :return:input/output args """ args = [] + # 如果io_info为空,则返回空列表 if io_info is None: return args + # 遍历io_info中的每个元素 for item in io_info: + # 如果元素是字典类型 if isinstance(item, dict): + # 调用get_single_io_arg函数获取单个输入/输出参数 arg = get_single_io_arg(item) args.append(arg) elif isinstance(item, list): + # 如果元素是列表类型 dyn_arg = [] + # 创建一个空列表dyn_arg for info in item: + # 遍历列表中的每个元素 arg = get_single_io_arg(info) + # 调用get_single_io_arg函数获取单个输入/输出参数 dyn_arg.append(arg) + # 将参数添加到dyn_arg列表中 args.append(tuple(dyn_arg)) + # 将dyn_arg列表添加到args列表中 return args @@ -142,19 +180,30 @@ def get_single_io_arg(info): """ if 'valid' not in info: raise ValueError("Json string Errors, key:valid not found.") + # 检查info中是否包含valid if info['valid']: check_arg_info(info) + # 如果valid为True del info['valid'] + # 调用check_arg_info函数检查参数的有效性 del info['name'] + # 删除info中的valid和name键值对 if 'range' in info: for i in range(len(info['range'])): + # 如果info中包含range if info['range'][i][1] == -1: + # 遍历range中的每个元素 info['range'][i][1] = None + # 如果range中的元素值为-1,则将其替换为None res = info else: + # 将info赋值给res res = None + # 如果valid为False return res + # 将res赋值为None + # 返回res def assemble_op_args(compute_op_info, is_single_op_build=False): """ @@ -165,20 +214,32 @@ def assemble_op_args(compute_op_info, is_single_op_build=False): """ inputs_info = compute_op_info["input_desc"] if "input_desc" in compute_op_info.keys() else None outputs_info = compute_op_info["output_desc"] if "output_desc" in compute_op_info.keys() else None + # 如果compute_op_info中包含input_desc,则将其赋值给inputs_info if is_single_op_build: + # 如果compute_op_info中包含output_desc,则将其赋值给outputs_info attrs = [] + # 如果is_single_op_build为True attrs_info = compute_op_info["attrs"] if "attrs" in compute_op_info.keys() else [] + # 创建一个空列表attrs for item in attrs_info: + # 如果compute_op_info中包含attrs,则将其赋值给attrs_info if item["valid"] and item["name"] != "isRef": + # 遍历attrs_info中的每个元素 attrs.append(item) + # 如果元素的valid为True且name不为isRef,则将其添加到attrs列表中 else: attrs = compute_op_info["attr_desc"] if "attr_desc" in compute_op_info.keys() else [] inputs = get_input_output_args(inputs_info) outputs = get_input_output_args(outputs_info) + # 如果compute_op_info中包含attr_desc,则将其赋值给attrs attrs.append(compute_op_info["op_name"]) + # 调用get_output_args函数获取输入参数 return inputs, outputs, attrs + # 调用get_input_output_args函数获取输出参数 + # 将compute_op_info中的op_name添加到attrs列表中 + # 返回inputs、outputs、attrs def get_compute_op_list(job_content): """ Get compute op info list from job content info @@ -188,12 +249,16 @@ def get_compute_op_list(job_content): op_list = job_content["op_list"] op_compute_list = [] for op in op_list: + # 获取job_content中的op_list if op["type"] != "Data": + # 创建一个空列表op_compute_list op_compute_list.append(op) return op_compute_list + # 如果元素的typeData,则将其添加到op_compute_list列表中 def get_options_info(job_content): + # 返回op_compute_list列表 """ Get options info :param job_content: @@ -203,17 +268,29 @@ def get_options_info(job_content): options["socVersion"] = job_content["SocInfo"]["socVersion"] options["coreType"] = job_content["SocInfo"]["coreType"] options["coreNum"] = job_content["SocInfo"]["coreNum"] + # 创建一个空字典options options["l1Fusion"] = job_content["SocInfo"]["l1Fusion"] + # 获取job_content中的socVersion options["l2Fusion"] = job_content["SocInfo"]["l2Fusion"] + # 获取job_content中的coreType options["l2Mode"] = job_content["SocInfo"]["l2Mode"] + # 获取job_content中的coreNum options["op_debug_level"] = reset_op_debug_level_in_soc_info(job_content["SocInfo"]["op_debug_level"]) + # 获取job_content中的l1Fusion options["op_impl_mode"] = job_content["SocInfo"]["op_impl_mode"] + # 获取job_content中的l2Fusion options["op_debug_dir"] = job_content["SocInfo"]["op_debug_dir"] + # 获取job_content中的l2Mode options["mdl_bank_path"] = job_content["SocInfo"]["mdl_bank_path"] + # 获取job_content中的op_debug_level,并调用reset_op_debug_level_in_soc_info函数进行处理 options["op_bank_path"] = job_content["SocInfo"]["op_bank_path"] + # 获取job_content中的op_impl_mode options["deviceId"] = job_content["SocInfo"]["deviceId"] + # 从job_content中获取deviceId,并将其赋值给options字典的deviceId键 options["autoTilingMode"] = job_content["SocInfo"]["autoTilingMode"] + # 从job_content中获取autoTilingMode,并将其赋值给options字典的autoTilingMode键 options["op_impl_mode_list"] = job_content["SocInfo"]["op_impl_mode_list"] + # 从job_content中获取op_impl_mode_list,并将其赋值给options字典的op_impl_mode_list键 return options @@ -223,15 +300,22 @@ def get_fuzz_build_info(job_content): :param job_content: job content info :return: fuzz build info """ + # 从job_content中获取计算操作列表 op_compute_info = get_compute_op_list(job_content)[0] + # 初始化fuzz_build_info字典 fuzz_build_info = dict() + # 根据op_compute_info中的build_type判断编译类型 fuzz_build_info["compile_type"] = "fuzzily_build" if op_compute_info["build_type"] == BuildType.FUZZILY.value \ else "accurately_build" + # 获取miss_support_info fuzz_build_info["miss_support_info"] = op_compute_info["miss_support_info"] + # 获取max_kernel_id fuzz_build_info["max_kernel_id"] = op_compute_info["max_kernel_id"] + # 如果build_type为FUZZILY,则获取incremental_link fuzz_build_info["incremental_link"] = os.path.realpath( job_content["SocInfo"]["op_debug_dir"] + "/kernel_meta/" + op_compute_info["name"] + ".json") if \ op_compute_info["build_type"] == BuildType.FUZZILY.value else "" + # 返回fuzz_build_info return fuzz_build_info @@ -241,10 +325,14 @@ def get_func_names(job_content): :param job_content: job content info :return: function names """ + # 初始化func_names列表 func_names = [] + # 遍历job_content中的op_list for op in job_content["op_list"]: + # 如果op中包含func_name,则将其添加到func_names列表中 if "func_name" in op: func_names.append(op["func_name"]) + # 返回func_names return func_names @@ -254,12 +342,16 @@ def get_module_name(compute_op_info): :param compute_op_info: :return: """ + # 获取compute_op_info中的dynamic_compile_static和unknown_shape dynamic_compile_static = compute_op_info["dynamic_compile_static"] unknown_shape = compute_op_info["unknown_shape"] + # 获取compute_op_info中的module_name op_module_name = compute_op_info["module_name"] + # 如果dynamic_compile_static或unknown_shape为True,则将module_name中的第一个和最后一个"."之间的字符串替换为".dynamic." if dynamic_compile_static or unknown_shape: d = ".dynamic." op_module_name = d.join((op_module_name.split(".")[0], op_module_name.split(".")[-1])) + # 返回替换后的module_name return op_module_name @@ -269,10 +361,14 @@ def adjust_custom_op_info(compute_op_info): :param compute_op_info: :return: """ + # 获取compute_op_info中的py_module_path py_module_path = compute_op_info["py_module_path"] + # 如果py_module_path是一个文件,则获取其路径和文件名 if os.path.isfile(py_module_path): py_module_path, file_name = os.path.split(py_module_path) + # 获取文件名中的模块名 module_name, _ = os.path.splitext(file_name) + # 将py_module_path和module_name更新到compute_op_info中 compute_op_info["py_module_path"] = py_module_path compute_op_info["module_name"] = module_name @@ -281,5 +377,6 @@ def pack_op_args(inputs, outputs, attrs): """ flatten inputs outputs attrs """ + # 将inputs、outputs、attrs展开为一个列表 op_args = (inputs, outputs, attrs) return [item for arg in op_args for item in arg] diff --git a/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_job.py b/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_job.py index 6de4c424..ac66faef 100644 --- a/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_job.py +++ b/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_job.py @@ -20,14 +20,23 @@ from enum import Enum class JobType(Enum): """ Job Type """ + # 初始化任务 INITIALIZE_JOB = 'Initialize' + # 结束任务 FINALIZE_JOB = 'Finalize' + # 检查支持任务 CHECK_JOB = 'CheckSupport' + # 选择格式任务 SELECT_JOB = 'SelectFormat' + # 预编译任务 PRECOMPILE_JOB = 'PreCompile' + # 编译任务 COMPILE_JOB = 'Compile' + # 融合编译任务 FUSION_COMPILE_JOB = 'FusionOpCompile' + # 调优任务 TUNE_JOB = 'Tune' + # 查询任务 QUERY_JOB = 'Query' @@ -51,9 +60,13 @@ class JobStatus(Enum): class LogMessage: """ Log message """ + # 初始化函数,用于创建一个对象 def __init__(self, index, level, info): + # 将传入的index参数赋值给对象的index属性 self.index = index + # 将传入的level参数赋值给对象的level属性 self.level = level + # 将传入的info参数赋值给对象的info属性 self.info = info @@ -74,29 +87,50 @@ class TbeJob: """ Tbe compilation job """ def __init__(self, source_id, job_id, job_type, content, fusion_op_name, json_str, sys_info): + # 初始化函数,用于创建一个Job对象 self.source_id = source_id + # 源ID self.id = job_id + # 任务ID self.type = JobType(job_type) + # 任务类型 self.status = JobStatus.JOB_INITIAL + # 任务状态 self.content = content + # 任务内容 self.fusion_op_name = fusion_op_name + # 融合操作名称 self.result = "" + # 任务结果 self.process_info = [] + # 任务处理信息 self.json_string = json_str + # JSON字符串 self._sys_logger = sys_info["logger"] + # 系统日志 self.sys_offline_tune = sys_info["offline_tune"] + # 离线调优 self.sys_tune_dump_path = sys_info["tune_dump_path"] + # 调优转储路径 self.sys_para_debug_path = sys_info["para_debug_path"] + # 参数调试路径 # license info self.rl_tune_switch = sys_info["rl_tune_switch"] + # 强化学习调优开关 self.rl_tune_list = sys_info["rl_tune_list"] + # 强化学习调优列表 self.op_tune_switch = sys_info["op_tune_switch"] + # 操作调优开关 self.op_tune_list = sys_info["op_tune_list"] + # 操作调优列表 self.pass_list = sys_info["pass_list"] + # 通过列表 # soc info self.soc_version = sys_info["socVersion"] + # SoC版本 self.core_num = sys_info["coreNum"] + # 核心数量 self.op_bank_path = sys_info["op_bank_path"] def debug(self, msg, *args, **kwargs): @@ -106,9 +140,13 @@ class TbeJob: :param args: :return: """ + # 获取处理后的消息 processed_msg = _get_message(msg, args) + # 创建日志消息对象 message = LogMessage(len(self.process_info), LogLevel.DEBUG, processed_msg) + # 将日志消息对象添加到process_info列表中 self.process_info.append(message) + # 使用系统日志记录器记录日志 self._sys_logger.debug(msg, *args, **kwargs) def info(self, msg, *args, **kwargs): @@ -118,9 +156,13 @@ class TbeJob: :param args: :return: """ + # 获取处理后的消息 processed_msg = _get_message(msg, args) + # 创建日志消息对象 message = LogMessage(len(self.process_info), LogLevel.INFO, processed_msg) + # 将日志消息对象添加到process_info列表中 self.process_info.append(message) + # 使用系统日志记录器记录日志 self._sys_logger.info(msg, *args, **kwargs) def warning(self, msg, *args, **kwargs): @@ -130,9 +172,13 @@ class TbeJob: :param args: :return: """ + # 获取处理后的消息 processed_msg = _get_message(msg, args) + # 创建日志消息对象 message = LogMessage(len(self.process_info), LogLevel.WARNING, processed_msg) + # 将日志消息对象添加到process_info列表中 self.process_info.append(message) + # 使用系统日志记录器记录警告信息 self._sys_logger.warning(msg, *args, **kwargs) def error(self, msg, *args, **kwargs): @@ -142,9 +188,13 @@ class TbeJob: :param args: :return: """ + # 获取处理后的消息 processed_msg = _get_message(msg, args) + # 创建一个LogMessage对象,包含消息的长度、日志级别和消息内容 message = LogMessage(len(self.process_info), LogLevel.ERROR, processed_msg) + # 将LogMessage对象添加到process_info列表中 self.process_info.append(message) + # 使用_sys_logger记录错误日志,msg为原始消息,args和kwargs为参数 self._sys_logger.error(msg, *args, **kwargs) def error_manager(self, msg, *args, **kwargs): @@ -154,30 +204,50 @@ class TbeJob: :param args: :return: """ + # 如果msg为空,则输出警告信息并返回 if not msg: self.warning("Get empty error manager message, op_name: {}".format(self.fusion_op_name)) return + # 初始化异常信息为None exception_info = None + # 获取融合操作名称 op_name = self.fusion_op_name + # 如果msg是Exception类型 if isinstance(msg, Exception): + # 遍历msg的参数 for arg in msg.args: + # 如果参数是字典类型且包含"errCode"键 if isinstance(arg, dict) and "errCode" in arg: + # 将异常信息赋值给exception_info exception_info = arg break + # 如果没有找到异常信息 if not exception_info: + # 输出错误信息 self.error("Exception message:{}".format(msg)) return + # 如果msg不是Exception类型 else: + # 将msg的第一个元素赋值给异常信息 exception_info = msg[0] + # 如果msg的长度大于等于2 if len(msg) >= 2: + # 将msg的第二个元素赋值给融合操作名称 op_name = msg[1] + # 如果异常信息不是字典类型或为空 if not isinstance(exception_info, dict) or not exception_info: + # 输出警告信息 self.warning("Get illegal error manager message, op_name: {}".format(self.fusion_op_name)) return + # 将异常信息中的op_name字段赋值为融合操作名称 exception_info["op_name"] = op_name + # 将异常信息转换为JSON格式 processed_msg = json.dumps(exception_info) + # 创建LogMessage对象 message = LogMessage(len(self.process_info), LogLevel.ERROR_MANAGER, processed_msg) + # 将LogMessage对象添加到process_info列表中 self.process_info.append(message) + # 输出异常信息 self._sys_logger.exception(msg, *args, **kwargs) def get_result(self): @@ -186,15 +256,26 @@ class TbeJob: :return: job process result string """ result = dict() + # 获取任务状态 result["status"] = self.status.value + # 获取任务源ID result["source_id"] = self.source_id + # 获取任务ID result["job_id"] = self.id + # 获取任务类型 result["job_type"] = self.type.value + # 获取融合操作名称 result["fusion_op_name"] = self.fusion_op_name + # 获取任务结果 result["result"] = self.result process_info = [] + # 遍历任务处理信息 for info in self.process_info: + # 构造消息字典 msg = {"index": info.index, "level": info.level.value, "message": info.info} + # 将消息字典添加到处理信息列表中 process_info.append(msg) + # 将处理信息列表添加到结果字典中 result["process_info"] = process_info + # 将结果字典转换为JSON字符串并返回 return json.dumps(result) diff --git a/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_job_manager.py b/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_job_manager.py index be60df96..44246954 100644 --- a/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_job_manager.py +++ b/src/mindspore2022/mindspore/python/mindspore/_extends/parallel_compile/tbe_compiler/tbe_job_manager.py @@ -29,6 +29,7 @@ class TbeJobManager: """ TBE compiler job manager """ def __init__(self): + # 定义一个字典,用于存储不同类型的任务及其对应的处理函数 self.job_handlers = { JobType.INITIALIZE_JOB: self.initialize_handler, JobType.FINALIZE_JOB: self.finalize_handler, @@ -41,24 +42,43 @@ class TbeJobManager: JobType.QUERY_JOB: self.query_handler } + # 定义一个字典,用于存储所有任务 self._all_jobs = {} + # 定义一个字典,用于存储已完成任务 self._finished_jobs = {} + # 定义一个字典,用于存储正在运行的任务 self._running_jobs = {} + # 定义一个字典,用于存储原始完成任务 self._raw_finish_jobs = {} + # 定义一个布尔值,用于判断TBE是否初始化 self.tbe_initialize = False + # 定义一个变量,用于存储初始化缓存 self.init_cache = None + # 定义一个字符串,用于存储参数调试路径 self.para_debug_path = "" + # 定义一个字符串,用于存储自动调优模式 self.auto_tiling_mode = "" + # 定义一个布尔值,用于判断是否离线调优 self.offline_tune = False + # 定义一个列表,用于存储调优操作 self.tune_op_list = [] + # 定义一个字符串,用于存储调优输出路径 self.tune_dump_path = "" + # 定义一个字符串,用于存储调优库路径 self.tune_bank_path = "" + # 定义一个列表,用于存储自动调优操作 self.auto_tune_op_list = [] + # 定义一个字典,用于存储预编译操作 self.pre_build_ops = {} + # 定义一个整数,用于存储融合编译需要同步的次数 self.fusion_need_sync = 0 + # 定义一个字典,用于存储导入的模块 self.imported_module = {} + # 定义一个字符串,用于存储SoC版本 self.soc_version = "" + # 定义一个整数,用于存储核心数量 self.core_num = 0 + # 定义一个字符串,用于存储操作库路径 self.op_bank_path = "" # license info self.rl_tune_switch = "" @@ -68,6 +88,7 @@ class TbeJobManager: self.pass_list = "" def __del__(self): + # 删除对象时调用reset方法 self.reset() def reset(self): @@ -75,22 +96,38 @@ class TbeJobManager: Reset the job manager :return: None """ + # 重置所有任务 self._all_jobs = {} + # 重置已完成任务 self._finished_jobs = {} + # 重置正在运行的任务 self._running_jobs = {} + # 重置原始已完成任务 self._raw_finish_jobs = {} + # 重置调试路径 self.para_debug_path = "" + # 重置自动切分模式 self.auto_tiling_mode = "" + # 重置离线调优 self.offline_tune = False + # 重置调优操作列表 self.tune_op_list = [] + # 重置调优导出路径 self.tune_dump_path = "" + # 重置调优银行路径 self.tune_bank_path = "" + # 重置自动调优操作列表 self.auto_tune_op_list = [] + # 重置预构建操作 self.pre_build_ops = [] + # 重置融合需要同步 self.fusion_need_sync = 0 + # 重置导入模块 self.imported_module = {} + # 如果tbe_initialize为True,则调用tbe_finalize方法 if self.tbe_initialize: tbe_finalize(self.auto_tiling_mode, self.offline_tune, self.init_cache) + # 重置tbe_initialize self.tbe_initialize = False self.init_cache = None self.soc_version = "" @@ -105,11 +142,17 @@ class TbeJobManager: """ job = None try: + # 将job_str转换为json格式 job_json = json.loads(job_str) + # 检查job_json的合法性 check_job_json(job_json) + # 获取job_id job_id = job_json["job_id"] + # 获取source_id source_id = job_json["source_id"] + # 获取job_type job_type = job_json["job_type"] + # 获取系统信息 sys_info = self._get_job_sys_info() fusion_op_name = "NA" if "fusion_op_name" not in job_json["job_content"] else job_json["job_content"][ "fusion_op_name"] @@ -140,173 +183,260 @@ class TbeJobManager: def initialize_handler(self, job: TbeJob): """ Initialize job handler """ + # 初始化系统信息 self._init_sys_info(job) + # 调用tbe_initialize函数初始化job res = tbe_initialize(job) + # 如果初始化失败,记录错误信息,并将job状态设置为JOB_FAILED if not res: job.error("Process Initialize Job failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 如果auto_tiling_mode中包含"GA",则获取自动调优支持的操作列表 if "GA" in self.auto_tiling_mode: self.auto_tune_op_list = get_auto_tune_support_op_list(job) + # 设置tbe_initialize为True self.tbe_initialize = True + # 将job保存到init_cache中 self.init_cache = job + # 将job状态设置为JOB_SUCCESS return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS) def finalize_handler(self, job: TbeJob): """ Finalize job handler """ + # 如果tbe_initialize为False,则直接将job状态设置为JOB_SUCCESS if not self.tbe_initialize: return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS) + # 调用tbe_finalize函数,传入auto_tiling_mode和offline_tune参数 res = tbe_finalize(self.auto_tiling_mode, self.offline_tune, job) + # 如果finalize失败,记录错误信息,并将job状态设置为JOB_FAILED if not res: job.error("Process Finalize Job failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 将job状态设置为JOB_SUCCESS return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS) def check_support_handler(self, job: TbeJob): """ Check Support job handler """ + # 调用check_support函数,检查job是否支持 res = check_support(job) + # 如果不支持,记录错误信息,并将job状态设置为JOB_FAILED if not res: job.error("Process CheckSupport Job failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 更新导入的操作模块 self._update_imported_op_module(job) + # 将job状态设置为JOB_SUCCESS return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS) def select_format_handler(self, job: TbeJob): """ Select Format job handler """ + # 调用select_op_format函数,选择操作格式 res = select_op_format(job) + # 如果选择失败,记录错误信息,并将job状态设置为JOB_FAILED if not res: job.error("Process SelectFormat Job failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 将job状态设置为JOB_SUCCESS return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS) def pre_compile_handler(self, job: TbeJob): """ Pre Compile job handler """ + # 调用parallel_pre_compile_op函数,对job进行预处理 res = parallel_pre_compile_op(job) + # 如果预处理失败,则记录错误信息,并将job状态设置为JOB_FAILED if not res: job.error("Process PreCompile Job failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 将job添加到pre_build_ops字典中,以fusion_op_name为键 self.pre_build_ops[job.content["fusion_op_name"]] = job + # 将job状态设置为JOB_RUNNING return self.add_to_running_jobs(job) def compile_handler(self, job: TbeJob): """ Compile job handler """ + # 获取job中的compute_op_list compute_op_list = get_compute_op_list(job.content) + # 如果compute_op_list只有一个元素,则调用single_op_compile函数进行编译 if len(compute_op_list) == 1: # pylint: disable=no-else-return return self.single_op_compile(job) else: + # 调用before_build_process函数,对job进行预处理 before_build_process(job) + # 如果需要同步fusion,则调用sync_fusion_env函数进行同步 if self.fusion_need_sync: sync_fusion_env(self.fusion_need_sync, self.imported_module) self.fusion_need_sync = 0 + # 调用parallel_compile_fusion_op函数,对job进行编译 res = parallel_compile_fusion_op(job) + # 如果编译失败,则记录错误信息,并将job状态设置为JOB_FAILED if not res: job.error("Parallel_compile_fusion_op Job failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 将job状态设置为JOB_RUNNING return self.add_to_running_jobs(job) def single_op_compile(self, job: TbeJob): """Single operator compile""" + # 调用do_fuzz_build_tbe_op函数,对job进行编译 res = do_fuzz_build_tbe_op(job) + # 如果编译失败,则记录错误信息,并将job状态设置为JOB_FAILED if not res: job.error("Process do fuzz build tbe op failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 如果job.result为"NOT_CHANGED",则调用before_build_process函数进行预处理,并调用build_single_pre_op函数进行编译 if job.result == "NOT_CHANGED": job.result = "" before_build_process(job) res = build_single_pre_op(job) + # 如果编译失败,则记录错误信息,并将job状态设置为JOB_FAILED if not res: job.error("Process build single pre op failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 将job状态设置为JOB_RUNNING return self.add_to_running_jobs(job) + # 如果job.result为"SUCCESS",则将job状态设置为JOB_SUCCESS if job.result == "SUCCESS": return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS) + # 如果编译失败,则记录错误信息,并将job状态设置为JOB_FAILED job.error("Process do fuzz build tbe op failed, job json string:{}".format(job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) def tune_handler(self, job: TbeJob): """ Tune job handler """ before_build_process(job) + # 选择调优模式 tune_mode = self._select_tune_mode(job) + # 如果调优模式为不调优,则直接调用编译处理函数 if tune_mode == TuneMode.NO_TUNE: return self.compile_handler(job) + # 获取计算操作列表 compute_op_list = get_compute_op_list(job.content) + # 如果计算操作列表只有一个,则调用单操作调优函数 if len(compute_op_list) == 1: return self.single_op_tune(job) + # 否则调用融合操作调优函数 return self.fusion_op_tune(job) def single_op_tune(self, job: TbeJob): """Single operator tune""" + # 选择调优模式 tune_mode = self._select_tune_mode(job) + # 如果调优模式为强化学习调优 if tune_mode == TuneMode.RL_TUNE: + # 调用强化学习单操作调优函数 res = rl_tune_single_op(job) + # 如果调优失败,则记录错误信息,并将任务状态设置为失败 if not res: job.error( "Tune Job failed, tune type {}, job json string:{}".format(tune_mode, job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 否则,如果需要同步融合环境,则调用同步融合环境函数 else: if self.fusion_need_sync: sync_fusion_env(self.fusion_need_sync, self.imported_module) self.fusion_need_sync = 0 + # 调用遗传算法调优函数 res = ga_tune(job) + # 如果调优失败,则记录错误信息,并调用编译处理函数 if not res: job.error("ga tune Job failed, job json string:{}".format(job.json_string)) return self.compile_handler(job) + # 如果任务状态为运行中 if job.status == JobStatus.JOB_RUNNING: + # 如果调优模式为强化学习调优,则更新导入的操作模块 if tune_mode == TuneMode.RL_TUNE: self._update_imported_op_module(job) + # 将任务添加到运行中任务列表 return self.add_to_running_jobs(job) + # 否则将任务添加到已完成任务列表,并设置任务状态为成功 return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS) def fusion_op_tune(self, job: TbeJob): """Fusion operator tune""" + # 选择调优模式 tune_mode = self._select_tune_mode(job) + # 如果需要同步融合环境,则调用同步融合环境函数 if self.fusion_need_sync: sync_fusion_env(self.fusion_need_sync, self.imported_module) self.fusion_need_sync = 0 + # 如果调优模式为强化学习调优,则调用强化学习融合操作调优函数 if tune_mode == TuneMode.RL_TUNE: res = rl_tune_fusion_op(job) + # 否则调用遗传算法调优函数 else: res = ga_tune(job) + # 如果调优失败,则记录错误信息,并将任务状态设置为失败 if not res: job.error( "Tune Job failed, tune type {}, job json string:{}".format(tune_mode, job.json_string)) return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED) + # 如果任务状态为运行中,则将任务添加到运行中任务列表 if job.status == JobStatus.JOB_RUNNING: return self.add_to_running_jobs(job) + # 否则将任务添加到已完成任务列表,并设置任务状态为成功 return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS) def query_handler(self, query_job: TbeJob): """ Query job handler """ + # 获取查询任务的source_id和job_id target_source_id = query_job.content["source_id"] target_job_id = query_job.content["job_id"] + # 根据source_id和job_id获取已完成的任务 target_job = get_job(self._finished_jobs, target_source_id, target_job_id) + # 如果找到了已完成的任务 if target_job: + # 记录警告信息 query_job.warning("Query a finished job: {}".format(query_job.content)) + # 将查询任务的结果设置为已完成任务的结果 query_job.result = target_job.get_result() + # 将查询任务添加到已完成任务列表中,并返回成功状态 return self.add_to_finished_jobs(query_job, JobStatus.JOB_SUCCESS) + # 根据source_id和job_id获取未完成的任务 target_job = get_job(self._raw_finish_jobs, target_source_id, target_job_id) + # 如果未找到未完成的任务 if not target_job: + # 更新未完成的任务列表 self.update_raw_finished_jobs(query_job) + # 再次根据source_id和job_id获取未完成的任务 target_job = get_job(self._raw_finish_jobs, target_source_id, target_job_id) + # 如果找到了未完成的任务 if target_job: + # 记录调试信息 query_job.debug("Found job in raw finished jobs, source_id:{}, job_id:{}".format(target_source_id, target_job_id)) + # 将查询任务的结果设置为未完成任务的结果 query_job.result = target_job.get_result() + # 从未完成任务列表中删除该任务 del_job(self._raw_finish_jobs, target_job.source_id, target_job.id) + # 将未完成任务添加到已完成任务列表中,并返回成功状态 self.add_to_finished_jobs(target_job, target_job.status) return self.add_to_finished_jobs(query_job, JobStatus.JOB_SUCCESS) + # 根据source_id和job_id获取正在运行的任务 target_job = get_job(self._running_jobs, target_source_id, target_job_id) + # 如果找到了正在运行的任务 if target_job: + # 将查询任务的结果设置为正在运行任务的结果 query_job.result = target_job.get_result() + # 将查询任务添加到已完成任务列表中,并返回成功状态 return self.add_to_finished_jobs(query_job, JobStatus.JOB_SUCCESS) + # 根据source_id和job_id获取所有任务 target_job = get_job(self._all_jobs, target_source_id, target_job_id) + # 如果找到了所有任务 if target_job: + # 记录调试信息 query_job.debug("Found job in all jobs, source_id:{}, job_id:{}".format(target_source_id, target_job_id)) + # 记录调试信息 target_job.debug("Be Queried") + # 将查询任务的结果设置为所有任务的结果 query_job.result = target_job.get_result() + # 将查询任务添加到已完成任务列表中,并返回成功状态 return self.add_to_finished_jobs(query_job, JobStatus.JOB_SUCCESS) + # 如果没有找到任何任务,记录错误信息 query_job.error("Can't find job in finished/raw_finished/running jobs, source_id: {}".format(target_source_id)) + # 将查询任务的结果设置为空 query_job.result = "" + # 将查询任务添加到已完成任务列表中,并返回失败状态 return self.add_to_finished_jobs(query_job, JobStatus.JOB_FAILED) def _get_job_sys_info(self): @@ -314,10 +444,15 @@ class TbeJobManager: Get job manager system info :return: system info """ + # 创建一个字典,用于存储系统信息 sys_info = dict() + # 将DummyLogger添加到系统信息中 sys_info["logger"] = DummyLogger + # 将para_debug_path添加到系统信息中 sys_info["para_debug_path"] = self.para_debug_path + # 将tune_dump_path添加到系统信息中 sys_info["tune_dump_path"] = self.tune_dump_path + # 将offline_tune添加到系统信息中 sys_info["offline_tune"] = self.offline_tune # license info sys_info["rl_tune_switch"] = self.rl_tune_switch @@ -362,12 +497,17 @@ class TbeJobManager: :param job: :return: """ + # 获取计算操作列表 compute_op_info = get_compute_op_list(job.content)[0] + # 获取操作模块名称 op_module_name = compute_op_info["module_name"] + # 如果操作模块名称在已导入模块中,则增加引用次数 if op_module_name in self.imported_module.keys(): self.imported_module[op_module_name] = self.imported_module[op_module_name] + 1 + # 否则,将操作模块名称添加到已导入模块中,并设置引用次数为1 else: self.imported_module[op_module_name] = 1 + # 增加融合需要同步的次数 self.fusion_need_sync = self.fusion_need_sync + 1 def _select_tune_mode(self, job): @@ -376,18 +516,25 @@ class TbeJobManager: :param job: tbe tune job :return: NO_TUNE RL_TUNE or GA_TUNE """ + # 获取job的SocInfo中的autoTilingMode和offlineTune auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"] offline_tune = job.content["SocInfo"]["offlineTune"] + # 获取job的full_name full_name = job.content["full_name"] + # 获取job的func_names func_names = get_func_names(job.content) + # 如果self.tune_op_list不为空且full_name不在self.tune_op_list中,则返回TuneMode.NO_TUNE if self.tune_op_list and full_name not in self.tune_op_list: return TuneMode.NO_TUNE + # 如果offline_tune为True,则返回TuneMode.RL_TUNE if offline_tune: return TuneMode.RL_TUNE + # 如果auto_tiling_mode中包含TuneMode.GA_TUNE.value,则遍历func_names,如果func_name.lower()在self.auto_tune_op_list中,则返回TuneMode.GA_TUNE if TuneMode.GA_TUNE.value in auto_tiling_mode: for func_name in func_names: if func_name.lower() in self.auto_tune_op_list: return TuneMode.GA_TUNE + # 如果auto_tiling_mode中包含TuneMode.RL_TUNE.value,则返回TuneMode.RL_TUNE if TuneMode.RL_TUNE.value in auto_tiling_mode: return TuneMode.RL_TUNE return TuneMode.NO_TUNE @@ -398,15 +545,22 @@ class TbeJobManager: :param query_job: query job :return: Node """ + # 获取已完成任务 new_finished_jobs = get_finish_tasks(query_job.source_id) + # 遍历已完成任务 for new_job in new_finished_jobs: + # 获取任务ID source_id = new_job["graph_id"] job_id = new_job["task_id"] + # 获取任务 target_job = get_job(self._running_jobs, source_id, job_id) + # 如果任务不存在,则报错 if not target_job: query_job.error("Can't get job, source id:{}, job id:{}".format(source_id, job_id)) continue + # 设置任务结果 target_job.result = new_job["op_res"] if "op_res" in new_job else new_job["result"] + # 如果任务类型为预编译任务,则进行预编译 if target_job.type == JobType.PRECOMPILE_JOB: op_name = target_job.content["fusion_op_name"] op_params = get_prebuild_output(op_name) @@ -415,13 +569,17 @@ class TbeJobManager: pre_compile_result["op_params"] = op_params pre_compile_result["core_type"] = new_job["core_type"] if "core_type" in new_job else "" target_job.result = json.dumps(pre_compile_result) + # 输出任务结果 target_job.info("Query result:{}".format(new_job["result"])) + # 如果任务状态码为0,则任务成功 if new_job["status_code"] == 0: target_job.status = JobStatus.JOB_SUCCESS target_job.info("Query info_msg:{}".format(new_job["info_msg"])) + # 否则任务失败 else: target_job.status = JobStatus.JOB_FAILED target_job.error("Query info_msg:{}".format(new_job["info_msg"])) + # 输出错误信息 if "err_args" in new_job: target_job.error("Query err_args:{}".format(new_job["err_args"])) if "except_msg" in new_job: @@ -429,7 +587,9 @@ class TbeJobManager: if "except_tuple_msg" in new_job: target_job.error_manager(new_job["except_tuple_msg"]) target_job.error("\nOriginal compile json: \n {}\n".format(target_job.json_string)) + # 将任务添加到已完成任务列表 post_job(self._raw_finish_jobs, target_job) + # 从运行中任务列表中删除任务 del_job(self._running_jobs, target_job.source_id, target_job.id) def add_to_finished_jobs(self, job, status): @@ -456,8 +616,11 @@ class TbeJobManager: class TuneMode(Enum): """Class of tune mode: NO_TUNE, GA, RL""" + # 不调优模式 NO_TUNE = "NO_TUNE" + # 遗传算法调优模式 GA_TUNE = "GA" + # 强化学习调优模式 RL_TUNE = "RL" @@ -469,18 +632,22 @@ class DummyLogger: @staticmethod def debug(msg, *args, **kwargs): + """Debug级别日志""" pass @staticmethod def info(msg, *args, **kwargs): + """Info级别日志""" pass @staticmethod def warning(msg, *args, **kwargs): + """Warning级别日志""" pass @staticmethod def error(msg, *args, **kwargs): + """Error级别日志""" pass @staticmethod @@ -497,10 +664,13 @@ def get_job(jobs, source_id, job_id): :return: job instance if found in job list None if not found in job list """ + # 如果source_id不在jobs的键中,返回None if source_id not in jobs.keys(): return None + # 如果job_id不在jobs[source_id]的键中,返回None if job_id not in jobs[source_id].keys(): return None + # 返回jobs[source_id][job_id] return jobs[source_id][job_id] @@ -526,9 +696,15 @@ def del_job(jobs, source_id, job_id): :param job_id: target job's job_id :return: bool True or False """ + # 判断source_id是否在jobs字典中 if source_id not in jobs.keys(): + # 如果不在,返回False return False + # 判断job_id是否在jobs[source_id]字典中 if job_id not in jobs[source_id].keys(): + # 如果不在,返回False return False + # 删除jobs[source_id]字典中的job_id键值对 del jobs[source_id][job_id] + # 返回True return True