_extends\parallel_compile\tbe_compiler

branch-yixin
yixin 7 months ago
parent 0012f23abf
commit 569be2fa4e

@ -13,6 +13,7 @@
# limitations under the License.
# ============================================================================
"""tbe adapter to adapt te/topi/auto-tune python api """
# 导入必要的库和模块
import json
import os
import shutil
@ -20,33 +21,62 @@ import sys
import traceback
from datetime import datetime
# 导入TBE相关的库和模块
from tbe.common.rl_bank.bank_manager import set_current_op_name
from tbe.common.repository_manager.interface import cann_kb_unload, cann_kb_load
from tbe.common.rl_bank.bank_cfg import LocalLock
from te.platform.cce_conf import te_set_version
from te.platform.cce_policy import set_L1_info
from te_fusion.compile_task_manager import dispatch_prebuild_task, dispatch_single_op_compile_task, import_py_module, \
dispatch_fusion_op_compile_task, dispatch_autotune_task, sync_op_tune_params
from te_fusion.compile_task_manager import sync_syspath
from te_fusion.fusion_manager import call_op_func, clear_fusion_params, check_op_impl_mode, \
save_op_params, build_single_op_from_c, op_params_to_json
from te_fusion.compile_task_manager import (
dispatch_prebuild_task,
dispatch_single_op_compile_task,
import_py_module,
dispatch_fusion_op_compile_task,
dispatch_autotune_task,
sync_op_tune_params,
sync_syspath
)
from te_fusion.fusion_manager import (
call_op_func,
clear_fusion_params,
check_op_impl_mode,
save_op_params,
build_single_op_from_c,
op_params_to_json
)
from te_fusion.fusion_util import dump_fusion_json
from te_fusion.parallel_compilation import init_multi_process_env, start_ga_multi_process, deinit_multi_process_env, \
from te_fusion.parallel_compilation import (
init_multi_process_env,
start_ga_multi_process,
deinit_multi_process_env,
get_finished_compilation_task
from .tbe_helper import get_soc_info, assemble_op_args, get_compute_op_list, get_options_info, get_fuzz_build_info, \
adjust_custom_op_info, pack_op_args, get_module_name, get_real_op_debug_level
)
from .tbe_helper import (
get_soc_info,
assemble_op_args,
get_compute_op_list,
get_options_info,
get_fuzz_build_info,
adjust_custom_op_info,
pack_op_args,
get_module_name,
get_real_op_debug_level
)
from .tbe_job import TbeJob, JobStatus
PLATFORM_FLAG = ["Ascend310", "Ascend910", "Hi3796CV300ES", "Ascend710", "Ascend610", "Hi3796CV300CS", "SD3403"]
# 定义支持的平台标志
PLATFORM_FLAG = [
"Ascend310", "Ascend910", "Hi3796CV300ES", "Ascend710", "Ascend610", "Hi3796CV300CS", "SD3403"
]
# 定义Tune初始化函数
def _tune_init(job: TbeJob):
"""
Tune Initialize
:param job:
:return:
Tune初始化
:param job: TbeJob对象包含任务信息
:return: 初始化是否成功
"""
# 提取Soc信息和Tune信息
auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"]
offline_tune = job.content["SocInfo"]["offlineTune"]
op_bank_update = job.content["SocInfo"]["op_bank_update"]
@ -54,11 +84,14 @@ def _tune_init(job: TbeJob):
tune_bank_path = job.content["TuneInfo"]["tune_bank_path"]
need_ga = bool("GA" in auto_tiling_mode)
need_rl = bool("RL" in auto_tiling_mode)
# 设置环境变量
if offline_tune:
os.environ["ENABLE_TUNE_DUMP"] = "TRUE"
if op_bank_update:
sync_op_tune_params("tbe.common.tiling.tiling_api", "reset_repository", False, "")
# 初始化Tune环境
if need_ga or need_rl or offline_tune:
res = __init_tune_env(job, need_ga)
if not res:
@ -66,6 +99,7 @@ def _tune_init(job: TbeJob):
else:
return True
# 设置Tune路径
if tune_dump_path:
os.environ["TUNE_DUMP_PATH"] = str(tune_dump_path)
if tune_bank_path:
@ -73,12 +107,12 @@ def _tune_init(job: TbeJob):
res = _creating_custom_path(job)
return res
# 定义CANN知识库加载函数
def _cann_kb_load(job: TbeJob):
"""
database load
:param job:
:return:
加载CANN知识库
:param job: TbeJob对象包含任务信息
:return: 加载是否成功
"""
soc_version = job.soc_version
core_num = job.core_num
@ -87,12 +121,12 @@ def _cann_kb_load(job: TbeJob):
res = cann_kb_load(soc_version, core_num, op_bank_path, kb_type)
return res
# 定义CANN知识库卸载函数
def _cann_kb_unload(job: TbeJob):
"""
database unload
:param job:
:return:
卸载CANN知识库
:param job: TbeJob对象包含任务信息
:return: 卸载是否成功
"""
if job is None:
return 0
@ -102,12 +136,12 @@ def _cann_kb_unload(job: TbeJob):
res = cann_kb_unload(soc_version, core_num, kb_type)
return res
# 定义移除缓存文件函数
def _remove_cache(job: TbeJob):
"""
:param job: remove cache file:[*.json, *.o, *.info, *.cce] when "op_debug_level" is "0"
op_debug_level: representation the env MS_COMPILER_OP_LEVEL
:return:
移除缓存文件
:param job: TbeJob对象包含任务信息
:return:
"""
op_debug_level = job.content["SocInfo"]["op_debug_level"]
op_debug_dir = job.content["SocInfo"]["op_debug_dir"]
@ -118,24 +152,30 @@ def _remove_cache(job: TbeJob):
real_path = os.path.join(root_path, "kernel_meta/")
shutil.rmtree(real_path)
# 定义创建目录函数
def __directory_creation(path, concat_path):
"""
Create directory
创建目录
:param path: 基础路径
:param concat_path: 需要连接的路径
:return: 创建后的完整路径
"""
path = os.path.join(path, concat_path)
if not os.path.isdir(path):
os.makedirs(path, 0o750)
return path
# 定义初始化Tune环境函数
def __init_tune_env(job, need_ga):
"""
Initialize tune env
初始化Tune环境
:param job: TbeJob对象包含任务信息
:param need_ga: 是否需要GA
:return: 初始化是否成功
"""
try:
import auto_tune.auto_tune_main as at_atm
from schedule_search.rl_online_tune import rl_tune_init # pylint: disable=unused-import
from schedule_search.rl_online_tune import rl_tune_init
if need_ga:
res = at_atm.ga_tune_init()
if not res:
@ -157,10 +197,13 @@ def __init_tune_env(job, need_ga):
finally:
pass
# 定义创建默认自定义路径函数
def __creating_default_custom_path(auto_tiling_mode, base_custom_path):
"""
Create default custom path
创建默认自定义路径
:param auto_tiling_mode: 自动平铺模式
:param base_custom_path: 基础自定义路径
:return:
"""
base_custom_path = __directory_creation(base_custom_path, "data")
tune_flag = []
@ -179,27 +222,40 @@ def __creating_default_custom_path(auto_tiling_mode, base_custom_path):
def _creating_custom_path(job):
"""
Create custom path
创建自定义路径用于存储和检索自定义算子的调优参数
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 自定义路径创建是否成功
"""
# 获取自动平铺模式
auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"]
# 如果模式中包含"NO_TUNE",则不需要创建自定义路径
if "NO_TUNE" in auto_tiling_mode:
return True
# 获取调优参数的基础路径
base_custom_path = job.content["TuneInfo"]["tune_bank_path"]
tune_bank_flag = True
# 如果基础路径不存在则尝试从auto_tune模块获取
if not base_custom_path:
import auto_tune
base_custom_path = os.path.dirname(os.path.realpath(auto_tune.__file__))
base_custom_path = os.path.realpath(os.path.join(base_custom_path, "../../../"))
tune_bank_flag = False
# 检查基础路径是否存在
if not os.path.isdir(base_custom_path):
job.error("Check whether the tuning path [{}] exists.".format(base_custom_path))
return False
# 检查基础路径的权限
if not os.access(base_custom_path, os.R_OK | os.W_OK | os.X_OK):
job.error("Check whether the permission on the tuning path [{}] is correct.".format(base_custom_path))
return False
# 如果不需要创建调优参数库,则直接返回成功
if not tune_bank_flag:
return __creating_default_custom_path(auto_tiling_mode, base_custom_path)
return True
@ -207,22 +263,34 @@ def _creating_custom_path(job):
def _parallel_compilation_init(initialize: TbeJob):
"""
Tbe parallel compilation initialize
:param initialize:
:return:
初始化TBE并行编译环境
Args:
initialize (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 并行编译环境初始化是否成功
"""
# 设置并行编译器的环境变量
os.environ["TE_PARALLEL_COMPILER"] = str(initialize.content["process_num"])
# 获取SoC信息
soc_info = get_soc_info(initialize.content)
# 获取实际的调试级别
real_debug_level = get_real_op_debug_level(initialize.content)
# 获取自动平铺模式
auto_tiling_mode = initialize.content["SocInfo"]["autoTilingMode"]
# 获取是否需要离线调优
offline_tune = initialize.content["SocInfo"]["offlineTune"]
# 生成进程ID和时间戳的组合字符串
pid_ts = "{}_pid{}".format(datetime.now().strftime('%Y%m%d_%H%M%S%f')[:-3], os.getpid())
# 初始化多进程环境
ret = init_multi_process_env(False, soc_info, auto_tiling_mode, real_debug_level,
None, 1, pid_ts)
if ret is None:
initialize.error("Init multiprocess env failed")
return False
initialize.info("Init multiprocess env success with {} process".format(ret[0]))
# 如果需要RL或离线调优则初始化RL环境
if "RL" in auto_tiling_mode or offline_tune:
res_queue = ret[1]
live_checker = ret[2]
@ -234,6 +302,7 @@ def _parallel_compilation_init(initialize: TbeJob):
initialize.error("RL env init failed!")
return False
initialize.info("RL Tune init success.")
# 如果需要GA则启动GA多进程
if "GA" in auto_tiling_mode:
start_ga_multi_process(auto_tiling_mode)
initialize.info("GA Tune init success.")
@ -242,31 +311,44 @@ def _parallel_compilation_init(initialize: TbeJob):
def tbe_initialize(job: TbeJob):
"""
Tbe Initialize
:param job:
:return:
初始化TBE环境
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: TBE环境初始化是否成功
"""
# 设置上下文模型编译环境变量
os.environ["CONTEXT_MODELCOMPILING"] = "TRUE"
# 获取SoC信息
soc_info = get_soc_info(job.content)
# 设置版本
res = te_set_version(*soc_info)
if not res:
job.error("Set version failed")
# 初始化调优环境
res = _tune_init(job)
if not res:
job.error("Tune init failed")
# 创建锁文件
lock_file = os.path.join(job.content["SocInfo"]["op_debug_dir"], "kernel_meta", "file.lock")
local_lock = LocalLock(lock_file)
try:
# 加锁
local_lock.lock()
# 加载CANN知识库
res = _cann_kb_load(job)
if res == 1:
job.error("Cann kb load failed")
# 初始化并行编译
res = _parallel_compilation_init(job)
if not res:
job.error("Parallel compilation failed")
except RuntimeError:
job.error("Initialize failed with RuntimeError")
finally:
# 解锁
local_lock.unlock()
job.result = "Success"
return res
@ -274,9 +356,13 @@ def tbe_initialize(job: TbeJob):
def get_auto_tune_support_op_list(job: TbeJob):
"""
Get GA tune supported op list
:param job:
:return:
获取支持自动调优的算子列表
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
list: 支持自动调优的算子列表
"""
from auto_tune_main import enable_auto_tune_support
auto_tune_op_list = enable_auto_tune_support()
@ -286,10 +372,14 @@ def get_auto_tune_support_op_list(job: TbeJob):
def _normalize_module_name(module_name, py_module_path):
"""
Normalize module name
:param module_name:
:param py_module_path:
:return:
规范化模块名称
Args:
module_name (str): 模块名称
py_module_path (str): Python模块路径
Returns:
None
"""
if py_module_path not in sys.path:
sys.path.insert(0, py_module_path)
@ -298,9 +388,13 @@ def _normalize_module_name(module_name, py_module_path):
def check_support(job: TbeJob):
"""
Check support
:param job:
:return:
检查算子是否受支持
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 算子是否受支持
"""
op_compute_info_list = get_compute_op_list(job.content)
if len(op_compute_info_list) != 1:
@ -341,21 +435,37 @@ def check_support(job: TbeJob):
def select_op_format(job: TbeJob):
"""
Select op format
:param job:
:return:
根据计算操作信息选择操作的格式
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 操作格式选择是否成功
"""
# 获取计算操作列表
compute_op_info_list = get_compute_op_list(job.content)
# 检查计算操作数量是否为1
if len(compute_op_info_list) != 1:
job.error("Invalid op compute num ({}) in check_support".format(len(compute_op_info_list)))
return False
# 获取第一个计算操作信息
compute_op_info = compute_op_info_list[0]
# 调整自定义操作信息
adjust_custom_op_info(compute_op_info)
# 组装操作参数
inputs, outputs, attrs = assemble_op_args(compute_op_info)
# 获取操作模块名称
op_module_name = get_module_name(compute_op_info)
# 获取Python模块路径
py_module_path = compute_op_info["py_module_path"]
# 规范化模块名称
_normalize_module_name(op_module_name, py_module_path)
# 设置操作选择格式的函数名称
op_func_name = "op_select_format"
# 调用操作函数选择格式
res = call_op_func((inputs, outputs, attrs), op_module_name, op_func_name)
# 设置操作格式选择结果
job.result = str(res)
return True
@ -363,15 +473,25 @@ def select_op_format(job: TbeJob):
def parallel_pre_compile_op(job: TbeJob):
"""
Parallel pre compile op
:param job:
:return:
并行预编译操作
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 预编译操作是否成功
"""
# 获取计算操作列表
compute_op_info_list = get_compute_op_list(job.content)
# 检查计算操作数量是否为1
if len(compute_op_info_list) != 1:
job.error("Invalid op compute num ({}) in pre compile op".format(len(compute_op_info_list)))
return False
# 获取第一个计算操作信息
compute_op_info = compute_op_info_list[0]
# 调整自定义操作信息
adjust_custom_op_info(compute_op_info)
# 预构建计算操作信息
_pre_build_compute_op_info(compute_op_info, job)
return True
@ -379,35 +499,60 @@ def parallel_pre_compile_op(job: TbeJob):
def _pre_build_compute_op_info(compute_op, job):
"""
Prebuild by compute op info
:param compute_op:
:param job:
:return:
根据计算操作信息预构建操作
Args:
compute_op (dict): 计算操作信息
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
None
"""
# 获取L1缓存大小
l1_size = job.content["l1_size"]
# 如果L1缓存大小不为-1则设置L1缓存信息
if l1_size != -1:
set_L1_info("op_L1_space", -1)
# 组装操作参数
inputs, outputs, attrs = assemble_op_args(compute_op, is_single_op_build=True)
# 获取操作模块名称
op_module_name = get_module_name(compute_op)
# 获取Python模块路径
py_module_path = compute_op["py_module_path"]
# 获取操作函数名称
op_func_name = compute_op["func_name"]
# 获取操作类型
op_type = compute_op["type"]
# 获取操作名称
op_name = compute_op["op_name"]
# 保存操作参数
save_op_params(op_name, "prebuild", (outputs, attrs))
l1_size = job.content["l1_size"]
# 设置L1缓存信息
set_L1_info("op_L1_space", l1_size)
# 规范化模块名称
_normalize_module_name(op_module_name, py_module_path)
# 获取未知形状信息
unknown_shape = compute_op["unknown_shape"]
# 获取int64模式信息
int64_mode = compute_op["int64mode"]
# 检查操作实现模式
res = check_op_impl_mode(op_module_name, op_func_name)
# 获取操作实现模式
op_impl_mode = job.content["SocInfo"]["op_impl_mode"]
# 获取操作实现模式列表
op_impl_mode_list = job.content["SocInfo"]["op_impl_mode_list"]
# 获取完整操作名称
op_full_name = job.content["full_name"]
# 如果操作不支持实现模式,则发出警告
if not res:
if op_impl_mode_list:
job.warning("The op {} do NOT support op_impl_mode, current op_impl_mode:{}".format(op_type, op_impl_mode))
else:
# 否则,记录操作支持实现模式的信息
job.info("OpType {} support op_impl_mode, current op_impl_mode:{}".format(op_type, op_impl_mode))
# 获取选项信息
options = get_options_info(job.content)
# 分派预构建任务
dispatch_prebuild_task(job.source_id, job.id, l1_size, op_module_name, op_full_name,
op_type, op_func_name, unknown_shape,
(inputs, outputs, attrs, options), int64_mode, unknown_shape,
@ -416,13 +561,22 @@ def _pre_build_compute_op_info(compute_op, job):
def get_prebuild_output(op_name):
"""
get prebuild output
:param op_name:
Get prebuild output
获取预构建输出
Args:
op_name (str): 操作名称
Returns:
dict: 预构建输出
"""
# 将操作参数转换为JSON字符串
params_str = op_params_to_json(op_name)
try:
# 尝试解析JSON字符串
res = json.loads(params_str)
except ValueError:
# 如果解析失败,则返回空字典
res = {}
finally:
pass
@ -432,9 +586,15 @@ def get_prebuild_output(op_name):
def do_fuzz_build_tbe_op(job: TbeJob):
"""
Fuzzy build op
:param job:
:return:
模糊构建操作
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 模糊构建操作是否成功
"""
# 设置操作结果为"NOT_CHANGED"
job.result = "NOT_CHANGED"
return True
@ -442,9 +602,15 @@ def do_fuzz_build_tbe_op(job: TbeJob):
def _dump_fusion_op_info_to_json_file(job: TbeJob):
"""
Dump fusion op info to json file
:param job:
:return:
将融合操作信息转储到JSON文件
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
None
"""
# 如果系统参数调试路径不为空,则转储融合操作信息
if not job.sys_para_debug_path or job.sys_para_debug_path == "\0":
return
dump_fusion_json(json.dumps(job.content), job.sys_para_debug_path)
@ -453,30 +619,55 @@ def _dump_fusion_op_info_to_json_file(job: TbeJob):
def build_single_pre_op(job: TbeJob):
"""
Build single op
:param job:
:return:
构建单个操作的预处理过程
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 构建过程是否成功
"""
# 执行构建前的处理工作
before_build_process(job)
# 获取计算操作列表
compute_op_info_list = get_compute_op_list(job.content)
# 确保只有一个计算操作
if len(compute_op_info_list) != 1:
job.error("Invalid op compute num ({}) in build single op".format(len(compute_op_info_list)))
return False
# 获取单个计算操作信息
compute_op_info = compute_op_info_list[0]
# 调整自定义操作信息
adjust_custom_op_info(compute_op_info)
# 组装操作的输入、输出和属性
inputs, outputs, attrs = assemble_op_args(compute_op_info, is_single_op_build=True)
# 获取操作类型
op_type = compute_op_info["type"]
# 获取L1缓存大小
l1_size = job.content["l1_size"]
# 获取操作模块名称
op_module_name = get_module_name(compute_op_info)
# 获取操作内核名称
op_kernel_name = compute_op_info["op_name"]
# 获取Python模块路径
py_module_path = compute_op_info["py_module_path"]
# 获取完整操作名称
op_name = job.content["full_name"]
# 获取操作函数名称
op_func_name = compute_op_info["func_name"]
# 规范化模块名称
_normalize_module_name(op_module_name, py_module_path)
# 获取未知形状信息
unknown_shape = compute_op_info["unknown_shape"]
# 获取int64模式信息
int64_mode = compute_op_info["int64mode"]
# 获取操作模式
op_pattern = compute_op_info["pattern"]
# 获取选项信息
options = get_options_info(job.content)
# 获取模糊构建信息
fuzz_build_info = get_fuzz_build_info(job.content)
# 分派单个操作编译任务
dispatch_single_op_compile_task(job.source_id, job.id, l1_size, op_module_name, op_name, op_type, op_func_name,
op_kernel_name, unknown_shape, (inputs, outputs, attrs, options), int64_mode,
None, None, unknown_shape, op_pattern,
@ -487,13 +678,22 @@ def build_single_pre_op(job: TbeJob):
def before_build_process(job: TbeJob):
"""
Processing before build
:param job:
:return:
在构建前进行处理
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
None
"""
# 获取L1缓存大小并设置
l1_size = job.content["l1_size"]
set_L1_info("op_L1_space", l1_size)
# 将融合操作信息转储到JSON文件
_dump_fusion_op_info_to_json_file(job)
# 获取是否需要离线调优
offline_tune = job.sys_offline_tune
# 如果需要离线调优则将融合操作信息转储到JSON文件
if offline_tune:
dump_fusion_json(json.dumps(job.content), job.sys_tune_dump_path)
@ -501,20 +701,29 @@ def before_build_process(job: TbeJob):
def sync_fusion_env(fusion_need_sync, module_list):
"""
Sync fusion env
:param fusion_need_sync:
:param module_list:
:return:
同步融合环境
Args:
fusion_need_sync (int): 是否需要同步融合环境
module_list (dict): 模块列表
Returns:
bool: 同步是否成功
"""
# 如果不需要同步,则直接返回成功
if fusion_need_sync == 0:
return True
# 准备使用的模块列表
module_using = []
for key, value in module_list.items():
if value > 0:
module_using.append(str(key))
module_list[key] = 0
# 将使用的模块列表转换为字符串
module_str = ",".join(module_using)
# 导入使用的模块
import_py_module(module_str)
return True
@ -522,13 +731,23 @@ def sync_fusion_env(fusion_need_sync, module_list):
def parallel_compile_fusion_op(job: TbeJob):
"""
Compile fusion op in parallel compiler
:param job:
:return:
在并行编译器中编译融合操作
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 编译过程是否成功
"""
# 获取L1缓存大小
l1_size = job.content["l1_size"]
# 获取选项信息
options = get_options_info(job.content)
# 获取融合操作内核名称
op_kernel_name = job.content["fusion_op_name"]
# 获取完整操作名称
op_name = job.content["full_name"]
# 分派融合操作编译任务
dispatch_fusion_op_compile_task(job.source_id, job.id, l1_size, json.dumps(job.content), op_kernel_name, None, None,
options, None, job.pass_list, op_name)
return True
@ -537,112 +756,185 @@ def parallel_compile_fusion_op(job: TbeJob):
def ga_tune(job: TbeJob):
"""
GA tune
:param job:
:return:
使用遗传算法进行调优
Args:
job (TbeJob): 包含任务信息的TbeJob对象
Returns:
bool: 调优过程是否成功
"""
# 获取L1缓存大小
l1_size = job.content["l1_size"]
# 获取融合操作内核名称
op_kernel_name = job.content["fusion_op_name"]
# 获取完整操作名称
op_name = job.content["full_name"]
# 分派自动调优任务
dispatch_autotune_task(job.source_id, job.id, l1_size, json.dumps(job.content), {}, op_kernel_name, op_name)
# 设置任务状态为运行中
job.status = JobStatus.JOB_RUNNING
return True
def rl_tune_single_op(job: TbeJob):
"""
RL tune single op
:param job:
:return:
Perform RL (Reinforcement Learning) tuning for a single operation.
This function is responsible for tuning a single operation using RL techniques.
It retrieves the operation's information, performs the tuning, and handles any exceptions that may occur during the process.
Args:
job (TbeJob): An object containing job information, including the operation to be tuned.
Returns:
bool: True if the RL tuning is successful, False otherwise.
"""
# Retrieve the list of compute operations from the job content
compute_op_info_list = get_compute_op_list(job.content)
# Check if there is exactly one compute operation
if len(compute_op_info_list) != 1:
job.error("Invalid op compute num ({}) in rl tune single op".format(len(compute_op_info_list)))
return False
# Get the first (and only) compute operation info
compute_op_info = compute_op_info_list[0]
# Assemble the operation's input, output, and attributes
inputs, outputs, attrs = assemble_op_args(compute_op_info)
# Get the operation type
op_type = compute_op_info["type"]
# Get the L1 size from the job content
l1_size = job.content["l1_size"]
# Get the operation module name
op_module_name = get_module_name(compute_op_info)
# Get the operation kernel name
op_kernel_name = compute_op_info["op_name"]
# Get the full name of the operation
full_name = compute_op_info["name"]
# Get the Python module path
py_module_path = compute_op_info["py_module_path"]
# Get the operation function name
op_func_name = compute_op_info["func_name"]
# Normalize the module name
_normalize_module_name(op_module_name, py_module_path)
# Set the current operation name
set_current_op_name(op_kernel_name)
# Get the unknown shape information
unknown_shape = compute_op_info["unknown_shape"]
# Get the int64 mode information
int64_mode = compute_op_info["int64mode"]
# Get the operation pattern
op_pattern = compute_op_info["pattern"]
# Get the fuzz build information
fuzz_build_info = get_fuzz_build_info(job.content)
# Get the auto tiling mode
auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"]
# Get the device ID
device_id = job.content["SocInfo"]["deviceId"]
# Get the options information
options = get_options_info(job.content)
try:
# Build the single operation from C code
build_single_op_from_c(op_module_name, op_func_name, op_type, "build", unknown_shape,
(inputs, outputs, attrs), int64_mode, unknown_shape, options,
op_pattern, auto_tiling_mode, device_id, json.dumps(fuzz_build_info))
# pylint: disable=broad-except
except Exception:
# If an exception occurs, log the error and return False
job.error(
"Single op {} build failed, no need to do rl tune, json string:{}".format(op_kernel_name, job.json_string))
exc_type, exc_value, _ = sys.exc_info()
job.error(
"exc_type:{}, exc_value:{}, exc_traceback:{}".format(exc_type, exc_value, traceback.format_exc()))
return False
finally:
pass
# Prepare the tuning operation module name
tune_op_module_name = op_module_name + "@" + py_module_path
# Get the base kernel path
base_kernel = job.content["SocInfo"]["op_debug_dir"] + "/kernel_meta/" + op_kernel_name + ".o"
# Dispatch the single tune task
from schedule_search.rl_online_tune import dispatch_single_tune_task
pack_args = pack_op_args(inputs, outputs, attrs)
res = dispatch_single_tune_task(job.source_id, job.id, l1_size, base_kernel, op_kernel_name, full_name,
tune_op_module_name, op_func_name, op_type, pack_args)
# Process the RL tune result
return _process_rl_tune_result(job, op_type, res)
def rl_tune_fusion_op(job: TbeJob):
"""
rl tune fusion op
:param job:
:return:
Perform RL tuning for a fusion operation.
This function is responsible for tuning a fusion operation using RL techniques.
It compiles the operation using multiprocessing and handles any exceptions that may occur during the process.
Args:
job (TbeJob): An object containing job information, including the fusion operation to be tuned.
Returns:
bool: True if the RL tuning is successful, False otherwise.
"""
# Get the fusion operation kernel name
op_kernel_name = job.content["fusion_op_name"]
# Set the current operation name
set_current_op_name(op_kernel_name)
try:
# Compile the operation using multiprocessing
from schedule_search.rl_online_tune import compile_op_by_mp
compile_op_by_mp(json.dumps(job.content))
# pylint: disable=broad-except
except Exception:
# If an exception occurs, log the error and return False
job.error(
"Fusion op {} build failed, no need to do rl tune, json string:{}".format(op_kernel_name, job.json_string))
exc_type, exc_value, _ = sys.exc_info()
job.error(
"exc_type:{}, exc_value:{}, exc_traceback:{}".format(exc_type, exc_value, traceback.format_exc()))
return False
finally:
pass
# Get the L1 size
l1_size = job.content["l1_size"]
# Get the base kernel path
base_kernel = job.content["SocInfo"]["op_debug_dir"] + "/kernel_meta/" + op_kernel_name + ".o"
# Get the list of compute operations
compute_op_list = get_compute_op_list(job.content)
# Prepare the operation module names string
op_module_names_str = ""
op_type_set = set()
for op in compute_op_list:
op_module_names_str = ','.join([op_module_names_str, get_module_name(op)])
op_type_set.add(op["type"])
# Remove the leading comma from the operation module names string
op_module_names_str = op_module_names_str[1:]
# Join the operation types with double underscore
op_type = "__".join(list(op_type_set))
# Dispatch the fusion tune task
from schedule_search.rl_online_tune import dispatch_fusion_tune_task
res = dispatch_fusion_tune_task(job.source_id, job.id, l1_size, base_kernel, op_kernel_name, op_module_names_str,
json.dumps(job.content))
# Process the RL tune result
return _process_rl_tune_result(job, op_type, res)
def _process_rl_tune_result(job, op_type, res):
"""
Process the result of RL tuning.
If the tuning result is False, it checks if the operation type is in the black list or if the job is set to offline tune.
If the tuning result is True, it sets the job status to running.
Args:
job (TbeJob): An object containing job information.
op_type (str): The type of the operation.
res (bool): The result of RL tuning.
Returns:
bool: The processed result of RL tuning.
"""
if not res:
# Check if the operation type is in the black list or if the job is set to offline tune
from schedule_search.tune_util import filter_black_op_type
res = bool(job.sys_offline_tune or os.getenv("REPEAT_TUNE", "False").lower() != "true" or filter_black_op_type(
op_type))
else:
# Set the job status to running
job.status = JobStatus.JOB_RUNNING
res = True
return res
@ -650,8 +942,13 @@ def _process_rl_tune_result(job, op_type, res):
def get_finish_tasks(source_id):
"""
Get finish task from parallel compilation framework
:return task info list
Get the list of finished tasks from the parallel compilation framework.
Args:
source_id (int): The source ID of the tasks.
Returns:
list: A list of finished task information.
"""
return get_finished_compilation_task(source_id)
@ -664,14 +961,21 @@ def tbe_finalize(auto_tiling_mode, offline_tune, job: TbeJob):
:param job: TbeJob
:return: None
"""
# 释放多进程环境
deinit_multi_process_env()
# 如果自动切分模式为RL或者离线调优则释放RL调优
if "RL" in auto_tiling_mode or offline_tune:
from schedule_search.rl_online_tune import rl_tune_deinit
rl_tune_deinit()
# 卸载Cann kb
res = _cann_kb_unload(job)
# 如果卸载失败则返回False
if res == 1:
job.error("Cann kb unload failed")
return False
# 清除融合参数
clear_fusion_params()
# 删除缓存
_remove_cache(job)
# 返回True
return True

@ -26,6 +26,7 @@ class BuildType(Enum):
ACCURATELY = "accurately"
# 获取JobType枚举类中的所有值
job_type_list = [job_type.value for _, job_type in JobType.__members__.items()]
@ -35,14 +36,19 @@ def check_job_json(job_info):
:param job_info:tne compilation job json
:return: raise value error if wrong
"""
# 检查job_info中是否包含source_id
if 'source_id' not in job_info:
raise ValueError("Json string Errors, key:source_id not found.")
# 检查job_info中是否包含job_id
if 'job_id' not in job_info:
raise ValueError("Json string Errors, key:job_id not found.")
# 检查job_info中是否包含job_type
if 'job_type' not in job_info or not job_info['job_type']:
raise ValueError("Json string Errors, key:job_type not found.")
# 检查job_info中job_type是否在job_type_list中
if job_info['job_type'] not in job_type_list:
raise ValueError("Invalid job type: {}.".format(job_info['job_type']))
# 检查job_info中是否包含job_content
if 'job_content' not in job_info:
raise ValueError("Json string Errors, key:job_content not found.")
@ -52,6 +58,7 @@ def reset_op_debug_level_in_soc_info(level):
:param level: op_debug_level, if level is 3 or 4, replace it with 0
:return: op_debug_level
"""
# 如果level为3或4则将其替换为0
if level in ("3", "4"):
level = "0"
return level
@ -62,6 +69,7 @@ def get_real_op_debug_level(initialize_job_info):
:param initialize_job_info: initialize_job_info
:return: origin op_debug_level for init_multi_process_env
"""
# 返回initialize_job_info中op_debug_level的值
return initialize_job_info["SocInfo"]["op_debug_level"]
@ -72,21 +80,35 @@ def get_soc_info(initialize_job_info):
:return: soc info
"""
soc_param = dict()
# 获取soc_info中的op_impl_mode
soc_param["op_impl_mode"] = initialize_job_info["SocInfo"]["op_impl_mode"]
# 获取soc_info中的op_debug_level并调用reset_op_debug_level_in_soc_info函数进行处理
soc_param["op_debug_level"] = reset_op_debug_level_in_soc_info(initialize_job_info["SocInfo"]["op_debug_level"])
# 获取soc_info中的op_impl_mode_list
soc_param["op_impl_mode_list"] = initialize_job_info["SocInfo"]["op_impl_mode_list"]
# 获取soc_info中的op_debug_dir
soc_param["op_debug_dir"] = initialize_job_info["SocInfo"]["op_debug_dir"]
# 获取soc_info中的vector_fp_ceiling
soc_param["vector_fp_ceiling"] = initialize_job_info["SocInfo"]["vector_fp_ceiling"]
# 获取soc_info中的mdl_bank_path
soc_param['mdl_bank_path'] = initialize_job_info["SocInfo"]["mdl_bank_path"]
# 获取soc_info中的op_bank_path
soc_param['op_bank_path'] = initialize_job_info["SocInfo"]["op_bank_path"]
soc_info = list()
# 获取soc_info中的socVersion
soc_info.append(initialize_job_info["SocInfo"]["socVersion"])
# 获取soc_info中的coreType
soc_info.append(initialize_job_info["SocInfo"]["coreType"])
# 获取soc_info中的coreNum
soc_info.append(initialize_job_info["SocInfo"]["coreNum"])
# 获取soc_info中的l1Fusion
soc_info.append(initialize_job_info["SocInfo"]["l1Fusion"])
# 获取soc_info中的l2Mode
soc_info.append(initialize_job_info["SocInfo"]["l2Mode"])
# 获取soc_info中的l2Fusion
soc_info.append(initialize_job_info["SocInfo"]["l2Fusion"])
# 将soc_param添加到soc_info中
soc_info.append(soc_param)
return soc_info
@ -98,16 +120,22 @@ def check_arg_info(io_info):
:param io_info:A dict, to be checked.
:return: Exception: If specific keyword is not found.
"""
# 检查io_info中是否包含shape
if 'shape' not in io_info:
raise ValueError("Json string Errors, key:shape not found.")
# 检查io_info中是否包含ori_shape
if 'ori_shape' not in io_info:
raise ValueError("Json string Errors, key:ori_shape not found.")
# 检查io_info中是否包含format
if 'format' not in io_info or not io_info['format']:
raise ValueError("Json string Errors, key:format not found.")
# 检查io_info中是否包含ori_format
if 'ori_format' not in io_info or not io_info['ori_format']:
raise ValueError("Json string Errors, key:ori_format not found.")
# 检查io_info中是否包含dtype
if 'dtype' not in io_info or not io_info['dtype']:
raise ValueError("Json string Errors, key:dtype not found.")
# 检查io_info中是否包含param_type
if 'param_type' not in io_info or not io_info['param_type']:
raise ValueError("Json string Errors, key:param_type not found.")
@ -119,18 +147,28 @@ def get_input_output_args(io_info):
:return:input/output args
"""
args = []
# 如果io_info为空则返回空列表
if io_info is None:
return args
# 遍历io_info中的每个元素
for item in io_info:
# 如果元素是字典类型
if isinstance(item, dict):
# 调用get_single_io_arg函数获取单个输入/输出参数
arg = get_single_io_arg(item)
args.append(arg)
elif isinstance(item, list):
# 如果元素是列表类型
dyn_arg = []
# 创建一个空列表dyn_arg
for info in item:
# 遍历列表中的每个元素
arg = get_single_io_arg(info)
# 调用get_single_io_arg函数获取单个输入/输出参数
dyn_arg.append(arg)
# 将参数添加到dyn_arg列表中
args.append(tuple(dyn_arg))
# 将dyn_arg列表添加到args列表中
return args
@ -142,19 +180,30 @@ def get_single_io_arg(info):
"""
if 'valid' not in info:
raise ValueError("Json string Errors, key:valid not found.")
# 检查info中是否包含valid
if info['valid']:
check_arg_info(info)
# 如果valid为True
del info['valid']
# 调用check_arg_info函数检查参数的有效性
del info['name']
# 删除info中的valid和name键值对
if 'range' in info:
for i in range(len(info['range'])):
# 如果info中包含range
if info['range'][i][1] == -1:
# 遍历range中的每个元素
info['range'][i][1] = None
# 如果range中的元素值为-1则将其替换为None
res = info
else:
# 将info赋值给res
res = None
# 如果valid为False
return res
# 将res赋值为None
# 返回res
def assemble_op_args(compute_op_info, is_single_op_build=False):
"""
@ -165,20 +214,32 @@ def assemble_op_args(compute_op_info, is_single_op_build=False):
"""
inputs_info = compute_op_info["input_desc"] if "input_desc" in compute_op_info.keys() else None
outputs_info = compute_op_info["output_desc"] if "output_desc" in compute_op_info.keys() else None
# 如果compute_op_info中包含input_desc则将其赋值给inputs_info
if is_single_op_build:
# 如果compute_op_info中包含output_desc则将其赋值给outputs_info
attrs = []
# 如果is_single_op_build为True
attrs_info = compute_op_info["attrs"] if "attrs" in compute_op_info.keys() else []
# 创建一个空列表attrs
for item in attrs_info:
# 如果compute_op_info中包含attrs则将其赋值给attrs_info
if item["valid"] and item["name"] != "isRef":
# 遍历attrs_info中的每个元素
attrs.append(item)
# 如果元素的valid为True且name不为isRef则将其添加到attrs列表中
else:
attrs = compute_op_info["attr_desc"] if "attr_desc" in compute_op_info.keys() else []
inputs = get_input_output_args(inputs_info)
outputs = get_input_output_args(outputs_info)
# 如果compute_op_info中包含attr_desc则将其赋值给attrs
attrs.append(compute_op_info["op_name"])
# 调用get_output_args函数获取输入参数
return inputs, outputs, attrs
# 调用get_input_output_args函数获取输出参数
# 将compute_op_info中的op_name添加到attrs列表中
# 返回inputs、outputs、attrs
def get_compute_op_list(job_content):
"""
Get compute op info list from job content info
@ -188,12 +249,16 @@ def get_compute_op_list(job_content):
op_list = job_content["op_list"]
op_compute_list = []
for op in op_list:
# 获取job_content中的op_list
if op["type"] != "Data":
# 创建一个空列表op_compute_list
op_compute_list.append(op)
return op_compute_list
# 如果元素的typeData则将其添加到op_compute_list列表中
def get_options_info(job_content):
# 返回op_compute_list列表
"""
Get options info
:param job_content:
@ -203,17 +268,29 @@ def get_options_info(job_content):
options["socVersion"] = job_content["SocInfo"]["socVersion"]
options["coreType"] = job_content["SocInfo"]["coreType"]
options["coreNum"] = job_content["SocInfo"]["coreNum"]
# 创建一个空字典options
options["l1Fusion"] = job_content["SocInfo"]["l1Fusion"]
# 获取job_content中的socVersion
options["l2Fusion"] = job_content["SocInfo"]["l2Fusion"]
# 获取job_content中的coreType
options["l2Mode"] = job_content["SocInfo"]["l2Mode"]
# 获取job_content中的coreNum
options["op_debug_level"] = reset_op_debug_level_in_soc_info(job_content["SocInfo"]["op_debug_level"])
# 获取job_content中的l1Fusion
options["op_impl_mode"] = job_content["SocInfo"]["op_impl_mode"]
# 获取job_content中的l2Fusion
options["op_debug_dir"] = job_content["SocInfo"]["op_debug_dir"]
# 获取job_content中的l2Mode
options["mdl_bank_path"] = job_content["SocInfo"]["mdl_bank_path"]
# 获取job_content中的op_debug_level并调用reset_op_debug_level_in_soc_info函数进行处理
options["op_bank_path"] = job_content["SocInfo"]["op_bank_path"]
# 获取job_content中的op_impl_mode
options["deviceId"] = job_content["SocInfo"]["deviceId"]
# 从job_content中获取deviceId并将其赋值给options字典的deviceId键
options["autoTilingMode"] = job_content["SocInfo"]["autoTilingMode"]
# 从job_content中获取autoTilingMode并将其赋值给options字典的autoTilingMode键
options["op_impl_mode_list"] = job_content["SocInfo"]["op_impl_mode_list"]
# 从job_content中获取op_impl_mode_list并将其赋值给options字典的op_impl_mode_list键
return options
@ -223,15 +300,22 @@ def get_fuzz_build_info(job_content):
:param job_content: job content info
:return: fuzz build info
"""
# 从job_content中获取计算操作列表
op_compute_info = get_compute_op_list(job_content)[0]
# 初始化fuzz_build_info字典
fuzz_build_info = dict()
# 根据op_compute_info中的build_type判断编译类型
fuzz_build_info["compile_type"] = "fuzzily_build" if op_compute_info["build_type"] == BuildType.FUZZILY.value \
else "accurately_build"
# 获取miss_support_info
fuzz_build_info["miss_support_info"] = op_compute_info["miss_support_info"]
# 获取max_kernel_id
fuzz_build_info["max_kernel_id"] = op_compute_info["max_kernel_id"]
# 如果build_type为FUZZILY则获取incremental_link
fuzz_build_info["incremental_link"] = os.path.realpath(
job_content["SocInfo"]["op_debug_dir"] + "/kernel_meta/" + op_compute_info["name"] + ".json") if \
op_compute_info["build_type"] == BuildType.FUZZILY.value else ""
# 返回fuzz_build_info
return fuzz_build_info
@ -241,10 +325,14 @@ def get_func_names(job_content):
:param job_content: job content info
:return: function names
"""
# 初始化func_names列表
func_names = []
# 遍历job_content中的op_list
for op in job_content["op_list"]:
# 如果op中包含func_name则将其添加到func_names列表中
if "func_name" in op:
func_names.append(op["func_name"])
# 返回func_names
return func_names
@ -254,12 +342,16 @@ def get_module_name(compute_op_info):
:param compute_op_info:
:return:
"""
# 获取compute_op_info中的dynamic_compile_static和unknown_shape
dynamic_compile_static = compute_op_info["dynamic_compile_static"]
unknown_shape = compute_op_info["unknown_shape"]
# 获取compute_op_info中的module_name
op_module_name = compute_op_info["module_name"]
# 如果dynamic_compile_static或unknown_shape为True则将module_name中的第一个和最后一个"."之间的字符串替换为".dynamic."
if dynamic_compile_static or unknown_shape:
d = ".dynamic."
op_module_name = d.join((op_module_name.split(".")[0], op_module_name.split(".")[-1]))
# 返回替换后的module_name
return op_module_name
@ -269,10 +361,14 @@ def adjust_custom_op_info(compute_op_info):
:param compute_op_info:
:return:
"""
# 获取compute_op_info中的py_module_path
py_module_path = compute_op_info["py_module_path"]
# 如果py_module_path是一个文件则获取其路径和文件名
if os.path.isfile(py_module_path):
py_module_path, file_name = os.path.split(py_module_path)
# 获取文件名中的模块名
module_name, _ = os.path.splitext(file_name)
# 将py_module_path和module_name更新到compute_op_info中
compute_op_info["py_module_path"] = py_module_path
compute_op_info["module_name"] = module_name
@ -281,5 +377,6 @@ def pack_op_args(inputs, outputs, attrs):
"""
flatten inputs outputs attrs
"""
# 将inputs、outputs、attrs展开为一个列表
op_args = (inputs, outputs, attrs)
return [item for arg in op_args for item in arg]

@ -20,14 +20,23 @@ from enum import Enum
class JobType(Enum):
""" Job Type """
# 初始化任务
INITIALIZE_JOB = 'Initialize'
# 结束任务
FINALIZE_JOB = 'Finalize'
# 检查支持任务
CHECK_JOB = 'CheckSupport'
# 选择格式任务
SELECT_JOB = 'SelectFormat'
# 预编译任务
PRECOMPILE_JOB = 'PreCompile'
# 编译任务
COMPILE_JOB = 'Compile'
# 融合编译任务
FUSION_COMPILE_JOB = 'FusionOpCompile'
# 调优任务
TUNE_JOB = 'Tune'
# 查询任务
QUERY_JOB = 'Query'
@ -51,9 +60,13 @@ class JobStatus(Enum):
class LogMessage:
""" Log message """
# 初始化函数,用于创建一个对象
def __init__(self, index, level, info):
# 将传入的index参数赋值给对象的index属性
self.index = index
# 将传入的level参数赋值给对象的level属性
self.level = level
# 将传入的info参数赋值给对象的info属性
self.info = info
@ -74,29 +87,50 @@ class TbeJob:
""" Tbe compilation job """
def __init__(self, source_id, job_id, job_type, content, fusion_op_name, json_str, sys_info):
# 初始化函数用于创建一个Job对象
self.source_id = source_id
# 源ID
self.id = job_id
# 任务ID
self.type = JobType(job_type)
# 任务类型
self.status = JobStatus.JOB_INITIAL
# 任务状态
self.content = content
# 任务内容
self.fusion_op_name = fusion_op_name
# 融合操作名称
self.result = ""
# 任务结果
self.process_info = []
# 任务处理信息
self.json_string = json_str
# JSON字符串
self._sys_logger = sys_info["logger"]
# 系统日志
self.sys_offline_tune = sys_info["offline_tune"]
# 离线调优
self.sys_tune_dump_path = sys_info["tune_dump_path"]
# 调优转储路径
self.sys_para_debug_path = sys_info["para_debug_path"]
# 参数调试路径
# license info
self.rl_tune_switch = sys_info["rl_tune_switch"]
# 强化学习调优开关
self.rl_tune_list = sys_info["rl_tune_list"]
# 强化学习调优列表
self.op_tune_switch = sys_info["op_tune_switch"]
# 操作调优开关
self.op_tune_list = sys_info["op_tune_list"]
# 操作调优列表
self.pass_list = sys_info["pass_list"]
# 通过列表
# soc info
self.soc_version = sys_info["socVersion"]
# SoC版本
self.core_num = sys_info["coreNum"]
# 核心数量
self.op_bank_path = sys_info["op_bank_path"]
def debug(self, msg, *args, **kwargs):
@ -106,9 +140,13 @@ class TbeJob:
:param args:
:return:
"""
# 获取处理后的消息
processed_msg = _get_message(msg, args)
# 创建日志消息对象
message = LogMessage(len(self.process_info), LogLevel.DEBUG, processed_msg)
# 将日志消息对象添加到process_info列表中
self.process_info.append(message)
# 使用系统日志记录器记录日志
self._sys_logger.debug(msg, *args, **kwargs)
def info(self, msg, *args, **kwargs):
@ -118,9 +156,13 @@ class TbeJob:
:param args:
:return:
"""
# 获取处理后的消息
processed_msg = _get_message(msg, args)
# 创建日志消息对象
message = LogMessage(len(self.process_info), LogLevel.INFO, processed_msg)
# 将日志消息对象添加到process_info列表中
self.process_info.append(message)
# 使用系统日志记录器记录日志
self._sys_logger.info(msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
@ -130,9 +172,13 @@ class TbeJob:
:param args:
:return:
"""
# 获取处理后的消息
processed_msg = _get_message(msg, args)
# 创建日志消息对象
message = LogMessage(len(self.process_info), LogLevel.WARNING, processed_msg)
# 将日志消息对象添加到process_info列表中
self.process_info.append(message)
# 使用系统日志记录器记录警告信息
self._sys_logger.warning(msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
@ -142,9 +188,13 @@ class TbeJob:
:param args:
:return:
"""
# 获取处理后的消息
processed_msg = _get_message(msg, args)
# 创建一个LogMessage对象包含消息的长度、日志级别和消息内容
message = LogMessage(len(self.process_info), LogLevel.ERROR, processed_msg)
# 将LogMessage对象添加到process_info列表中
self.process_info.append(message)
# 使用_sys_logger记录错误日志msg为原始消息args和kwargs为参数
self._sys_logger.error(msg, *args, **kwargs)
def error_manager(self, msg, *args, **kwargs):
@ -154,30 +204,50 @@ class TbeJob:
:param args:
:return:
"""
# 如果msg为空则输出警告信息并返回
if not msg:
self.warning("Get empty error manager message, op_name: {}".format(self.fusion_op_name))
return
# 初始化异常信息为None
exception_info = None
# 获取融合操作名称
op_name = self.fusion_op_name
# 如果msg是Exception类型
if isinstance(msg, Exception):
# 遍历msg的参数
for arg in msg.args:
# 如果参数是字典类型且包含"errCode"键
if isinstance(arg, dict) and "errCode" in arg:
# 将异常信息赋值给exception_info
exception_info = arg
break
# 如果没有找到异常信息
if not exception_info:
# 输出错误信息
self.error("Exception message:{}".format(msg))
return
# 如果msg不是Exception类型
else:
# 将msg的第一个元素赋值给异常信息
exception_info = msg[0]
# 如果msg的长度大于等于2
if len(msg) >= 2:
# 将msg的第二个元素赋值给融合操作名称
op_name = msg[1]
# 如果异常信息不是字典类型或为空
if not isinstance(exception_info, dict) or not exception_info:
# 输出警告信息
self.warning("Get illegal error manager message, op_name: {}".format(self.fusion_op_name))
return
# 将异常信息中的op_name字段赋值为融合操作名称
exception_info["op_name"] = op_name
# 将异常信息转换为JSON格式
processed_msg = json.dumps(exception_info)
# 创建LogMessage对象
message = LogMessage(len(self.process_info), LogLevel.ERROR_MANAGER, processed_msg)
# 将LogMessage对象添加到process_info列表中
self.process_info.append(message)
# 输出异常信息
self._sys_logger.exception(msg, *args, **kwargs)
def get_result(self):
@ -186,15 +256,26 @@ class TbeJob:
:return: job process result string
"""
result = dict()
# 获取任务状态
result["status"] = self.status.value
# 获取任务源ID
result["source_id"] = self.source_id
# 获取任务ID
result["job_id"] = self.id
# 获取任务类型
result["job_type"] = self.type.value
# 获取融合操作名称
result["fusion_op_name"] = self.fusion_op_name
# 获取任务结果
result["result"] = self.result
process_info = []
# 遍历任务处理信息
for info in self.process_info:
# 构造消息字典
msg = {"index": info.index, "level": info.level.value, "message": info.info}
# 将消息字典添加到处理信息列表中
process_info.append(msg)
# 将处理信息列表添加到结果字典中
result["process_info"] = process_info
# 将结果字典转换为JSON字符串并返回
return json.dumps(result)

@ -29,6 +29,7 @@ class TbeJobManager:
""" TBE compiler job manager """
def __init__(self):
# 定义一个字典,用于存储不同类型的任务及其对应的处理函数
self.job_handlers = {
JobType.INITIALIZE_JOB: self.initialize_handler,
JobType.FINALIZE_JOB: self.finalize_handler,
@ -41,24 +42,43 @@ class TbeJobManager:
JobType.QUERY_JOB: self.query_handler
}
# 定义一个字典,用于存储所有任务
self._all_jobs = {}
# 定义一个字典,用于存储已完成任务
self._finished_jobs = {}
# 定义一个字典,用于存储正在运行的任务
self._running_jobs = {}
# 定义一个字典,用于存储原始完成任务
self._raw_finish_jobs = {}
# 定义一个布尔值用于判断TBE是否初始化
self.tbe_initialize = False
# 定义一个变量,用于存储初始化缓存
self.init_cache = None
# 定义一个字符串,用于存储参数调试路径
self.para_debug_path = ""
# 定义一个字符串,用于存储自动调优模式
self.auto_tiling_mode = ""
# 定义一个布尔值,用于判断是否离线调优
self.offline_tune = False
# 定义一个列表,用于存储调优操作
self.tune_op_list = []
# 定义一个字符串,用于存储调优输出路径
self.tune_dump_path = ""
# 定义一个字符串,用于存储调优库路径
self.tune_bank_path = ""
# 定义一个列表,用于存储自动调优操作
self.auto_tune_op_list = []
# 定义一个字典,用于存储预编译操作
self.pre_build_ops = {}
# 定义一个整数,用于存储融合编译需要同步的次数
self.fusion_need_sync = 0
# 定义一个字典,用于存储导入的模块
self.imported_module = {}
# 定义一个字符串用于存储SoC版本
self.soc_version = ""
# 定义一个整数,用于存储核心数量
self.core_num = 0
# 定义一个字符串,用于存储操作库路径
self.op_bank_path = ""
# license info
self.rl_tune_switch = ""
@ -68,6 +88,7 @@ class TbeJobManager:
self.pass_list = ""
def __del__(self):
# 删除对象时调用reset方法
self.reset()
def reset(self):
@ -75,22 +96,38 @@ class TbeJobManager:
Reset the job manager
:return: None
"""
# 重置所有任务
self._all_jobs = {}
# 重置已完成任务
self._finished_jobs = {}
# 重置正在运行的任务
self._running_jobs = {}
# 重置原始已完成任务
self._raw_finish_jobs = {}
# 重置调试路径
self.para_debug_path = ""
# 重置自动切分模式
self.auto_tiling_mode = ""
# 重置离线调优
self.offline_tune = False
# 重置调优操作列表
self.tune_op_list = []
# 重置调优导出路径
self.tune_dump_path = ""
# 重置调优银行路径
self.tune_bank_path = ""
# 重置自动调优操作列表
self.auto_tune_op_list = []
# 重置预构建操作
self.pre_build_ops = []
# 重置融合需要同步
self.fusion_need_sync = 0
# 重置导入模块
self.imported_module = {}
# 如果tbe_initialize为True则调用tbe_finalize方法
if self.tbe_initialize:
tbe_finalize(self.auto_tiling_mode, self.offline_tune, self.init_cache)
# 重置tbe_initialize
self.tbe_initialize = False
self.init_cache = None
self.soc_version = ""
@ -105,11 +142,17 @@ class TbeJobManager:
"""
job = None
try:
# 将job_str转换为json格式
job_json = json.loads(job_str)
# 检查job_json的合法性
check_job_json(job_json)
# 获取job_id
job_id = job_json["job_id"]
# 获取source_id
source_id = job_json["source_id"]
# 获取job_type
job_type = job_json["job_type"]
# 获取系统信息
sys_info = self._get_job_sys_info()
fusion_op_name = "NA" if "fusion_op_name" not in job_json["job_content"] else job_json["job_content"][
"fusion_op_name"]
@ -140,173 +183,260 @@ class TbeJobManager:
def initialize_handler(self, job: TbeJob):
""" Initialize job handler """
# 初始化系统信息
self._init_sys_info(job)
# 调用tbe_initialize函数初始化job
res = tbe_initialize(job)
# 如果初始化失败记录错误信息并将job状态设置为JOB_FAILED
if not res:
job.error("Process Initialize Job failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 如果auto_tiling_mode中包含"GA",则获取自动调优支持的操作列表
if "GA" in self.auto_tiling_mode:
self.auto_tune_op_list = get_auto_tune_support_op_list(job)
# 设置tbe_initialize为True
self.tbe_initialize = True
# 将job保存到init_cache中
self.init_cache = job
# 将job状态设置为JOB_SUCCESS
return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS)
def finalize_handler(self, job: TbeJob):
""" Finalize job handler """
# 如果tbe_initialize为False则直接将job状态设置为JOB_SUCCESS
if not self.tbe_initialize:
return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS)
# 调用tbe_finalize函数传入auto_tiling_mode和offline_tune参数
res = tbe_finalize(self.auto_tiling_mode, self.offline_tune, job)
# 如果finalize失败记录错误信息并将job状态设置为JOB_FAILED
if not res:
job.error("Process Finalize Job failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 将job状态设置为JOB_SUCCESS
return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS)
def check_support_handler(self, job: TbeJob):
""" Check Support job handler """
# 调用check_support函数检查job是否支持
res = check_support(job)
# 如果不支持记录错误信息并将job状态设置为JOB_FAILED
if not res:
job.error("Process CheckSupport Job failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 更新导入的操作模块
self._update_imported_op_module(job)
# 将job状态设置为JOB_SUCCESS
return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS)
def select_format_handler(self, job: TbeJob):
""" Select Format job handler """
# 调用select_op_format函数选择操作格式
res = select_op_format(job)
# 如果选择失败记录错误信息并将job状态设置为JOB_FAILED
if not res:
job.error("Process SelectFormat Job failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 将job状态设置为JOB_SUCCESS
return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS)
def pre_compile_handler(self, job: TbeJob):
""" Pre Compile job handler """
# 调用parallel_pre_compile_op函数对job进行预处理
res = parallel_pre_compile_op(job)
# 如果预处理失败则记录错误信息并将job状态设置为JOB_FAILED
if not res:
job.error("Process PreCompile Job failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 将job添加到pre_build_ops字典中以fusion_op_name为键
self.pre_build_ops[job.content["fusion_op_name"]] = job
# 将job状态设置为JOB_RUNNING
return self.add_to_running_jobs(job)
def compile_handler(self, job: TbeJob):
""" Compile job handler """
# 获取job中的compute_op_list
compute_op_list = get_compute_op_list(job.content)
# 如果compute_op_list只有一个元素则调用single_op_compile函数进行编译
if len(compute_op_list) == 1: # pylint: disable=no-else-return
return self.single_op_compile(job)
else:
# 调用before_build_process函数对job进行预处理
before_build_process(job)
# 如果需要同步fusion则调用sync_fusion_env函数进行同步
if self.fusion_need_sync:
sync_fusion_env(self.fusion_need_sync, self.imported_module)
self.fusion_need_sync = 0
# 调用parallel_compile_fusion_op函数对job进行编译
res = parallel_compile_fusion_op(job)
# 如果编译失败则记录错误信息并将job状态设置为JOB_FAILED
if not res:
job.error("Parallel_compile_fusion_op Job failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 将job状态设置为JOB_RUNNING
return self.add_to_running_jobs(job)
def single_op_compile(self, job: TbeJob):
"""Single operator compile"""
# 调用do_fuzz_build_tbe_op函数对job进行编译
res = do_fuzz_build_tbe_op(job)
# 如果编译失败则记录错误信息并将job状态设置为JOB_FAILED
if not res:
job.error("Process do fuzz build tbe op failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 如果job.result为"NOT_CHANGED"则调用before_build_process函数进行预处理并调用build_single_pre_op函数进行编译
if job.result == "NOT_CHANGED":
job.result = ""
before_build_process(job)
res = build_single_pre_op(job)
# 如果编译失败则记录错误信息并将job状态设置为JOB_FAILED
if not res:
job.error("Process build single pre op failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 将job状态设置为JOB_RUNNING
return self.add_to_running_jobs(job)
# 如果job.result为"SUCCESS"则将job状态设置为JOB_SUCCESS
if job.result == "SUCCESS":
return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS)
# 如果编译失败则记录错误信息并将job状态设置为JOB_FAILED
job.error("Process do fuzz build tbe op failed, job json string:{}".format(job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
def tune_handler(self, job: TbeJob):
""" Tune job handler """
before_build_process(job)
# 选择调优模式
tune_mode = self._select_tune_mode(job)
# 如果调优模式为不调优,则直接调用编译处理函数
if tune_mode == TuneMode.NO_TUNE:
return self.compile_handler(job)
# 获取计算操作列表
compute_op_list = get_compute_op_list(job.content)
# 如果计算操作列表只有一个,则调用单操作调优函数
if len(compute_op_list) == 1:
return self.single_op_tune(job)
# 否则调用融合操作调优函数
return self.fusion_op_tune(job)
def single_op_tune(self, job: TbeJob):
"""Single operator tune"""
# 选择调优模式
tune_mode = self._select_tune_mode(job)
# 如果调优模式为强化学习调优
if tune_mode == TuneMode.RL_TUNE:
# 调用强化学习单操作调优函数
res = rl_tune_single_op(job)
# 如果调优失败,则记录错误信息,并将任务状态设置为失败
if not res:
job.error(
"Tune Job failed, tune type {}, job json string:{}".format(tune_mode, job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 否则,如果需要同步融合环境,则调用同步融合环境函数
else:
if self.fusion_need_sync:
sync_fusion_env(self.fusion_need_sync, self.imported_module)
self.fusion_need_sync = 0
# 调用遗传算法调优函数
res = ga_tune(job)
# 如果调优失败,则记录错误信息,并调用编译处理函数
if not res:
job.error("ga tune Job failed, job json string:{}".format(job.json_string))
return self.compile_handler(job)
# 如果任务状态为运行中
if job.status == JobStatus.JOB_RUNNING:
# 如果调优模式为强化学习调优,则更新导入的操作模块
if tune_mode == TuneMode.RL_TUNE:
self._update_imported_op_module(job)
# 将任务添加到运行中任务列表
return self.add_to_running_jobs(job)
# 否则将任务添加到已完成任务列表,并设置任务状态为成功
return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS)
def fusion_op_tune(self, job: TbeJob):
"""Fusion operator tune"""
# 选择调优模式
tune_mode = self._select_tune_mode(job)
# 如果需要同步融合环境,则调用同步融合环境函数
if self.fusion_need_sync:
sync_fusion_env(self.fusion_need_sync, self.imported_module)
self.fusion_need_sync = 0
# 如果调优模式为强化学习调优,则调用强化学习融合操作调优函数
if tune_mode == TuneMode.RL_TUNE:
res = rl_tune_fusion_op(job)
# 否则调用遗传算法调优函数
else:
res = ga_tune(job)
# 如果调优失败,则记录错误信息,并将任务状态设置为失败
if not res:
job.error(
"Tune Job failed, tune type {}, job json string:{}".format(tune_mode, job.json_string))
return self.add_to_finished_jobs(job, JobStatus.JOB_FAILED)
# 如果任务状态为运行中,则将任务添加到运行中任务列表
if job.status == JobStatus.JOB_RUNNING:
return self.add_to_running_jobs(job)
# 否则将任务添加到已完成任务列表,并设置任务状态为成功
return self.add_to_finished_jobs(job, JobStatus.JOB_SUCCESS)
def query_handler(self, query_job: TbeJob):
""" Query job handler """
# 获取查询任务的source_id和job_id
target_source_id = query_job.content["source_id"]
target_job_id = query_job.content["job_id"]
# 根据source_id和job_id获取已完成的任务
target_job = get_job(self._finished_jobs, target_source_id, target_job_id)
# 如果找到了已完成的任务
if target_job:
# 记录警告信息
query_job.warning("Query a finished job: {}".format(query_job.content))
# 将查询任务的结果设置为已完成任务的结果
query_job.result = target_job.get_result()
# 将查询任务添加到已完成任务列表中,并返回成功状态
return self.add_to_finished_jobs(query_job, JobStatus.JOB_SUCCESS)
# 根据source_id和job_id获取未完成的任务
target_job = get_job(self._raw_finish_jobs, target_source_id, target_job_id)
# 如果未找到未完成的任务
if not target_job:
# 更新未完成的任务列表
self.update_raw_finished_jobs(query_job)
# 再次根据source_id和job_id获取未完成的任务
target_job = get_job(self._raw_finish_jobs, target_source_id, target_job_id)
# 如果找到了未完成的任务
if target_job:
# 记录调试信息
query_job.debug("Found job in raw finished jobs, source_id:{}, job_id:{}".format(target_source_id,
target_job_id))
# 将查询任务的结果设置为未完成任务的结果
query_job.result = target_job.get_result()
# 从未完成任务列表中删除该任务
del_job(self._raw_finish_jobs, target_job.source_id, target_job.id)
# 将未完成任务添加到已完成任务列表中,并返回成功状态
self.add_to_finished_jobs(target_job, target_job.status)
return self.add_to_finished_jobs(query_job, JobStatus.JOB_SUCCESS)
# 根据source_id和job_id获取正在运行的任务
target_job = get_job(self._running_jobs, target_source_id, target_job_id)
# 如果找到了正在运行的任务
if target_job:
# 将查询任务的结果设置为正在运行任务的结果
query_job.result = target_job.get_result()
# 将查询任务添加到已完成任务列表中,并返回成功状态
return self.add_to_finished_jobs(query_job, JobStatus.JOB_SUCCESS)
# 根据source_id和job_id获取所有任务
target_job = get_job(self._all_jobs, target_source_id, target_job_id)
# 如果找到了所有任务
if target_job:
# 记录调试信息
query_job.debug("Found job in all jobs, source_id:{}, job_id:{}".format(target_source_id,
target_job_id))
# 记录调试信息
target_job.debug("Be Queried")
# 将查询任务的结果设置为所有任务的结果
query_job.result = target_job.get_result()
# 将查询任务添加到已完成任务列表中,并返回成功状态
return self.add_to_finished_jobs(query_job, JobStatus.JOB_SUCCESS)
# 如果没有找到任何任务,记录错误信息
query_job.error("Can't find job in finished/raw_finished/running jobs, source_id: {}".format(target_source_id))
# 将查询任务的结果设置为空
query_job.result = ""
# 将查询任务添加到已完成任务列表中,并返回失败状态
return self.add_to_finished_jobs(query_job, JobStatus.JOB_FAILED)
def _get_job_sys_info(self):
@ -314,10 +444,15 @@ class TbeJobManager:
Get job manager system info
:return: system info
"""
# 创建一个字典,用于存储系统信息
sys_info = dict()
# 将DummyLogger添加到系统信息中
sys_info["logger"] = DummyLogger
# 将para_debug_path添加到系统信息中
sys_info["para_debug_path"] = self.para_debug_path
# 将tune_dump_path添加到系统信息中
sys_info["tune_dump_path"] = self.tune_dump_path
# 将offline_tune添加到系统信息中
sys_info["offline_tune"] = self.offline_tune
# license info
sys_info["rl_tune_switch"] = self.rl_tune_switch
@ -362,12 +497,17 @@ class TbeJobManager:
:param job:
:return:
"""
# 获取计算操作列表
compute_op_info = get_compute_op_list(job.content)[0]
# 获取操作模块名称
op_module_name = compute_op_info["module_name"]
# 如果操作模块名称在已导入模块中,则增加引用次数
if op_module_name in self.imported_module.keys():
self.imported_module[op_module_name] = self.imported_module[op_module_name] + 1
# 否则将操作模块名称添加到已导入模块中并设置引用次数为1
else:
self.imported_module[op_module_name] = 1
# 增加融合需要同步的次数
self.fusion_need_sync = self.fusion_need_sync + 1
def _select_tune_mode(self, job):
@ -376,18 +516,25 @@ class TbeJobManager:
:param job: tbe tune job
:return: NO_TUNE RL_TUNE or GA_TUNE
"""
# 获取job的SocInfo中的autoTilingMode和offlineTune
auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"]
offline_tune = job.content["SocInfo"]["offlineTune"]
# 获取job的full_name
full_name = job.content["full_name"]
# 获取job的func_names
func_names = get_func_names(job.content)
# 如果self.tune_op_list不为空且full_name不在self.tune_op_list中则返回TuneMode.NO_TUNE
if self.tune_op_list and full_name not in self.tune_op_list:
return TuneMode.NO_TUNE
# 如果offline_tune为True则返回TuneMode.RL_TUNE
if offline_tune:
return TuneMode.RL_TUNE
# 如果auto_tiling_mode中包含TuneMode.GA_TUNE.value则遍历func_names如果func_name.lower()在self.auto_tune_op_list中则返回TuneMode.GA_TUNE
if TuneMode.GA_TUNE.value in auto_tiling_mode:
for func_name in func_names:
if func_name.lower() in self.auto_tune_op_list:
return TuneMode.GA_TUNE
# 如果auto_tiling_mode中包含TuneMode.RL_TUNE.value则返回TuneMode.RL_TUNE
if TuneMode.RL_TUNE.value in auto_tiling_mode:
return TuneMode.RL_TUNE
return TuneMode.NO_TUNE
@ -398,15 +545,22 @@ class TbeJobManager:
:param query_job: query job
:return: Node
"""
# 获取已完成任务
new_finished_jobs = get_finish_tasks(query_job.source_id)
# 遍历已完成任务
for new_job in new_finished_jobs:
# 获取任务ID
source_id = new_job["graph_id"]
job_id = new_job["task_id"]
# 获取任务
target_job = get_job(self._running_jobs, source_id, job_id)
# 如果任务不存在,则报错
if not target_job:
query_job.error("Can't get job, source id:{}, job id:{}".format(source_id, job_id))
continue
# 设置任务结果
target_job.result = new_job["op_res"] if "op_res" in new_job else new_job["result"]
# 如果任务类型为预编译任务,则进行预编译
if target_job.type == JobType.PRECOMPILE_JOB:
op_name = target_job.content["fusion_op_name"]
op_params = get_prebuild_output(op_name)
@ -415,13 +569,17 @@ class TbeJobManager:
pre_compile_result["op_params"] = op_params
pre_compile_result["core_type"] = new_job["core_type"] if "core_type" in new_job else ""
target_job.result = json.dumps(pre_compile_result)
# 输出任务结果
target_job.info("Query result:{}".format(new_job["result"]))
# 如果任务状态码为0则任务成功
if new_job["status_code"] == 0:
target_job.status = JobStatus.JOB_SUCCESS
target_job.info("Query info_msg:{}".format(new_job["info_msg"]))
# 否则任务失败
else:
target_job.status = JobStatus.JOB_FAILED
target_job.error("Query info_msg:{}".format(new_job["info_msg"]))
# 输出错误信息
if "err_args" in new_job:
target_job.error("Query err_args:{}".format(new_job["err_args"]))
if "except_msg" in new_job:
@ -429,7 +587,9 @@ class TbeJobManager:
if "except_tuple_msg" in new_job:
target_job.error_manager(new_job["except_tuple_msg"])
target_job.error("\nOriginal compile json: \n {}\n".format(target_job.json_string))
# 将任务添加到已完成任务列表
post_job(self._raw_finish_jobs, target_job)
# 从运行中任务列表中删除任务
del_job(self._running_jobs, target_job.source_id, target_job.id)
def add_to_finished_jobs(self, job, status):
@ -456,8 +616,11 @@ class TbeJobManager:
class TuneMode(Enum):
"""Class of tune mode: NO_TUNE, GA, RL"""
# 不调优模式
NO_TUNE = "NO_TUNE"
# 遗传算法调优模式
GA_TUNE = "GA"
# 强化学习调优模式
RL_TUNE = "RL"
@ -469,18 +632,22 @@ class DummyLogger:
@staticmethod
def debug(msg, *args, **kwargs):
"""Debug级别日志"""
pass
@staticmethod
def info(msg, *args, **kwargs):
"""Info级别日志"""
pass
@staticmethod
def warning(msg, *args, **kwargs):
"""Warning级别日志"""
pass
@staticmethod
def error(msg, *args, **kwargs):
"""Error级别日志"""
pass
@staticmethod
@ -497,10 +664,13 @@ def get_job(jobs, source_id, job_id):
:return: job instance if found in job list
None if not found in job list
"""
# 如果source_id不在jobs的键中返回None
if source_id not in jobs.keys():
return None
# 如果job_id不在jobs[source_id]的键中返回None
if job_id not in jobs[source_id].keys():
return None
# 返回jobs[source_id][job_id]
return jobs[source_id][job_id]
@ -526,9 +696,15 @@ def del_job(jobs, source_id, job_id):
:param job_id: target job's job_id
:return: bool True or False
"""
# 判断source_id是否在jobs字典中
if source_id not in jobs.keys():
# 如果不在返回False
return False
# 判断job_id是否在jobs[source_id]字典中
if job_id not in jobs[source_id].keys():
# 如果不在返回False
return False
# 删除jobs[source_id]字典中的job_id键值对
del jobs[source_id][job_id]
# 返回True
return True

Loading…
Cancel
Save