|
|
|
@ -13,7 +13,6 @@
|
|
|
|
|
# limitations under the License.
|
|
|
|
|
# ============================================================================
|
|
|
|
|
"""tbe adapter to adapt te/topi/auto-tune python api """
|
|
|
|
|
# Import the required libraries and modules
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import shutil
|
|
|
|
@ -21,62 +20,33 @@ import sys
|
|
|
|
|
import traceback
|
|
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
|
# Import the TBE-related libraries and modules
|
|
|
|
|
from tbe.common.rl_bank.bank_manager import set_current_op_name
|
|
|
|
|
from tbe.common.repository_manager.interface import cann_kb_unload, cann_kb_load
|
|
|
|
|
from tbe.common.rl_bank.bank_cfg import LocalLock
|
|
|
|
|
from te.platform.cce_conf import te_set_version
|
|
|
|
|
from te.platform.cce_policy import set_L1_info
|
|
|
|
|
from te_fusion.compile_task_manager import (
|
|
|
|
|
dispatch_prebuild_task,
|
|
|
|
|
dispatch_single_op_compile_task,
|
|
|
|
|
import_py_module,
|
|
|
|
|
dispatch_fusion_op_compile_task,
|
|
|
|
|
dispatch_autotune_task,
|
|
|
|
|
sync_op_tune_params,
|
|
|
|
|
sync_syspath
|
|
|
|
|
)
|
|
|
|
|
from te_fusion.fusion_manager import (
|
|
|
|
|
call_op_func,
|
|
|
|
|
clear_fusion_params,
|
|
|
|
|
check_op_impl_mode,
|
|
|
|
|
save_op_params,
|
|
|
|
|
build_single_op_from_c,
|
|
|
|
|
op_params_to_json
|
|
|
|
|
)
|
|
|
|
|
from te_fusion.compile_task_manager import dispatch_prebuild_task, dispatch_single_op_compile_task, import_py_module, \
|
|
|
|
|
dispatch_fusion_op_compile_task, dispatch_autotune_task, sync_op_tune_params
|
|
|
|
|
from te_fusion.compile_task_manager import sync_syspath
|
|
|
|
|
from te_fusion.fusion_manager import call_op_func, clear_fusion_params, check_op_impl_mode, \
|
|
|
|
|
save_op_params, build_single_op_from_c, op_params_to_json
|
|
|
|
|
from te_fusion.fusion_util import dump_fusion_json
|
|
|
|
|
from te_fusion.parallel_compilation import (
|
|
|
|
|
init_multi_process_env,
|
|
|
|
|
start_ga_multi_process,
|
|
|
|
|
deinit_multi_process_env,
|
|
|
|
|
from te_fusion.parallel_compilation import init_multi_process_env, start_ga_multi_process, deinit_multi_process_env, \
|
|
|
|
|
get_finished_compilation_task
|
|
|
|
|
)
|
|
|
|
|
from .tbe_helper import (
|
|
|
|
|
get_soc_info,
|
|
|
|
|
assemble_op_args,
|
|
|
|
|
get_compute_op_list,
|
|
|
|
|
get_options_info,
|
|
|
|
|
get_fuzz_build_info,
|
|
|
|
|
adjust_custom_op_info,
|
|
|
|
|
pack_op_args,
|
|
|
|
|
get_module_name,
|
|
|
|
|
get_real_op_debug_level
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
from .tbe_helper import get_soc_info, assemble_op_args, get_compute_op_list, get_options_info, get_fuzz_build_info, \
|
|
|
|
|
adjust_custom_op_info, pack_op_args, get_module_name, get_real_op_debug_level
|
|
|
|
|
from .tbe_job import TbeJob, JobStatus
|
|
|
|
|
|
|
|
|
|
# Supported platform flags
PLATFORM_FLAG = [
    "Ascend310",
    "Ascend910",
    "Hi3796CV300ES",
    "Ascend710",
    "Ascend610",
    "Hi3796CV300CS",
    "SD3403",
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Tune initialization function
|
|
|
|
|
def _tune_init(job: TbeJob):
|
|
|
|
|
"""
|
|
|
|
|
Tune初始化
|
|
|
|
|
:param job: TbeJob对象,包含任务信息
|
|
|
|
|
:return: 初始化是否成功
|
|
|
|
|
Tune Initialize
|
|
|
|
|
:param job:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
# 提取Soc信息和Tune信息
|
|
|
|
|
auto_tiling_mode = job.content["SocInfo"]["autoTilingMode"]
|
|
|
|
|
offline_tune = job.content["SocInfo"]["offlineTune"]
|
|
|
|
|
op_bank_update = job.content["SocInfo"]["op_bank_update"]
|
|
|
|
@ -84,14 +54,11 @@ def _tune_init(job: TbeJob):
|
|
|
|
|
tune_bank_path = job.content["TuneInfo"]["tune_bank_path"]
|
|
|
|
|
need_ga = bool("GA" in auto_tiling_mode)
|
|
|
|
|
need_rl = bool("RL" in auto_tiling_mode)
|
|
|
|
|
|
|
|
|
|
# 设置环境变量
|
|
|
|
|
if offline_tune:
|
|
|
|
|
os.environ["ENABLE_TUNE_DUMP"] = "TRUE"
|
|
|
|
|
if op_bank_update:
|
|
|
|
|
sync_op_tune_params("tbe.common.tiling.tiling_api", "reset_repository", False, "")
|
|
|
|
|
|
|
|
|
|
# 初始化Tune环境
|
|
|
|
|
if need_ga or need_rl or offline_tune:
|
|
|
|
|
res = __init_tune_env(job, need_ga)
|
|
|
|
|
if not res:
|
|
|
|
@ -99,7 +66,6 @@ def _tune_init(job: TbeJob):
|
|
|
|
|
else:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
# 设置Tune路径
|
|
|
|
|
if tune_dump_path:
|
|
|
|
|
os.environ["TUNE_DUMP_PATH"] = str(tune_dump_path)
|
|
|
|
|
if tune_bank_path:
|
|
|
|
@ -107,12 +73,12 @@ def _tune_init(job: TbeJob):
|
|
|
|
|
res = _creating_custom_path(job)
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
# CANN knowledge base load function
|
|
|
|
|
|
|
|
|
|
def _cann_kb_load(job: TbeJob):
|
|
|
|
|
"""
|
|
|
|
|
加载CANN知识库
|
|
|
|
|
:param job: TbeJob对象,包含任务信息
|
|
|
|
|
:return: 加载是否成功
|
|
|
|
|
database load
|
|
|
|
|
:param job:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
soc_version = job.soc_version
|
|
|
|
|
core_num = job.core_num
|
|
|
|
@ -121,12 +87,12 @@ def _cann_kb_load(job: TbeJob):
|
|
|
|
|
res = cann_kb_load(soc_version, core_num, op_bank_path, kb_type)
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
# CANN knowledge base unload function
|
|
|
|
|
|
|
|
|
|
def _cann_kb_unload(job: TbeJob):
|
|
|
|
|
"""
|
|
|
|
|
卸载CANN知识库
|
|
|
|
|
:param job: TbeJob对象,包含任务信息
|
|
|
|
|
:return: 卸载是否成功
|
|
|
|
|
database unload
|
|
|
|
|
:param job:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
if job is None:
|
|
|
|
|
return 0
|
|
|
|
@ -136,12 +102,12 @@ def _cann_kb_unload(job: TbeJob):
|
|
|
|
|
res = cann_kb_unload(soc_version, core_num, kb_type)
|
|
|
|
|
return res
|
|
|
|
|
|
|
|
|
|
# Cache file removal function
|
|
|
|
|
|
|
|
|
|
def _remove_cache(job: TbeJob):
|
|
|
|
|
"""
|
|
|
|
|
移除缓存文件
|
|
|
|
|
:param job: TbeJob对象,包含任务信息
|
|
|
|
|
:return: 无
|
|
|
|
|
:param job: remove cache file:[*.json, *.o, *.info, *.cce] when "op_debug_level" is "0"
|
|
|
|
|
op_debug_level: representation the env MS_COMPILER_OP_LEVEL
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
op_debug_level = job.content["SocInfo"]["op_debug_level"]
|
|
|
|
|
op_debug_dir = job.content["SocInfo"]["op_debug_dir"]
|
|
|
|
@ -152,30 +118,24 @@ def _remove_cache(job: TbeJob):
|
|
|
|
|
real_path = os.path.join(root_path, "kernel_meta/")
|
|
|
|
|
shutil.rmtree(real_path)
|
|
|
|
|
|
|
|
|
|
# Directory creation function
|
|
|
|
|
|
|
|
|
|
def __directory_creation(path, concat_path):
    """
    Create directory

    Join ``concat_path`` onto ``path`` and make sure the resulting directory
    exists, creating missing levels with mode 0o750.

    :param path: base path
    :param concat_path: path component(s) appended to the base path
    :return: the joined path
    :raises FileExistsError: if the joined path exists but is not a directory
    """
    path = os.path.join(path, concat_path)
    # exist_ok=True replaces the former "isdir() then makedirs()" sequence, so
    # a directory that appears between the check and the creation no longer
    # makes this call fail.
    os.makedirs(path, 0o750, exist_ok=True)
    return path
|
|
|
|
|
|
|
|
|
|
# Tune environment initialization function
|
|
|
|
|
|
|
|
|
|
def __init_tune_env(job, need_ga):
|
|
|
|
|
"""
|
|
|
|
|
初始化Tune环境
|
|
|
|
|
:param job: TbeJob对象,包含任务信息
|
|
|
|
|
:param need_ga: 是否需要GA
|
|
|
|
|
:return: 初始化是否成功
|
|
|
|
|
Initialize tune env
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
import auto_tune.auto_tune_main as at_atm
|
|
|
|
|
from schedule_search.rl_online_tune import rl_tune_init
|
|
|
|
|
from schedule_search.rl_online_tune import rl_tune_init # pylint: disable=unused-import
|
|
|
|
|
if need_ga:
|
|
|
|
|
res = at_atm.ga_tune_init()
|
|
|
|
|
if not res:
|
|
|
|
@ -197,13 +157,10 @@ def __init_tune_env(job, need_ga):
|
|
|
|
|
finally:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
# Default custom path creation function
|
|
|
|
|
|
|
|
|
|
def __creating_default_custom_path(auto_tiling_mode, base_custom_path):
|
|
|
|
|
"""
|
|
|
|
|
创建默认自定义路径
|
|
|
|
|
:param auto_tiling_mode: 自动平铺模式
|
|
|
|
|
:param base_custom_path: 基础自定义路径
|
|
|
|
|
:return: 无
|
|
|
|
|
Create default custom path
|
|
|
|
|
"""
|
|
|
|
|
base_custom_path = __directory_creation(base_custom_path, "data")
|
|
|
|
|
tune_flag = []
|
|
|
|
@ -222,40 +179,27 @@ def __creating_default_custom_path(auto_tiling_mode, base_custom_path):
|
|
|
|
|
|
|
|
|
|
def _creating_custom_path(job):
    """
    Create custom path

    Validate the tuning bank directory. When the job does not configure
    ``tune_bank_path``, a directory three levels above the installed
    ``auto_tune`` package is used instead and the default custom path is
    created below it.

    :param job: job object; ``job.content["SocInfo"]["autoTilingMode"]`` and
        ``job.content["TuneInfo"]["tune_bank_path"]`` are read, and
        ``job.error`` is called to report problems
    :return: True when the mode contains "NO_TUNE" or the configured path is an
        accessible directory; False when the directory is missing or lacks
        read/write/execute permission; otherwise the result of
        ``__creating_default_custom_path``
    """
    tiling_mode = job.content["SocInfo"]["autoTilingMode"]
    if "NO_TUNE" in tiling_mode:
        return True

    bank_root = job.content["TuneInfo"]["tune_bank_path"]
    use_default_root = not bank_root
    if use_default_root:
        # No bank path configured: derive one from the auto_tune package location.
        import auto_tune
        package_dir = os.path.dirname(os.path.realpath(auto_tune.__file__))
        bank_root = os.path.realpath(os.path.join(package_dir, "../../../"))

    if not os.path.isdir(bank_root):
        job.error("Check whether the tuning path [{}] exists.".format(bank_root))
        return False
    if not os.access(bank_root, os.R_OK | os.W_OK | os.X_OK):
        job.error("Check whether the permission on the tuning path [{}] is correct.".format(bank_root))
        return False

    if use_default_root:
        return __creating_default_custom_path(tiling_mode, bank_root)
    return True
|
|
|
|
@ -263,34 +207,22 @@ def _creating_custom_path(job):
|
|
|
|
|
|
|
|
|
|
def _parallel_compilation_init(initialize: TbeJob):
|
|
|
|
|
"""
|
|
|
|
|
初始化TBE并行编译环境。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
initialize (TbeJob): 包含任务信息的TbeJob对象。
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 并行编译环境初始化是否成功。
|
|
|
|
|
Tbe parallel compilation initialize
|
|
|
|
|
:param initialize:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
# 设置并行编译器的环境变量
|
|
|
|
|
os.environ["TE_PARALLEL_COMPILER"] = str(initialize.content["process_num"])
|
|
|
|
|
# 获取SoC信息
|
|
|
|
|
soc_info = get_soc_info(initialize.content)
|
|
|
|
|
# 获取实际的调试级别
|
|
|
|
|
real_debug_level = get_real_op_debug_level(initialize.content)
|
|
|
|
|
# 获取自动平铺模式
|
|
|
|
|
auto_tiling_mode = initialize.content["SocInfo"]["autoTilingMode"]
|
|
|
|
|
# 获取是否需要离线调优
|
|
|
|
|
offline_tune = initialize.content["SocInfo"]["offlineTune"]
|
|
|
|
|
# 生成进程ID和时间戳的组合字符串
|
|
|
|
|
pid_ts = "{}_pid{}".format(datetime.now().strftime('%Y%m%d_%H%M%S%f')[:-3], os.getpid())
|
|
|
|
|
# 初始化多进程环境
|
|
|
|
|
ret = init_multi_process_env(False, soc_info, auto_tiling_mode, real_debug_level,
|
|
|
|
|
None, 1, pid_ts)
|
|
|
|
|
if ret is None:
|
|
|
|
|
initialize.error("Init multiprocess env failed")
|
|
|
|
|
return False
|
|
|
|
|
initialize.info("Init multiprocess env success with {} process".format(ret[0]))
|
|
|
|
|
# 如果需要RL或离线调优,则初始化RL环境
|
|
|
|
|
if "RL" in auto_tiling_mode or offline_tune:
|
|
|
|
|
res_queue = ret[1]
|
|
|
|
|
live_checker = ret[2]
|
|
|
|
@ -302,7 +234,6 @@ def _parallel_compilation_init(initialize: TbeJob):
|
|
|
|
|
initialize.error("RL env init failed!")
|
|
|
|
|
return False
|
|
|
|
|
initialize.info("RL Tune init success.")
|
|
|
|
|
# 如果需要GA,则启动GA多进程
|
|
|
|
|
if "GA" in auto_tiling_mode:
|
|
|
|
|
start_ga_multi_process(auto_tiling_mode)
|
|
|
|
|
initialize.info("GA Tune init success.")
|
|
|
|
@ -311,44 +242,31 @@ def _parallel_compilation_init(initialize: TbeJob):
|
|
|
|
|
|
|
|
|
|
def tbe_initialize(job: TbeJob):
    """
    Tbe Initialize

    Set the SoC version, run tune initialization, then, while holding the
    ``kernel_meta/file.lock`` lock, load the CANN knowledge base and
    initialize parallel compilation. Failures of the individual steps are
    reported through ``job.error``.

    :param job: TbeJob carrying the initialize request in ``job.content``
    :return: result of the last step that ran (the parallel compilation init
        result when no RuntimeError was raised). ``job.result`` is set to
        "Success" before returning in every case.
    """
    os.environ["CONTEXT_MODELCOMPILING"] = "TRUE"

    if not te_set_version(*get_soc_info(job.content)):
        job.error("Set version failed")

    status = _tune_init(job)
    if not status:
        job.error("Tune init failed")

    debug_dir = job.content["SocInfo"]["op_debug_dir"]
    kb_lock = LocalLock(os.path.join(debug_dir, "kernel_meta", "file.lock"))
    try:
        kb_lock.lock()
        status = _cann_kb_load(job)
        # The knowledge base loader signals failure with the value 1.
        if status == 1:
            job.error("Cann kb load failed")
        status = _parallel_compilation_init(job)
        if not status:
            job.error("Parallel compilation failed")
    except RuntimeError:
        job.error("Initialize failed with RuntimeError")
    finally:
        # Always release the lock, whether or not initialization succeeded.
        kb_lock.unlock()

    job.result = "Success"
    return status
|
|
|
|
@ -356,13 +274,9 @@ def tbe_initialize(job: TbeJob):
|
|
|
|
|
|
|
|
|
|
def get_auto_tune_support_op_list(job: TbeJob):
|
|
|
|
|
"""
|
|
|
|
|
获取支持自动调优的算子列表。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
job (TbeJob): 包含任务信息的TbeJob对象。
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
list: 支持自动调优的算子列表。
|
|
|
|
|
Get GA tune supported op list
|
|
|
|
|
:param job:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
from auto_tune_main import enable_auto_tune_support
|
|
|
|
|
auto_tune_op_list = enable_auto_tune_support()
|
|
|
|
@ -372,14 +286,10 @@ def get_auto_tune_support_op_list(job: TbeJob):
|
|
|
|
|
|
|
|
|
|
def _normalize_module_name(module_name, py_module_path):
|
|
|
|
|
"""
|
|
|
|
|
规范化模块名称。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
module_name (str): 模块名称。
|
|
|
|
|
py_module_path (str): Python模块路径。
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
None
|
|
|
|
|
Normalize module name
|
|
|
|
|
:param module_name:
|
|
|
|
|
:param py_module_path:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
if py_module_path not in sys.path:
|
|
|
|
|
sys.path.insert(0, py_module_path)
|
|
|
|
@ -388,13 +298,9 @@ def _normalize_module_name(module_name, py_module_path):
|
|
|
|
|
|
|
|
|
|
def check_support(job: TbeJob):
|
|
|
|
|
"""
|
|
|
|
|
检查算子是否受支持。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
job (TbeJob): 包含任务信息的TbeJob对象。
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 算子是否受支持。
|
|
|
|
|
Check support
|
|
|
|
|
:param job:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
op_compute_info_list = get_compute_op_list(job.content)
|
|
|
|
|
if len(op_compute_info_list) != 1:
|
|
|
|
@ -435,37 +341,21 @@ def check_support(job: TbeJob):
|
|
|
|
|
def select_op_format(job: TbeJob):
    """
    Select op format

    Call the op module's ``op_select_format`` function for the single compute
    op described by the job and store the stringified result in
    ``job.result``.

    :param job: TbeJob whose ``job.content`` describes exactly one compute op
    :return: True on success; False when the job does not contain exactly one
        compute op
    """
    compute_op_info_list = get_compute_op_list(job.content)
    if len(compute_op_info_list) != 1:
        # The message used to say "in check_support" (copied from a sibling
        # function); it now names the function that actually failed.
        job.error("Invalid op compute num ({}) in select_op_format".format(len(compute_op_info_list)))
        return False

    compute_op_info = compute_op_info_list[0]
    adjust_custom_op_info(compute_op_info)
    inputs, outputs, attrs = assemble_op_args(compute_op_info)
    op_module_name = get_module_name(compute_op_info)
    py_module_path = compute_op_info["py_module_path"]
    _normalize_module_name(op_module_name, py_module_path)

    op_func_name = "op_select_format"
    res = call_op_func((inputs, outputs, attrs), op_module_name, op_func_name)
    job.result = str(res)
    return True
|
|
|
|
|
|
|
|
|
@ -473,25 +363,15 @@ def select_op_format(job: TbeJob):
|
|
|
|
|
def parallel_pre_compile_op(job: TbeJob):
    """
    Parallel pre compile op

    Adjust the single compute op of the job and hand it to
    ``_pre_build_compute_op_info``.

    :param job: TbeJob whose ``job.content`` describes exactly one compute op
    :return: True once the pre-build helper has been called; False when the
        job does not contain exactly one compute op
    """
    op_infos = get_compute_op_list(job.content)
    op_count = len(op_infos)
    if op_count != 1:
        job.error("Invalid op compute num ({}) in pre compile op".format(op_count))
        return False

    single_op = op_infos[0]
    adjust_custom_op_info(single_op)
    _pre_build_compute_op_info(single_op, job)
    return True
|
|
|
|
|
|
|
|
|
@ -499,60 +379,35 @@ def parallel_pre_compile_op(job: TbeJob):
|
|
|
|
|
def _pre_build_compute_op_info(compute_op, job):
|
|
|
|
|
"""
|
|
|
|
|
Prebuild by compute op info
|
|
|
|
|
根据计算操作信息预构建操作。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
compute_op (dict): 计算操作信息。
|
|
|
|
|
job (TbeJob): 包含任务信息的TbeJob对象。
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
None
|
|
|
|
|
:param compute_op:
|
|
|
|
|
:param job:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
# 获取L1缓存大小
|
|
|
|
|
l1_size = job.content["l1_size"]
|
|
|
|
|
# 如果L1缓存大小不为-1,则设置L1缓存信息
|
|
|
|
|
if l1_size != -1:
|
|
|
|
|
set_L1_info("op_L1_space", -1)
|
|
|
|
|
# 组装操作参数
|
|
|
|
|
inputs, outputs, attrs = assemble_op_args(compute_op, is_single_op_build=True)
|
|
|
|
|
# 获取操作模块名称
|
|
|
|
|
op_module_name = get_module_name(compute_op)
|
|
|
|
|
# 获取Python模块路径
|
|
|
|
|
py_module_path = compute_op["py_module_path"]
|
|
|
|
|
# 获取操作函数名称
|
|
|
|
|
op_func_name = compute_op["func_name"]
|
|
|
|
|
# 获取操作类型
|
|
|
|
|
op_type = compute_op["type"]
|
|
|
|
|
# 获取操作名称
|
|
|
|
|
op_name = compute_op["op_name"]
|
|
|
|
|
# 保存操作参数
|
|
|
|
|
save_op_params(op_name, "prebuild", (outputs, attrs))
|
|
|
|
|
# 设置L1缓存信息
|
|
|
|
|
l1_size = job.content["l1_size"]
|
|
|
|
|
set_L1_info("op_L1_space", l1_size)
|
|
|
|
|
# 规范化模块名称
|
|
|
|
|
_normalize_module_name(op_module_name, py_module_path)
|
|
|
|
|
# 获取未知形状信息
|
|
|
|
|
unknown_shape = compute_op["unknown_shape"]
|
|
|
|
|
# 获取int64模式信息
|
|
|
|
|
int64_mode = compute_op["int64mode"]
|
|
|
|
|
# 检查操作实现模式
|
|
|
|
|
res = check_op_impl_mode(op_module_name, op_func_name)
|
|
|
|
|
# 获取操作实现模式
|
|
|
|
|
op_impl_mode = job.content["SocInfo"]["op_impl_mode"]
|
|
|
|
|
# 获取操作实现模式列表
|
|
|
|
|
op_impl_mode_list = job.content["SocInfo"]["op_impl_mode_list"]
|
|
|
|
|
# 获取完整操作名称
|
|
|
|
|
op_full_name = job.content["full_name"]
|
|
|
|
|
# 如果操作不支持实现模式,则发出警告
|
|
|
|
|
if not res:
|
|
|
|
|
if op_impl_mode_list:
|
|
|
|
|
job.warning("The op {} do NOT support op_impl_mode, current op_impl_mode:{}".format(op_type, op_impl_mode))
|
|
|
|
|
else:
|
|
|
|
|
# 否则,记录操作支持实现模式的信息
|
|
|
|
|
job.info("OpType {} support op_impl_mode, current op_impl_mode:{}".format(op_type, op_impl_mode))
|
|
|
|
|
# 获取选项信息
|
|
|
|
|
options = get_options_info(job.content)
|
|
|
|
|
# 分派预构建任务
|
|
|
|
|
dispatch_prebuild_task(job.source_id, job.id, l1_size, op_module_name, op_full_name,
|
|
|
|
|
op_type, op_func_name, unknown_shape,
|
|
|
|
|
(inputs, outputs, attrs, options), int64_mode, unknown_shape,
|
|
|
|
@ -561,22 +416,13 @@ def _pre_build_compute_op_info(compute_op, job):
|
|
|
|
|
|
|
|
|
|
def get_prebuild_output(op_name):
|
|
|
|
|
"""
|
|
|
|
|
Get prebuild output
|
|
|
|
|
获取预构建输出。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
op_name (str): 操作名称。
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
dict: 预构建输出。
|
|
|
|
|
get prebuild output
|
|
|
|
|
:param op_name:
|
|
|
|
|
"""
|
|
|
|
|
# 将操作参数转换为JSON字符串
|
|
|
|
|
params_str = op_params_to_json(op_name)
|
|
|
|
|
try:
|
|
|
|
|
# 尝试解析JSON字符串
|
|
|
|
|
res = json.loads(params_str)
|
|
|
|
|
except ValueError:
|
|
|
|
|
# 如果解析失败,则返回空字典
|
|
|
|
|
res = {}
|
|
|
|
|
finally:
|
|
|
|
|
pass
|
|
|
|
@ -586,15 +432,9 @@ def get_prebuild_output(op_name):
|
|
|
|
|
def do_fuzz_build_tbe_op(job: TbeJob):
    """
    Fuzzy build op

    No build is performed here: the job result is simply marked
    "NOT_CHANGED".

    :param job: TbeJob whose ``result`` attribute is overwritten
    :return: always True
    """
    job.result = "NOT_CHANGED"
    return True
|
|
|
|
|
|
|
|
|
@ -602,15 +442,9 @@ def do_fuzz_build_tbe_op(job: TbeJob):
|
|
|
|
|
def _dump_fusion_op_info_to_json_file(job: TbeJob):
    """
    Dump fusion op info to json file

    The job content is serialized to JSON and passed to ``dump_fusion_json``
    together with ``job.sys_para_debug_path``. Nothing is dumped when that
    path is empty or equals the single NUL character "\\0".

    :param job: TbeJob providing ``content`` and ``sys_para_debug_path``
    :return: None
    """
    debug_path = job.sys_para_debug_path
    if debug_path and debug_path != "\0":
        dump_fusion_json(json.dumps(job.content), debug_path)
|
|
|
|
@ -619,55 +453,30 @@ def _dump_fusion_op_info_to_json_file(job: TbeJob):
|
|
|
|
|
def build_single_pre_op(job: TbeJob):
|
|
|
|
|
"""
|
|
|
|
|
Build single op
|
|
|
|
|
构建单个操作的预处理过程。
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
job (TbeJob): 包含任务信息的TbeJob对象。
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
bool: 构建过程是否成功。
|
|
|
|
|
:param job:
|
|
|
|
|
:return:
|
|
|
|
|
"""
|
|
|
|
|
# 执行构建前的处理工作
|
|
|
|
|
before_build_process(job)
|
|
|
|
|
# 获取计算操作列表
|
|
|
|
|
compute_op_info_list = get_compute_op_list(job.content)
|
|
|
|
|
# 确保只有一个计算操作
|
|
|
|
|
if len(compute_op_info_list) != 1:
|
|
|
|
|
job.error("Invalid op compute num ({}) in build single op".format(len(compute_op_info_list)))
|
|
|
|
|
return False
|
|
|
|
|
# 获取单个计算操作信息
|
|
|
|
|
compute_op_info = compute_op_info_list[0]
|
|
|
|
|
# 调整自定义操作信息
|
|
|
|
|
adjust_custom_op_info(compute_op_info)
|
|
|
|
|
# 组装操作的输入、输出和属性
|
|
|
|
|
inputs, outputs, attrs = assemble_op_args(compute_op_info, is_single_op_build=True)
|
|
|
|
|
# 获取操作类型
|
|
|
|
|
op_type = compute_op_info["type"]
|
|
|
|
|
# 获取L1缓存大小
|
|
|
|
|
l1_size = job.content["l1_size"]
|
|
|
|
|
# 获取操作模块名称
|
|
|
|
|
op_module_name = get_module_name(compute_op_info)
|
|
|
|
|
# 获取操作内核名称
|
|
|
|
|
op_kernel_name = compute_op_info["op_name"]
|
|
|
|
|
# 获取Python模块路径
|
|
|
|
|
py_module_path = compute_op_info["py_module_path"]
|
|
|
|
|
# 获取完整操作名称
|
|
|
|
|
op_name = job.content["full_name"]
|
|
|
|
|
# 获取操作函数名称
|
|
|
|
|
op_func_name = compute_op_info["func_name"]
|
|
|
|
|
# 规范化模块名称
|
|
|
|
|
_normalize_module_name(op_module_name, py_module_path)
|
|
|
|
|
# 获取未知形状信息
|
|
|
|
|
unknown_shape = compute_op_info["unknown_shape"]
|
|
|
|
|
# 获取int64模式信息
|
|
|
|
|
int64_mode = compute_op_info["int64mode"]
|
|
|
|
|
# 获取操作模式
|
|
|
|
|
op_pattern = compute_op_info["pattern"]
|
|
|
|
|
# 获取选项信息
|
|
|
|
|
options = get_options_info(job.content)
|
|
|
|
|
# 获取模糊构建信息
|
|
|
|
|
fuzz_build_info = get_fuzz_build_info(job.content)
|
|
|
|
|
# 分派单个操作编译任务
|
|
|
|
|
dispatch_single_op_compile_task(job.source_id, job.id, l1_size, op_module_name, op_name, op_type, op_func_name,
|
|
|
|
|
op_kernel_name, unknown_shape, (inputs, outputs, attrs, options), int64_mode,
|
|
|
|
|
None, None, unknown_shape, op_pattern,
|
|
|
|
@ -678,22 +487,13 @@ def build_single_pre_op(job: TbeJob):
|
|
|
|
|
def before_build_process(job: TbeJob):
    """
    Processing before build

    Set the "op_L1_space" L1 info from ``job.content["l1_size"]``, dump the
    fusion op info via ``_dump_fusion_op_info_to_json_file`` and, when
    ``job.sys_offline_tune`` is truthy, additionally dump the job content to
    ``job.sys_tune_dump_path``.

    :param job: TbeJob being built
    :return: None
    """
    set_L1_info("op_L1_space", job.content["l1_size"])
    _dump_fusion_op_info_to_json_file(job)
    if job.sys_offline_tune:
        dump_fusion_json(json.dumps(job.content), job.sys_tune_dump_path)
|
|
|
|
|
|
|
|
|
@ -701,29 +501,20 @@ def before_build_process(job: TbeJob):
|
|
|
|
|
def sync_fusion_env(fusion_need_sync, module_list):
    """
    Sync fusion env

    Collect the keys of ``module_list`` whose value is greater than 0, reset
    every value in ``module_list`` to 0 in place, and pass the collected keys
    as one comma-separated string to ``import_py_module``.

    :param fusion_need_sync: 0 means nothing has to be synchronized
    :param module_list: dict mapping a module key to a numeric value; mutated
        in place unless ``fusion_need_sync`` is 0
    :return: always True
    """
    if fusion_need_sync == 0:
        return True

    in_use = [str(name) for name, count in module_list.items() if count > 0]
    # NOTE(review): every entry is reset, not only the positive ones; this
    # follows the reconstructed nesting of the original loop - confirm.
    for name in module_list:
        module_list[name] = 0

    import_py_module(",".join(in_use))
    return True
|
|
|
|
|
|
|
|
|
@ -731,23 +522,13 @@ def sync_fusion_env(fusion_need_sync, module_list):
|
|
|
|
|
def parallel_compile_fusion_op(job: TbeJob):
    """
    Compile fusion op in parallel compiler

    Dispatch a fusion op compile task built from the job content (L1 size,
    options, fusion op name, full name, pass list).

    :param job: TbeJob describing the fusion op
    :return: always True once the task has been dispatched
    """
    content = job.content
    l1_size = content["l1_size"]
    options = get_options_info(job.content)
    kernel_name = content["fusion_op_name"]
    full_name = content["full_name"]
    dispatch_fusion_op_compile_task(
        job.source_id, job.id, l1_size, json.dumps(job.content), kernel_name, None, None,
        options, None, job.pass_list, full_name)
    return True
|
|
|
|
@ -756,185 +537,112 @@ def parallel_compile_fusion_op(job: TbeJob):
|
|
|
|
|
def ga_tune(job: TbeJob):
    """
    GA tune

    Dispatch an auto-tune task for the fusion op described by the job and
    mark the job as running.

    :param job: TbeJob describing the op to tune
    :return: always True once the task has been dispatched
    """
    content = job.content
    l1_size = content["l1_size"]
    kernel_name = content["fusion_op_name"]
    full_name = content["full_name"]
    dispatch_autotune_task(
        job.source_id, job.id, l1_size, json.dumps(job.content), {}, kernel_name, full_name)
    job.status = JobStatus.JOB_RUNNING
    return True
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rl_tune_single_op(job: TbeJob):
    """
    RL tune single op

    Build the single compute op of the job with ``build_single_op_from_c``
    and, if the build does not raise, dispatch a single-op RL tune task for
    it.

    :param job: TbeJob whose ``job.content`` describes exactly one compute op
    :return: False when the job does not contain exactly one compute op or
        the build raises; otherwise the value produced by
        ``_process_rl_tune_result`` for the dispatch result
    """
    op_infos = get_compute_op_list(job.content)
    if len(op_infos) != 1:
        job.error("Invalid op compute num ({}) in rl tune single op".format(len(op_infos)))
        return False

    op_info = op_infos[0]
    inputs, outputs, attrs = assemble_op_args(op_info)
    op_type = op_info["type"]
    l1_size = job.content["l1_size"]
    op_module_name = get_module_name(op_info)
    op_kernel_name = op_info["op_name"]
    full_name = op_info["name"]
    py_module_path = op_info["py_module_path"]
    op_func_name = op_info["func_name"]

    _normalize_module_name(op_module_name, py_module_path)
    set_current_op_name(op_kernel_name)

    unknown_shape = op_info["unknown_shape"]
    int64_mode = op_info["int64mode"]
    op_pattern = op_info["pattern"]
    fuzz_build_info = get_fuzz_build_info(job.content)
    soc_section = job.content["SocInfo"]
    auto_tiling_mode = soc_section["autoTilingMode"]
    device_id = soc_section["deviceId"]
    options = get_options_info(job.content)

    try:
        build_single_op_from_c(
            op_module_name, op_func_name, op_type, "build", unknown_shape,
            (inputs, outputs, attrs), int64_mode, unknown_shape, options,
            op_pattern, auto_tiling_mode, device_id, json.dumps(fuzz_build_info))
    # pylint: disable=broad-except
    except Exception as build_error:
        # Any build failure means there is nothing to tune: report and stop.
        job.error(
            "Single op {} build failed, no need to do rl tune, json string:{}".format(op_kernel_name, job.json_string))
        job.error(
            "exc_type:{}, exc_value:{}, exc_traceback:{}".format(
                type(build_error), build_error, traceback.format_exc()))
        return False

    tune_op_module_name = op_module_name + "@" + py_module_path
    base_kernel = job.content["SocInfo"]["op_debug_dir"] + "/kernel_meta/" + op_kernel_name + ".o"

    from schedule_search.rl_online_tune import dispatch_single_tune_task
    packed_args = pack_op_args(inputs, outputs, attrs)
    dispatch_result = dispatch_single_tune_task(
        job.source_id, job.id, l1_size, base_kernel, op_kernel_name, full_name,
        tune_op_module_name, op_func_name, op_type, packed_args)
    return _process_rl_tune_result(job, op_type, dispatch_result)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rl_tune_fusion_op(job: TbeJob):
    """
    rl tune fusion op

    Compile the fusion op with ``compile_op_by_mp`` and, if that does not
    raise, dispatch a fusion RL tune task for it.

    :param job: TbeJob describing the fusion op
    :return: False when the compile step raises; otherwise the value produced
        by ``_process_rl_tune_result`` for the dispatch result
    """
    op_kernel_name = job.content["fusion_op_name"]
    set_current_op_name(op_kernel_name)

    try:
        # The import stays inside the try block so that an import failure is
        # reported the same way as a compile failure.
        from schedule_search.rl_online_tune import compile_op_by_mp
        compile_op_by_mp(json.dumps(job.content))
    # pylint: disable=broad-except
    except Exception as build_error:
        job.error(
            "Fusion op {} build failed, no need to do rl tune, json string:{}".format(op_kernel_name, job.json_string))
        job.error(
            "exc_type:{}, exc_value:{}, exc_traceback:{}".format(
                type(build_error), build_error, traceback.format_exc()))
        return False

    l1_size = job.content["l1_size"]
    base_kernel = job.content["SocInfo"]["op_debug_dir"] + "/kernel_meta/" + op_kernel_name + ".o"

    module_names = []
    op_types = set()
    for op in get_compute_op_list(job.content):
        module_names.append(get_module_name(op))
        op_types.add(op["type"])
    # Module names are comma-separated; the distinct op types are joined with "__".
    op_module_names_str = ",".join(module_names)
    op_type = "__".join(op_types)

    from schedule_search.rl_online_tune import dispatch_fusion_tune_task
    dispatch_result = dispatch_fusion_tune_task(
        job.source_id, job.id, l1_size, base_kernel, op_kernel_name, op_module_names_str,
        json.dumps(job.content))
    return _process_rl_tune_result(job, op_type, dispatch_result)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _process_rl_tune_result(job, op_type, res):
    """
    Process the result of dispatching an RL tune task.

    A truthy ``res`` marks the job as running. A falsy ``res`` is still
    reported as True when the job is an offline tune job, when the
    REPEAT_TUNE environment variable is not "true" (case-insensitive), or
    when ``filter_black_op_type(op_type)`` is truthy.

    :param job: job object; ``sys_offline_tune`` is read and ``status`` may
        be set
    :param op_type: op type string passed to ``filter_black_op_type``
    :param res: result returned by the tune task dispatch
    :return: bool as described above
    """
    if res:
        job.status = JobStatus.JOB_RUNNING
        return True

    from schedule_search.tune_util import filter_black_op_type
    if job.sys_offline_tune:
        return True
    if os.getenv("REPEAT_TUNE", "False").lower() != "true":
        return True
    return bool(filter_black_op_type(op_type))
|
|
|
|
@ -942,13 +650,8 @@ def _process_rl_tune_result(job, op_type, res):
|
|
|
|
|
|
|
|
|
|
def get_finish_tasks(source_id):
    """
    Get finish task from parallel compilation framework

    :param source_id: source id forwarded to ``get_finished_compilation_task``
    :return: task info list as returned by ``get_finished_compilation_task``
    """
    finished_tasks = get_finished_compilation_task(source_id)
    return finished_tasks
|
|
|
|
|
|
|
|
|
@ -961,21 +664,14 @@ def tbe_finalize(auto_tiling_mode, offline_tune, job: TbeJob):
|
|
|
|
|
:param job: TbeJob
|
|
|
|
|
:return: None
|
|
|
|
|
"""
|
|
|
|
|
# 释放多进程环境
|
|
|
|
|
deinit_multi_process_env()
|
|
|
|
|
# 如果自动切分模式为RL或者离线调优,则释放RL调优
|
|
|
|
|
if "RL" in auto_tiling_mode or offline_tune:
|
|
|
|
|
from schedule_search.rl_online_tune import rl_tune_deinit
|
|
|
|
|
rl_tune_deinit()
|
|
|
|
|
# 卸载Cann kb
|
|
|
|
|
res = _cann_kb_unload(job)
|
|
|
|
|
# 如果卸载失败,则返回False
|
|
|
|
|
if res == 1:
|
|
|
|
|
job.error("Cann kb unload failed")
|
|
|
|
|
return False
|
|
|
|
|
# 清除融合参数
|
|
|
|
|
clear_fusion_params()
|
|
|
|
|
# 删除缓存
|
|
|
|
|
_remove_cache(job)
|
|
|
|
|
# 返回True
|
|
|
|
|
return True
|
|
|
|
|