branch-lwh
liuwenhao 2 months ago
commit ae1d86521d

@ -1,2 +0,0 @@
# mindspore_group_2

@ -20,8 +20,11 @@ function(find_submodule_lib module name path)
)
endfunction()
# protobuf
function(ge_protobuf_generate c_var h_var)
# common_protobuf_generateprotobuf
common_protobuf_generate(${CMAKE_BINARY_DIR}/proto/ge/proto ${c_var} ${h_var} ${ARGN})
# chc_varh_var
set(${c_var} ${${c_var}} PARENT_SCOPE)
set(${h_var} ${${h_var}} PARENT_SCOPE)
endfunction()

@ -452,11 +452,14 @@ class _GeneratorWorkerMp(multiprocessing.Process):
"""
def __init__(self, dataset, eof, max_rowsize, queue_size, ppid):
# 初始化一个多进程队列,用于存储索引
self.idx_queue = multiprocessing.Queue(queue_size)
# 如果启用了共享内存,则初始化一个共享队列,否则初始化一个多进程队列
if get_enable_shared_mem():
self.res_queue = _SharedQueue(queue_size, max_rowsize=max_rowsize)
else:
self.res_queue = multiprocessing.Queue(queue_size)
# 设置队列的_joincancelled属性为True表示在进程退出时队列不会阻塞
self.idx_queue._joincancelled = True # pylint: disable=W0212
self.res_queue._joincancelled = True # pylint: disable=W0212
super().__init__(target=_generator_worker_loop, args=(dataset, self.idx_queue, self.res_queue, eof, True, ppid))
@ -465,6 +468,7 @@ class _GeneratorWorkerMp(multiprocessing.Process):
"""
Put function for worker index queue. Never block. Raise queue.Full on failure.
"""
# 将item放入idx_queue队列中不阻塞如果失败则抛出queue.Full异常
self.idx_queue.put_nowait(item)
def get(self):
@ -476,12 +480,19 @@ class _GeneratorWorkerMp(multiprocessing.Process):
return self.res_queue.get(timeout=30)
def queue_empty(self):
# 检查idx_queue是否为空
if not self.idx_queue.empty():
# 如果不为空,记录警告日志
logger.warning("idx_queue is not empty.")
# 返回False
return False
# 检查res_queue是否为空
if not self.res_queue.empty():
# 如果不为空,记录警告日志
logger.warning("res_queue is not empty.")
# 返回False
return False
# 如果两个队列都为空返回True
return True
def __del__(self):
@ -632,14 +643,17 @@ class GeneratorDataset(MappableDataset, UnionBaseDataset):
def __init__(self, source, column_names=None, column_types=None, schema=None, num_samples=None,
num_parallel_workers=1, shuffle=None, sampler=None, num_shards=None, shard_id=None,
python_multiprocessing=True, max_rowsize=6):
# 调用父类的初始化方法
super().__init__(num_parallel_workers=num_parallel_workers, sampler=sampler, num_samples=num_samples,
shuffle=shuffle, num_shards=num_shards, shard_id=shard_id)
# 如果source是zip类型则将其转换为列表
if isinstance(source, builtins.zip):
# Although zip is iteratable, it does not have the feature of repeated iteration, so pass it to the array.
self.source = [item for item in source]
else:
self.source = source
self.prepared_source = None # source to be sent to C++
# 如果self.operator_mixed属性为True则将num_parallel_workers设置为1
if hasattr(self, 'operator_mixed') and getattr(self, 'operator_mixed') is True:
self.num_parallel_workers = 1
logger.warning(
@ -650,56 +664,78 @@ class GeneratorDataset(MappableDataset, UnionBaseDataset):
self.python_multiprocessing = python_multiprocessing
# 将column_names转换为列表
self.column_names = to_list(column_names)
# 如果column_types不为空则将其转换为detypelist类型
if column_types is not None:
self.column_types = mstypelist_to_detypelist(column_types)
else:
self.column_types = []
self.schema = schema
# 如果schema不为空则将其转换为Schema类型
if schema is not None:
# 如果schema不为空则将其赋值给self.schema
self.schema = schema
# 如果schema不是Schema类型则将其转换为Schema类型
if not isinstance(schema, Schema):
self.schema = Schema(schema)
# Move get dataset_size by len from parse to here, because self.source will
# lose attribution of '__len__' after deepcopy.
self.source_len = -1 # unknown
# 如果self.source有__len__属性则获取self.source的长度
if hasattr(self.source, "__len__"):
self.source_len = len(self.source)
# 设置最大行大小
self.max_rowsize = max_rowsize
# 设置采样函数为None
self.sample_fn = None
def __deepcopy__(self, memodict):
# 深度复制当前对象并传入一个字典memodict用于存储已经复制的对象
if id(self) in memodict:
# 如果当前对象的id已经在memodict中则直接返回该对象
return memodict[id(self)]
# 否则调用__safe_deepcopy__方法进行深度复制并传入memodict和exclude参数
new_op = self.__safe_deepcopy__(memodict, exclude=("source", "__transfer_dataset__"))
sample_fn = None
# 如果新对象的sampler属性不为空并且self.source对象具有__getitem__方法
if new_op.sampler is not None and hasattr(self.source, "__getitem__"):
# The reason why there is a try catch here is because when the new op is being constructed with shared
# memory enabled, there will be an exception thrown if there is not enough shared memory available
# 如果self.source_len为-1则抛出RuntimeError异常因为尝试构造一个随机访问的数据集需要__len__方法
if self.source_len == -1:
raise RuntimeError("Attempt to construct a random access dataset, '__len__' method is required!")
try:
# 如果新对象的num_parallel_workers大于1则调用__validate_memory_usage方法进行内存使用验证
if new_op.num_parallel_workers > 1:
self.__validate_memory_usage()
# 创建一个SamplerFn对象用于并行采样
sample_fn = SamplerFn(self.source, new_op.num_parallel_workers, self.python_multiprocessing,
self.max_rowsize)
# 将新对象的prepared_source属性设置为_cpp_sampler_fn_mp函数用于并行采样
new_op.prepared_source = (lambda sample_ids: _cpp_sampler_fn_mp(sample_ids, sample_fn))
else:
# 否则将新对象的prepared_source属性设置为_cpp_sampler_fn函数用于单线程采样
new_op.prepared_source = (lambda sample_ids: _cpp_sampler_fn(sample_ids, self.source))
# 将新对象的sample_fn属性设置为sample_fn
new_op.sample_fn = sample_fn
except RuntimeError as e:
# 如果抛出RuntimeError异常则抛出Exception异常并传入异常信息
raise Exception(str(e))
else:
try:
# 否则将新对象的sampler属性设置为Nonesample_fn属性设置为sample_fn
new_op.sampler = None
new_op.sample_fn = sample_fn
# 将新对象的source_len属性设置为min(new_op.source_len, new_op.num_samples)如果new_op.num_samples不为0否则设置为new_op.source_len
new_op.source_len = min(new_op.source_len,
new_op.num_samples) if new_op.num_samples != 0 else new_op.source_len
# 遍历self.source对象
iter(self.source)
except TypeError:
# Use generator function if input callable
@ -711,19 +747,26 @@ class GeneratorDataset(MappableDataset, UnionBaseDataset):
return new_op
# 判断是否被洗牌
def is_shuffled(self):
return self.sampler.is_shuffled()
# 判断是否被分片
def is_sharded(self):
return self.sampler.is_sharded()
# 解析
def parse(self, children=None):
# 如果schema为空则返回GeneratorNode对象
if self.schema is None:
return cde.GeneratorNode(self.prepared_source, self.column_names, self.column_types, self.source_len,
self.sampler, self.num_parallel_workers)
# 获取schema
schema = self.schema
# 如果schema是Schema类型则获取cpp_schema
if isinstance(schema, Schema):
schema = self.schema.cpp_schema
# 返回GeneratorNode对象
return cde.GeneratorNode(self.prepared_source, schema, self.source_len, self.sampler,
self.num_parallel_workers)
@ -735,24 +778,37 @@ class GeneratorDataset(MappableDataset, UnionBaseDataset):
# if use num_parallel_workers is to large when python_multiprocessing=True which would cause
# OOM error get the num_shards
valid_num_shards = 1
# 判断self.sampler是否为samplers.DistributedSampler类型
if isinstance(self.sampler, samplers.DistributedSampler):
# 如果是则将self.sampler的num_shards赋值给valid_num_shards
valid_num_shards = self.sampler.num_shards
# 否则判断self.num_shards是否为None
elif self.num_shards is not None:
# 如果不是则将self.num_shards赋值给valid_num_shards
valid_num_shards = self.num_shards
# get process memory usage
# 获取当前进程
process = psutil.Process(os.getpid())
# 获取当前进程的内存信息
process_memory = process.memory_info().rss
# 获取系统内存的空闲量
sys_memory_free = psutil.virtual_memory().free
# 计算可能使用的总内存量
total_memory_maybe_used = process_memory * self.num_parallel_workers * valid_num_shards
# 如果总内存可能使用的内存量除以系统可用内存大于0.85
if total_memory_maybe_used / sys_memory_free > 0.85:
# 计算有效的worker数量即系统可用内存乘以0.85除以有效的shards数量再除以每个进程的内存
valid_num_worker = math.floor(sys_memory_free * 0.85 / valid_num_shards / process_memory)
# 如果有效的worker数量小于等于0则将其设置为1
valid_num_worker = 1 if valid_num_worker <= 0 else valid_num_worker
# 构造警告信息提示用户num_parallel_workers设置过大可能会导致内存占用过高或OOM建议将其减小到valid_num_worker或更小
info = "GeneratorDataset's num_parallel_workers: {} is too large which may cause a lot of memory " \
"occupation (>85%) or out of memory(OOM) during multiprocessing. Therefore, it is recommended " \
"to reduce num_parallel_workers to {} or smaller.".format(self.num_parallel_workers,
valid_num_worker)
# 打印警告信息
logger.warning(info)
@ -764,37 +820,55 @@ class _NumpySlicesDataset:
def __init__(self, data, column_list=None):
self.column_list = None
# Convert dict data into tuple
# 判断data是否为字典类型
if isinstance(data, dict):
# 如果是字典类型则调用process_dict方法处理
data = self.process_dict(data)
# 判断data是否为元组类型
if isinstance(data, tuple):
# 如果是元组类型则将self.data初始化为空元组
self.data = ()
# 获取data的长度
data_len = len(data)
# 遍历data中的每个元素
for i in range(data_len):
# 将data中的每个元素转换为numpy数组并添加到self.data中
self.data = self.data + (np.array(data[i]),)
else:
# 如果data不是元组类型则将data转换为numpy数组并添加到self.data中
self.data = (np.array(data),)
# check whether the data length in each column is equal
# 获取每个data_item的长度
data_len = [len(data_item) for data_item in self.data]
# 如果每个data_item的长度不相等则抛出ValueError异常
if data_len[1:] != data_len[:-1]:
raise ValueError("Data length in each column is not equal.")
# Init column_name
# 如果column_list不为空则将self.column_list赋值为column_list
if column_list is not None:
self.column_list = column_list
# 如果self.column_list为空则将self.column_list赋值为空列表
elif self.column_list is None:
self.column_list = []
# 获取data的列数
column_num = len(self.data)
# 遍历列数,将"column_" + str(i)添加到self.column_list中
for i in range(column_num):
self.column_list.append("column_" + str(i))
def __getitem__(self, index):
# 获取指定索引的数据行
data_row = [d[index, ...] for d in self.data]
# 将数据行转换为元组
data_res = tuple(data_row)
# 返回数据行
return data_res
def __len__(self):
# 返回data的第一个元素的长度
return len(self.data[0])
def process_dict(self, input_data):
@ -802,24 +876,29 @@ class _NumpySlicesDataset:
Convert the dict like data into tuple format, when input is a tuple of dicts then compose it into a dict first.
"""
# Convert pandas like dict(has "values" column) into General dict
# 将pandas样式的字典有"values"列)转换为通用字典
data_keys = list(input_data.keys())
# 获取字典的第一个键对应的值
data_col = input_data[data_keys[0]]
# 如果值有values属性则将其转换为通用字典
if hasattr(data_col, "values"):
new_dict = {}
for key in data_keys:
# 将字典中的键对应的值转换为列表
item1 = input_data.pop(key)
new_dict[key] = item1.values
# 将转换后的字典赋值给input_data
input_data = new_dict
# Convert the data in dict into tuple
data = ()
keys = list(input_data.keys())
self.column_list = keys
for key in keys:
value = input_data[key]
data = data + (list(value),)
data = () # 初始化一个空元组
keys = list(input_data.keys()) # 将输入数据的键转换为列表
self.column_list = keys # 将键列表赋值给实例变量column_list
for key in keys: # 遍历键列表
value = input_data[key] # 获取键对应的值
data = data + (list(value),) # 将值转换为列表,并添加到元组中
return data
return data # 返回元组
class NumpySlicesDataset(GeneratorDataset):
@ -909,7 +988,9 @@ class NumpySlicesDataset(GeneratorDataset):
@check_numpyslicesdataset
def __init__(self, data, column_names=None, num_samples=None, num_parallel_workers=1, shuffle=None, sampler=None,
num_shards=None, shard_id=None):
# 创建一个_NumpySlicesDataset对象传入data和column_names参数
dataset = _NumpySlicesDataset(data, column_names)
# 调用父类的__init__方法传入dataset、column_names、num_samples、num_parallel_workers、shuffle、sampler、num_shards和shard_id参数
super().__init__(dataset, column_names=dataset.column_list, num_samples=num_samples,
num_parallel_workers=num_parallel_workers, shuffle=shuffle, sampler=sampler,
num_shards=num_shards, shard_id=shard_id)

@ -1,47 +1,117 @@
# Copyright 2020-2022 Huawei Technologies Co., Ltd
#
# 代码版权声明说明此代码由华为技术有限公司在2020-2022年间开发
# Licensed under the Apache License, Version 2.0 (the "License");
# 说明此代码使用Apache License 2.0版本的许可证
# you may not use this file except in compliance with the License.
# 说明除非遵守许可证,否则不得使用此文件
# You may obtain a copy of the License at
#
# 提供许可证的获取地址
# http://www.apache.org/licenses/LICENSE-2.0
#
# 许可证的具体地址
# Unless required by applicable law or agreed to in writing, software
# 除非适用法律要求或书面同意
# distributed under the License is distributed on an "AS IS" BASIS,
# 许可证在“现状”基础上进行分发
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 不附带任何形式的明示或暗示的担保或条件
# See the License for the specific language governing permissions and
# 请参阅许可证了解特定的权限和
# limitations under the License.
# 限制条件
# ============================================================================
# 标准分割线,通常用于分隔许可证部分与代码部分
"""cell"""
# 文档字符串模块的名称为cell
import gc
# 导入垃圾回收模块,用于管理内存
import inspect
# 导入inspect模块用于获取活对象的信息
import os
# 导入os模块用于与操作系统进行交互
import time
# 导入time模块用于处理时间相关操作
from collections import OrderedDict
# 从collections模块导入OrderedDict类用于创建有序字典
from types import FunctionType, MethodType
# 从types模块导入FunctionType和MethodType类用于类型检查
import numpy
# 导入numpy模块用于科学计算
from mindspore._checkparam import args_type_check
# 从mindspore._checkparam模块导入args_type_check函数用于检查函数参数的类型
from mindspore import log as logger
# 从mindspore模块导入log模块并命名为logger用于日志记录
from mindspore.common.parameter import PARAMETER_NAME_DEFAULT
# 从mindspore.common.parameter模块导入PARAMETER_NAME_DEFAULT常量用于默认参数名称
from mindspore.common.hook_handle import HookHandle
# 从mindspore.common.hook_handle模块导入HookHandle类用于管理钩子处理
from mindspore.context import ParallelMode
# 从mindspore.context模块导入ParallelMode类用于并行模式配置
from mindspore.ops.composite import Shard
# 从mindspore.ops.composite模块导入Shard类用于分片操作
from .. import context
# 导入相对路径的context模块用于上下文配置
from .._c_expression import init_pipeline, update_func_graph_hyper_params, Cell_, FuncGraph, MixedPrecisionType
# 从相对路径的_c_expression模块导入多个函数和类用于初始化管道、更新函数图超参数、Cell的基础类、函数图类、混合精度类型
from .._checkparam import Validator
# 从相对路径的_checkparam模块导入Validator类用于参数验证
from ..common import dtype as mstype
# 从相对路径的common模块导入dtype模块并重命名为mstype用于数据类型定义
from ..common.api import _cell_graph_executor, _pynative_executor, _check_all_tensor, cells_compile_cache
# 从相对路径的common.api模块导入多个函数和类用于单元图执行器、原生模式执行器、检查所有张量、编译缓存
from ..common.parameter import Parameter, ParameterTuple
# 从相对路径的common.parameter模块导入Parameter类和ParameterTuple类用于参数和参数元组
from ..common.variable import Variable
# 从相对路径的common.variable模块导入Variable类用于变量表示
from ..common.tensor import Tensor, CSRTensor, COOTensor
# 从相对路径的common.tensor模块导入Tensor类、CSRTensor类和COOTensor类用于张量表示
from ..ops.operations import Cast
# 从相对路径的ops.operations模块导入Cast类用于类型转换操作
from ..ops.primitive import Primitive
# 从相对路径的ops.primitive模块导入Primitive类用于基础操作
from ..ops.operations import _inner_ops as inner
from ..parallel._tensor import _load_tensor_by_layout
# 从相对路径的ops.operations模块导入_inner_ops并重命名为inner用于内部操作
from ..parallel._tensor import _load_tensor_by_layout
# 从相对路径的parallel._tensor模块导入_load_tensor_by_layout函数用于按布局加载张量
class Cell(Cell_):
# 定义Cell类继承自Cell_类这是MindSpore中神经网络的基本构建单元
"""
The basic building block of neural networks in MindSpore. The model or neural network layer should inherit this
base class.
@ -81,6 +151,8 @@ class Cell(Cell_):
... # the parameter's name will be 'net.weight'.
[Parameter (name=weight, shape=(240, 120, 4, 4), dtype=Float32, requires_grad=True)]
"""
# 类文档字符串解释Cell类的作用、继承关系、参数、支持平台及示例
class _CellGuard:
"""Detecting whether the cell is a top-level cell with the 'with statement'."""

@ -22,61 +22,68 @@ from ...common.api import ms_function
class _FirstGrad(Cell):
# 计算第一个梯度的类
def __init__(self, fn):
super(_FirstGrad, self).__init__()
self.first_grad_op = C.GradOperation(sens_param=True, get_all=True)
self.fn = fn
def construct(self, u, first_grad_input):
# 构造方法,用于计算梯度
return self.first_grad_op(self.fn)(*first_grad_input, u)
class _JvpFirstGrad(Cell):
# 计算Jacobian-Vector-Product的第一个梯度的类
def __init__(self):
super(_JvpFirstGrad, self).__init__()
self.first_grad_op = C.GradOperation(sens_param=True, get_all=True)
def construct(self, u, fn, first_grad_input):
# 构造方法用于计算JVP的第一个梯度
return self.first_grad_op(fn)(*first_grad_input, u)
class _FirstGradSingleValue(Cell):
# 计算单值梯度的类
def __init__(self, fn):
super(_FirstGradSingleValue, self).__init__()
self.first_grad_single_value_op = C.GradOperation(sens_param=True)
self.fn = fn
def construct(self, u, first_grad_single_value_input):
# 构造方法,用于计算单值梯度
return self.first_grad_single_value_op(self.fn)(*first_grad_single_value_input, u)
class _JvpFirstGradSingleValue(Cell):
# 计算Jacobian-Vector-Product的单值梯度的类
def __init__(self):
super(_JvpFirstGradSingleValue, self).__init__()
self.first_grad_single_value_op = C.GradOperation(sens_param=True)
def construct(self, u, fn, first_grad_single_value_input):
# 构造方法用于计算JVP的单值梯度
return self.first_grad_single_value_op(fn)(*first_grad_single_value_input, u)
class Jvp(Cell):
"""
Compute the jacobian-vector-product of the given fn. Jvp is equivalent to forward mode autodiff.
计算给定fn的雅可比向量积Jvp等同于前向模式自动微分
Args:
fn (Cell): The fn that takes Tensor inputs and returns a tuple of Tensors or a Tensor.
fn (Cell): 接受Tensor输入并返回Tensor元组或Tensor的fn
Inputs:
- **inputs** (Tensors) - The inputs to `fn`.
- **v** (Tensors or Tuple of Tensors) - The vector for which the Jacobian vector product is computed.
Must have the same size as the input of `fn`.
- **inputs** (Tensors) - `fn`的输入
- **v** (Tensors Tensor元组) - 用于计算雅可比向量积的向量
必须与`fn`的输入大小相同
Outputs:
A tuple with 2 Tensors or Tuple of Tensors:
包含2个Tensors或Tensor元组的元组
- **net_output** (Tensors or Tuple of Tensors) - The output of `fn(inputs)`.
- **jvp** (Tensors or Tuple of Tensors) - The result of the jacobian vector product.
- **net_output** (Tensors Tensor元组) - `fn(inputs)`的输出
- **jvp** (Tensors Tensor元组) - 雅可比向量积的结果
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
@ -113,6 +120,7 @@ class Jvp(Cell):
@ms_function
def construct(self, *args):
# 构造方法用于计算JVP
jvp_input = args[0:-1]
v = args[-1]
output = self.fn(*jvp_input)
@ -135,8 +143,8 @@ class Jvp(Cell):
class _JvpInner(Cell):
"""
Compute the jacobian-vector-product of the given network. Jvp is equivalent to forward mode autodiff.
This class implements the inner process of function jvp.
计算给定网络的雅可比向量积Jvp等同于前向模式自动微分
该类实现了JVP的内部过程
"""
def __init__(self):
super(_JvpInner, self).__init__()
@ -152,6 +160,7 @@ class _JvpInner(Cell):
self.tuple_len = Primitive("tuple_len")
def construct(self, *args):
# 构造方法用于计算内部JVP
fn = args[0]
v = args[1]
jvp_input = args[2:]
@ -175,22 +184,21 @@ class _JvpInner(Cell):
class Vjp(Cell):
"""
Computes the dot product between a vector `v` and the Jacobian of the given fn at the point
given by the inputs.
计算给定向量`v`与给定fn在输入点处的雅可比的点积
Args:
fn (Cell): The fn that takes Tensor inputs and returns a tuple of Tensors or a Tensor.
fn (Cell): 接受Tensor输入并返回Tensor元组或Tensor的fn
Inputs:
- **inputs** (Tensors) - The inputs to `fn`. Must be a tuple or a list.
- **v** (Tensors or Tuple of Tensors) - The vector for which the vector Jacobian product is computed.
Must have the same size as the output of `fn`.
- **inputs** (Tensors) - `fn`的输入必须是元组或列表
- **v** (Tensors Tensor元组) - 用于计算向量雅可比积的向量
必须与`fn`的输出大小相同
Outputs:
A tuple with 2 Tensors or Tuple of Tensors:
包含2个Tensors或Tensor元组的元组
- **net_output** (Tensors or Tuple of Tensors) - The output of `fn(inputs)`.
- **vjp** (Tensors or Tuple of Tensors) - The result of the dot product.
- **net_output** (Tensors Tensor元组) - `fn(inputs)`的输出
- **vjp** (Tensors Tensor元组) - 点积的结果
Supported Platforms:
``Ascend`` ``GPU`` ``CPU``
@ -226,6 +234,7 @@ class Vjp(Cell):
@ms_function
def construct(self, *args):
# 构造方法用于计算VJP
front_input = args[0:-1]
output = self.fn(*front_input)
if self.tuple_len(front_input) == 1:
@ -237,8 +246,8 @@ class Vjp(Cell):
class _VjpInner(Cell):
"""
Computes the dot product between a vector `v` and the Jacobian of the given network at the point
given by the inputs. This class implements the inner process of function vjp.
计算给定向量`v`与给定网络在输入点处的雅可比的点积
该类实现了VJP的内部过程
"""
def __init__(self):
@ -248,6 +257,7 @@ class _VjpInner(Cell):
self.tuple_len = Primitive("tuple_len")
def construct(self, *args):
# 构造方法用于计算内部VJP
fn = args[0]
front_input = args[1:-1]
input_with_v = args[1:]

@ -48,24 +48,23 @@ class LossBase(Cell):
"""
def __init__(self, reduction='mean'):
"""Initialize Loss."""
super(LossBase, self).__init__()
"""Initialize Loss.""" # 初始化LossBase类接收一个参数reduction默认值为'mean'
super(LossBase, self).__init__() # 调用父类Cell的初始化方法
if reduction not in ('mean', 'sum', 'none'):
raise ValueError(f"For '{self.cls_name}', the 'reduction' should be in ['mean', 'sum', 'none'], "
if reduction not in ('mean', 'sum', 'none'): # 检查reduction参数是否为'mean', 'sum', 'none'中的一个
raise ValueError(f"For '{self.cls_name}', the 'reduction' should be in ['mean', 'sum', 'none'], " # 如果参数不在允许的范围内抛出ValueError
f"but got {reduction}.")
self.average = True
self.reduce = True
if reduction == 'sum':
self.average = False
if reduction == 'none':
self.reduce = False
self.reduce_mean = P.ReduceMean()
self.reduce_sum = P.ReduceSum()
self.mul = P.Mul()
self.cast = P.Cast()
self.average = True # 设置average属性为True默认进行平均
self.reduce = True # 设置reduce属性为True默认进行降维
if reduction == 'sum': # 如果reduction参数为'sum'
self.average = False # 设置average属性为False不进行平均
if reduction == 'none': # 如果reduction参数为'none'
self.reduce = False # 设置reduce属性为False不进行降维
self.reduce_mean = P.ReduceMean() # 定义reduce_mean操作用于计算平均损失
self.reduce_sum = P.ReduceSum() # 定义reduce_sum操作用于计算总损失
self.mul = P.Mul() # 定义mul操作用于权重乘法
self.cast = P.Cast() # 定义cast操作用于数据类型转换
def get_axis(self, x):
"""
@ -98,10 +97,10 @@ class LossBase(Cell):
>>> print(output)
(0, 1)
"""
shape = F.shape(x)
length = F.tuple_len(shape)
perm = F.make_range(0, length)
return perm
shape = F.shape(x) # 获取输入张量x的形状
length = F.tuple_len(shape) # 获取形状的长度(即维度数量)
perm = F.make_range(0, length) # 生成一个从0到length-1的元组表示所有轴
return perm # 返回这个元组
def get_loss(self, x, weights=1.0):
"""
@ -141,20 +140,19 @@ class LossBase(Cell):
>>> print(output)
0.11111111
"""
input_dtype = x.dtype
x = self.cast(x, mstype.float32)
weights = self.cast(weights, mstype.float32)
x = self.mul(weights, x)
if self.reduce and self.average:
x = self.reduce_mean(x, self.get_axis(x))
if self.reduce and not self.average:
x = self.reduce_sum(x, self.get_axis(x))
x = self.cast(x, input_dtype)
return x
input_dtype = x.dtype # 获取输入张量x的数据类型
x = self.cast(x, mstype.float32) # 将输入张量x的数据类型转换为float32
weights = self.cast(weights, mstype.float32) # 将权重weights的数据类型转换为float32
x = self.mul(weights, x) # 将权重weights与输入张量x相乘
if self.reduce and self.average: # 如果需要降维且进行平均
x = self.reduce_mean(x, self.get_axis(x)) # 计算平均损失
if self.reduce and not self.average: # 如果需要降维但不进行平均
x = self.reduce_sum(x, self.get_axis(x)) # 计算总损失
x = self.cast(x, input_dtype) # 将损失x的数据类型转换回输入张量x的原始数据类型
return x # 返回计算得到的损失
def construct(self, logits, labels):
raise NotImplementedError
raise NotImplementedError # 这是一个抽象方法,需要在子类中实现
class _Loss(LossBase):
"""
@ -162,23 +160,21 @@ class _Loss(LossBase):
"""
def __init__(self, reduction='mean'):
"""Initialize _Loss."""
log.warning("'_Loss' is deprecated from version 1.3 and "
"""Initialize _Loss.""" # 初始化_Loss类接收一个参数reduction默认值为'mean'
log.warning("'_Loss' is deprecated from version 1.3 and " # 输出警告信息提示_Loss类已过时
"will be removed in a future version, use 'LossBase' instead.")
super(_Loss, self).__init__(reduction)
super(_Loss, self).__init__(reduction) # 调用父类LossBase的初始化方法
def construct(self, logits, labels):
raise NotImplementedError
raise NotImplementedError # 这是一个抽象方法,需要在子类中实现
@constexpr
def _check_is_tensor(param_name, input_data, cls_name):
"""Internal function, used to check whether the input data is Tensor."""
if input_data is not None and not isinstance(F.typeof(input_data), mstype.tensor_type):
raise TypeError(f"For '{cls_name}', the '{param_name}' should be '{mstype.tensor_type}', "
"""Internal function, used to check whether the input data is Tensor.""" # 定义一个内部函数用于检查输入数据是否为Tensor
if input_data is not None and not isinstance(F.typeof(input_data), mstype.tensor_type): # 如果输入数据不为None且类型不是Tensor
raise TypeError(f"For '{cls_name}', the '{param_name}' should be '{mstype.tensor_type}', " # 抛出TypeError
f"but got '{F.typeof(input_data)}'")
class L1Loss(LossBase):
r"""
L1Loss is used to calculate the mean absolute error between the predicted value and the target value.
@ -238,16 +234,15 @@ class L1Loss(LossBase):
"""
def __init__(self, reduction='mean'):
"""Initialize L1Loss."""
super(L1Loss, self).__init__(reduction)
self.abs = P.Abs()
"""Initialize L1Loss.""" # 初始化L1Loss类接收一个参数reduction默认值为'mean'
super(L1Loss, self).__init__(reduction) # 调用父类LossBase的初始化方法
self.abs = P.Abs() # 定义abs操作用于计算绝对值
def construct(self, logits, labels):
_check_is_tensor('logits', logits, self.cls_name)
_check_is_tensor('labels', labels, self.cls_name)
x = self.abs(logits - labels)
return self.get_loss(x)
_check_is_tensor('logits', logits, self.cls_name) # 检查logits是否为Tensor
_check_is_tensor('labels', labels, self.cls_name) # 检查labels是否为Tensor
x = self.abs(logits - labels) # 计算logits与labels的差的绝对值
return self.get_loss(x) # 使用self.get_loss方法计算加权损失并返回
class MSELoss(LossBase):
r"""
@ -308,10 +303,10 @@ class MSELoss(LossBase):
"""
def construct(self, logits, labels):
_check_is_tensor('logits', logits, self.cls_name)
_check_is_tensor('labels', labels, self.cls_name)
x = F.square(logits - labels)
return self.get_loss(x)
_check_is_tensor('logits', logits, self.cls_name) # 检查logits是否为Tensor
_check_is_tensor('labels', labels, self.cls_name) # 检查labels是否为Tensor
x = F.square(logits - labels) # 计算logits与labels的差的平方
return self.get_loss(x) # 使用self.get_loss方法计算加权损失并返回
class RMSELoss(LossBase):
@ -356,15 +351,14 @@ class RMSELoss(LossBase):
"""
def __init__(self):
"""Initialize RMSELoss."""
super(RMSELoss, self).__init__()
self.MSELoss = MSELoss()
"""Initialize RMSELoss.""" # 初始化RMSELoss类
super(RMSELoss, self).__init__() # 调用父类LossBase的初始化方法
self.MSELoss = MSELoss() # 初始化MSELoss对象用于计算均方误差
def construct(self, logits, label):
rmse_loss = F.sqrt(self.MSELoss(logits, label))
return rmse_loss
rmse_loss = F.sqrt(self.MSELoss(logits, label)) # 计算均方误差损失然后取平方根得到RMSE损失
return rmse_loss # 返回计算得到的RMSE损失
class MAELoss(LossBase):
r"""
@ -426,16 +420,15 @@ class MAELoss(LossBase):
"""
def __init__(self, reduction='mean'):
"""Initialize MAELoss."""
super(MAELoss, self).__init__(reduction)
self.abs = P.Abs()
"""Initialize MAELoss.""" # 初始化MAELoss类接收一个参数reduction默认值为'mean'
super(MAELoss, self).__init__(reduction) # 调用父类LossBase的初始化方法
self.abs = P.Abs() # 定义abs操作用于计算绝对值
def construct(self, logits, label):
_check_is_tensor('logits', logits, self.cls_name)
_check_is_tensor('labels', label, self.cls_name)
x = self.abs(logits - label)
return self.get_loss(x)
_check_is_tensor('logits', logits, self.cls_name) # 检查logits是否为Tensor
_check_is_tensor('labels', label, self.cls_name) # 检查labels是否为Tensor
x = self.abs(logits - label) # 计算logits与labels的差的绝对值
return self.get_loss(x) # 使用self.get_loss方法计算加权损失并返回
class SmoothL1Loss(LossBase):
r"""
@ -491,16 +484,15 @@ class SmoothL1Loss(LossBase):
"""
def __init__(self, beta=1.0):
"""Initialize SmoothL1Loss."""
super(SmoothL1Loss, self).__init__()
self.beta = beta
self.smooth_l1_loss = P.SmoothL1Loss(self.beta)
"""Initialize SmoothL1Loss.""" # 初始化SmoothL1Loss类接收一个参数beta默认值为1.0
super(SmoothL1Loss, self).__init__() # 调用父类LossBase的初始化方法
self.beta = beta # 设置beta属性表示平滑阈值
self.smooth_l1_loss = P.SmoothL1Loss(self.beta) # 定义smooth_l1_loss操作用于计算平滑L1损失
def construct(self, logits, labels):
_check_is_tensor('logits', logits, self.cls_name)
_check_is_tensor('labels', labels, self.cls_name)
return self.smooth_l1_loss(logits, labels)
_check_is_tensor('logits', logits, self.cls_name) # 检查logits是否为Tensor
_check_is_tensor('labels', labels, self.cls_name) # 检查labels是否为Tensor
return self.smooth_l1_loss(logits, labels) # 使用self.smooth_l1_loss计算平滑L1损失并返回
class SoftMarginLoss(LossBase):
r"""
@ -545,12 +537,11 @@ class SoftMarginLoss(LossBase):
"""
def __init__(self, reduction='mean'):
super(SoftMarginLoss, self).__init__()
self.soft_margin_loss = P.SoftMarginLoss(reduction)
super(SoftMarginLoss, self).__init__() # 调用父类LossBase的初始化方法
self.soft_margin_loss = P.SoftMarginLoss(reduction) # 定义soft_margin_loss操作用于计算SoftMargin损失
def construct(self, logits, labels):
return self.soft_margin_loss(logits, labels)
return self.soft_margin_loss(logits, labels) # 使用self.soft_margin_loss计算SoftMargin损失并返回
class SoftmaxCrossEntropyWithLogits(LossBase):
r"""
@ -619,27 +610,28 @@ class SoftmaxCrossEntropyWithLogits(LossBase):
def __init__(self,
sparse=False,
reduction='none'):
"""Initialize SoftmaxCrossEntropyWithLogits."""
super(SoftmaxCrossEntropyWithLogits, self).__init__(reduction)
self.sparse = validator.check_bool(sparse, "sparse", self.cls_name)
self.reduction = reduction
self.softmax_cross_entropy = P.SoftmaxCrossEntropyWithLogits()
self.one_hot = P.OneHot()
self.on_value = Tensor(1.0, mstype.float32)
self.off_value = Tensor(0., mstype.float32)
self.is_cpugpu = context.get_context('device_target') in ["CPU", "GPU"]
self.sparse_softmax_cross_entropy = P.SparseSoftmaxCrossEntropyWithLogits()
"""Initialize SoftmaxCrossEntropyWithLogits.""" # 初始化SoftmaxCrossEntropyWithLogits类接收sparse和reduction两个参数
super(SoftmaxCrossEntropyWithLogits, self).__init__(reduction) # 调用父类LossBase的初始化方法传入reduction参数
self.sparse = validator.check_bool(sparse, "sparse", self.cls_name) # 检查sparse参数是否为布尔值如果是则赋值给self.sparse
self.reduction = reduction # 设置reduction属性表示减少类型
self.softmax_cross_entropy = P.SoftmaxCrossEntropyWithLogits() # 定义softmax_cross_entropy操作用于计算Softmax交叉熵损失
self.one_hot = P.OneHot() # 定义one_hot操作用于将标签转换为OneHot编码
self.on_value = Tensor(1.0, mstype.float32) # 定义on_value属性表示OneHot编码中正类的值
self.off_value = Tensor(0., mstype.float32) # 定义off_value属性表示OneHot编码中负类的值
self.is_cpugpu = context.get_context('device_target') in ["CPU", "GPU"] # 定义is_cpugpu属性表示是否在CPU或GPU上运行
self.sparse_softmax_cross_entropy = P.SparseSoftmaxCrossEntropyWithLogits() # 定义sparse_softmax_cross_entropy操作用于计算稀疏Softmax交叉熵损失
def construct(self, logits, labels):
_check_is_tensor('logits', logits, self.cls_name)
_check_is_tensor('labels', labels, self.cls_name)
if self.sparse:
if self.reduction == 'mean':
x = self.sparse_softmax_cross_entropy(logits, labels)
return x
labels = self.one_hot(labels, F.shape(logits)[-1], self.on_value, self.off_value)
x = self.softmax_cross_entropy(logits, labels)[0]
return self.get_loss(x)
_check_is_tensor('logits', logits, self.cls_name) # 检查logits是否为Tensor
_check_is_tensor('labels', labels, self.cls_name) # 检查labels是否为Tensor
if self.sparse: # 如果使用稀疏标签格式
if self.reduction == 'mean': # 如果reduction为'mean'
x = self.sparse_softmax_cross_entropy(logits, labels) # 使用稀疏Softmax交叉熵损失计算损失
return x # 返回计算得到的损失
labels = self.one_hot(labels, F.shape(logits)[-1], self.on_value, self.off_value) # 将labels转换为OneHot编码
x = self.softmax_cross_entropy(logits, labels)[0] # 计算Softmax交叉熵损失取第一个返回值
return self.get_loss(x) # 使用self.get_loss方法计算加权损失并返回
@constexpr

@ -85,58 +85,84 @@ def array(obj, dtype=None, copy=True, ndmin=0):
>>> print(np.array([1,2,3]))
[1 2 3]
"""
if dtype is not None:
if dtype is not None: # 如果用户指定了数据类型则检查并转换为mindspore的数据类型
dtype = _check_dtype(dtype)
res = asarray(obj, dtype)
res = asarray(obj, dtype) # 将输入对象转换为tensor
if ndmin > res.ndim:
if res.size == 0:
if ndmin > res.ndim: # 如果用户指定的最小维度大于转换后的tensor维度则在tensor的前面添加维度
if res.size == 0: # 如果tensor为空抛出异常
_raise_value_error("Empty tensor cannot be expanded beyond the current dimension.")
res = _expand(res, ndmin)
res = _expand(res, ndmin) # 扩展tensor的维度
if copy and isinstance(obj, Tensor):
if copy and isinstance(obj, Tensor): # 如果copy为True且输入对象已经是tensor则创建其副本
res = copy_(res)
elif dtype is not None and dtype != res.dtype:
elif dtype is not None and dtype != res.dtype: # 如果用户指定了数据类型且与转换后的tensor数据类型不同则转换数据类型
res = res.astype(dtype)
return res
return res # 返回最终生成的tensor
@constexpr
def asarray_const(a, dtype=None):
# 标记此函数为constexpr意味着它是一个编译时常量函数
"""Converts the input to tensor. Note here `a` cannot be tensor itself."""
# 文档字符串解释函数作用将输入转换为张量注意这里的a不能是张量本身
_check_input_for_asarray(a)
# 检查输入a是否符合asarray函数的输入要求
if dtype is not None:
# 如果dtype不为None
dtype = _check_dtype(dtype)
# 检查并确认dtype是一个有效的数据类型
if isinstance(a, (float, int, bool)) and dtype is None:
# 如果a是float、int或bool类型并且dtype未指定
dtype = _get_dtype_from_scalar(a)
# 从标量a中获取数据类型并赋值给dtype
if isinstance(a, (list, tuple)):
# 如果a是list或tuple类型
# Convert all tuple/nested tuples to lists
a = _deep_list(a)
# 将所有tuple及其嵌套的tuple转换为list
# Convert all tensor sub-elements to numpy arrays
a = _deep_tensor_to_nparray(a)
# 将所有tensor子元素转换为numpy数组
a = onp.asarray(a)
# 使用numpy的asarray函数将a转换为numpy数组
if a.dtype is onp.dtype('object'):
# 如果转换后的numpy数组的数据类型是object
raise ValueError('Input array must have the same size across all dimensions.')
# 抛出ValueError表示输入数组在所有维度上必须具有相同的大小
# If dtype is not specified, we keep consistent with numpy decision
# only exceptions are: we use int/float32
if dtype is None:
# 如果dtype未指定
dtype = mstype.pytype_to_dtype(a.dtype)
# 将numpy数组的数据类型转换为mindspore的dtype
if dtype == mstype.float64:
# 如果dtype是float64
dtype = mstype.float32
# 将dtype改为float32
elif dtype == mstype.int64:
# 如果dtype是int64
dtype = mstype.int32
# 将dtype改为int32
if isinstance(a, onp.ndarray) and dtype is None:
# 如果a是numpy数组并且dtype未指定
if a.dtype is onp.dtype('object'):
# 如果numpy数组的数据类型是object
raise TypeError(f"For Tensor conversion, the input_data is {a} that contains unsupported element.")
# 抛出TypeError表示输入数据包含不支持的元素
dtype = mstype.pytype_to_dtype(a.dtype)
# 将numpy数组的数据类型转换为mindspore的dtype
a = Tensor.from_numpy(a)
# 将numpy数组转换为mindspore的Tensor
return Tensor(a, dtype=dtype)
# 返回一个具有指定dtype的Tensor
def asarray(a, dtype=None):
@ -168,29 +194,46 @@ def asarray(a, dtype=None):
[1 2 3]
"""
if dtype is not None:
# 如果dtype不为None
dtype = _check_dtype(dtype)
# 检查并确认dtype是一个有效的数据类型
if isinstance(a, Tensor):
# 如果a是Tensor类型
if dtype is None or dtype == a.dtype:
# 如果dtype未指定或指定的数据类型与a的数据类型相同
return a
# 直接返回a
return a.astype(dtype)
# 如果指定的数据类型与a的数据类型不同将a的数据类型转换为指定的dtype并返回
return asarray_const(a, dtype)
# 如果a不是Tensor类型调用asarray_const函数将其转换为Tensor并返回
@constexpr
def asfarray_const(a, dtype=mstype.float32):
"""Converts the input to tensor. Note here `a` cannot be tensor itself."""
# 文档字符串解释函数作用将输入转换为张量注意这里的a不能是张量本身
_check_input_for_asarray(a)
# 检查输入a是否符合asarray函数的输入要求
if isinstance(a, (list, tuple)):
# 如果a是list或tuple类型
# Convert all tuple/nested tuples to lists
a = _deep_list(a)
# 将所有tuple及其嵌套的tuple转换为list
# Convert all tensor sub-elements to numpy arrays
a = _deep_tensor_to_nparray(a)
# 将所有tensor子元素转换为numpy数组
a = onp.asarray(a)
# 使用numpy的asarray函数将a转换为numpy数组
if a.dtype is onp.dtype('object'):
# 如果转换后的numpy数组的数据类型是object
raise ValueError(f"For Tensor conversion, the input_data is {a} that contains unsupported element.")
# 抛出ValueError表示输入数组在所有维度上必须具有相同的大小
a = Tensor.from_numpy(a)
# 将numpy数组转换为mindspore的Tensor
return Tensor(a, dtype)
# 返回一个具有指定dtype的Tensor
def asfarray(a, dtype=mstype.float32):
@ -206,7 +249,6 @@ def asfarray(a, dtype=mstype.float32):
be in format of np.int32, or \'int32\'. If dtype is :class:`None`, the data type
of the new tensor will be inferred from `a`. Default is :class:`mindspore.float32`.
Returns:
Tensor, generated tensor with the specified float dtype.
@ -223,16 +265,24 @@ def asfarray(a, dtype=mstype.float32):
[1. 2. 3.]
"""
if dtype is None:
# 如果dtype未指定
return asarray(a)
# 调用asarray函数将a转换为Tensor并返回
dtype = _check_dtype(dtype)
# 检查并确认dtype是一个有效的数据类型
if dtype not in (mstype.float16, mstype.float32, mstype.float64):
# 如果dtype不是float16、float32或float64
dtype = mstype.float32
# 将dtype改为float32
if isinstance(a, Tensor):
# 如果a是Tensor类型
return a.astype(dtype)
# 将a的数据类型转换为指定的dtype并返回
return asfarray_const(a, dtype)
# 如果a不是Tensor类型调用asfarray_const函数将其转换为Tensor并返回
def copy_(a):
@ -261,7 +311,9 @@ def copy_(a):
[1. 1.]]
"""
a = asarray(a)
# 使用asarray函数将a转换为Tensor
return a.copy()
# 返回a的副本
def ones(shape, dtype=mstype.float32):
@ -290,11 +342,17 @@ def ones(shape, dtype=mstype.float32):
[1. 1.]]
"""
shape = _check_shape(shape)
# 检查并确认shape是一个有效的形状
dtype = _check_dtype(dtype)
# 检查并确认dtype是一个有效的数据类型
if _is_shape_empty(shape):
# 如果shape表示的形状是空的
return full(shape, 1.0, dtype)
# 使用full函数创建一个指定形状、数据类型并用1.0填充的Tensor
output = F.fill(dtype, shape, 1)
# 使用F.fill函数创建一个指定形状、数据类型并用1填充的Tensor
return output
# 返回创建的Tensor
def zeros(shape, dtype=mstype.float32):
@ -323,11 +381,17 @@ def zeros(shape, dtype=mstype.float32):
[0. 0.]]
"""
shape = _check_shape(shape)
# 检查并确认shape是一个有效的形状
dtype = _check_dtype(dtype)
# 检查并确认dtype是一个有效的数据类型
if _is_shape_empty(shape):
# 如果shape表示的形状是空的
return full(shape, 0.0, dtype)
# 使用full函数创建一个指定形状、数据类型并用0.0填充的Tensor
output = F.fill(dtype, shape, 0)
# 使用F.fill函数创建一个指定形状、数据类型并用0填充的Tensor
return output
# 返回创建的Tensor
def full(shape, fill_value, dtype=None):
@ -360,24 +424,42 @@ def full(shape, fill_value, dtype=None):
[True True]]
"""
shape = _check_shape(shape)
# 检查并确认shape是一个有效的形状
if not isinstance(fill_value, ARRAY_TYPES):
# 如果fill_value不是int、float、bool、list、tuple、Tensor类型
_raise_type_error("fill value should be int, float, bool, list, tuple, Tensor, but got", fill_value)
# 抛出TypeError表示fill_value类型不支持
if dtype is not None:
# 如果dtype不为None
dtype = _check_dtype(dtype)
# 检查并确认dtype是一个有效的数据类型
else:
# 如果dtype为None
if isinstance(fill_value, (int, float, bool)):
# 如果fill_value是int、float或bool类型
dtype = _get_dtype_from_scalar(fill_value)
# 从标量fill_value中获取数据类型并赋值给dtype
if isinstance(fill_value, Tensor):
# 如果fill_value是Tensor类型
dtype = fill_value.dtype
# 从Tensor fill_value中获取数据类型并赋值给dtype
if not _is_shape_empty(shape):
# 如果shape表示的形状不是空的
if isinstance(fill_value, (int, float, bool)):
# 如果fill_value是int、float或bool类型
return F.fill(dtype, shape, fill_value)
# 使用F.fill函数创建一个指定形状、数据类型并用fill_value填充的Tensor
if isinstance(fill_value, (list, tuple)):
# 如果fill_value是list或tuple类型
fill_value = asarray_const(fill_value)
# 使用asarray_const函数将fill_value转换为Tensor
return broadcast_to(fill_value, shape)
# 使用broadcast_to函数将fill_value广播到指定的shape并返回结果
# if shape contains zero, use c.Tensor()
return _convert_64_to_32(empty_compile(dtype, shape))
# 如果shape包含零使用empty_compile函数创建一个空的Tensor并使用_convert_64_to_32函数将数据类型从float64转换为float32
@constexpr

@ -17,65 +17,71 @@ from ..._checkparam import Validator as validator
from ...common import dtype as mstype
from ..primitive import prim_attr_register, PrimitiveWithCheck
from .. import signature as sig
class UpdateCache(PrimitiveWithCheck):
"""
Update the value fo input_x, similar to ScatterNdUpdate.
The difference is that UpdateCache will not update when indices < 0 or indices >= max_num.
更新 input_x 的值类似于 ScatterNdUpdate
不同之处在于UpdateCache indices < 0 indices >= max_num 时不会更新
Inputs:
- **input_x** (Parameter) - Parameter which is going to be updated.
- **indices** (Tensor) - Update indices of input_x.
- **updates** (Tensor) - The update values.
- **input_x** (Parameter) - 将要更新的参数
- **indices** (Tensor) - input_x 的更新索引
- **updates** (Tensor) - 更新值
Outputs:
- **out** (Tensor) - Returns a [1] Tensor, which is not useful.
- **out** (Tensor) - 返回一个 [1] 的张量这个张量没有用处
"""
# 定义函数签名,指定输入参数的类型和读写权限
__mindspore_signature__ = (
# 定义输入参数input_x类型为T读写权限为写
sig.make_sig('input_x', sig.sig_rw.RW_WRITE,
dtype=sig.sig_dtype.T),
# 定义输入参数indices类型为T1
sig.make_sig('indices', dtype=sig.sig_dtype.T1),
# 定义输入参数updates类型为T
sig.make_sig('updates', dtype=sig.sig_dtype.T),
# 定义输入参数max_num类型为T1
sig.make_sig('max_num', dtype=sig.sig_dtype.T1)
)
@prim_attr_register
def __init__(self):
"""init UpdateCache"""
"""初始化 UpdateCache"""
# 初始化输入和输出名称
self.init_prim_io_names(inputs=['input_x', 'indices', 'update', 'max_num'],
outputs=['out'])
def check_shape(self, input_x_shape, indices_shape, update_shape, max_num_shape):
# 检查输入形状
return [1]
def check_dtype(self, input_x_dtype, indices_dtype, update_dtype, max_num_dtype):
# 检查输入数据类型
validator.check_tensor_dtype_valid(
"indices", indices_dtype, mstype.int_type, self.name)
return input_x_dtype
class SubAndFilter(PrimitiveWithCheck):
"""
Dynamic kernel, sub an offset and
return the elements which in range [0, max_num).
动态内核减去一个偏移量并返回在范围 [0, max_num) 内的元素
Inputs:
- **input_x** (Tensor) - Input tensor.
- **max_num** (Int) - The max value of element that after sub `offset`.
- **offset** (int) - Specifies the offset value of this `input_x`.
- **input_x** (Tensor) - 输入张量
- **max_num** (Int) - 减去 `offset` 后元素的最大值
- **offset** (int) - 指定此 `input_x` 的偏移值
Outputs:
tuple(Tensor), tuple of 2 tensors, filter_res and filter_idx.
- **filter_res** (Tensor) - The result that `input_x` minus `offset`,
and return which in the range [0, max_num).
- **filter_idx** (Tensor) - A tensor containing indices of elements in the input
coressponding to the output tensor.
tuple(Tensor), 2 个张量组成的元组filter_res filter_idx
- **filter_res** (Tensor) - `input_x` 减去 `offset` 的结果
并返回在范围 [0, max_num) 内的值
- **filter_idx** (Tensor) - 一个张量包含与输出张量对应的输入元素的索引
Supported Platforms:
`CPU`
Examples:
>>> x = Tensor(np.array([1, 3, 5, 8, 9, 16]), mindspore.int32)
>>> max_num = 10
@ -87,35 +93,38 @@ class SubAndFilter(PrimitiveWithCheck):
"""
@prim_attr_register
def __init__(self):
"""init SubAndFilter"""
"""初始化 SubAndFilter"""
# 初始化输入和输出名称
self.init_prim_io_names(inputs=['input_x', 'max_num', 'offset'],
outputs=['sub_res', 'sub_idx'])
def check_shape(self, input_x_shape, max_num_shape, offset_shape):
# 检查输入形状
return ((-1,), (-1,))
def check_dtype(self, input_x_dtype, max_num_dtype, offset_dtype):
# 检查输入数据类型
validator.check_tensor_dtype_valid(
"input_x", input_x_dtype, mstype.int_type, self.name)
return input_x_dtype
class MapUniform(PrimitiveWithCheck):
"""
Map a tensor by using fomula : value = key % `group_num` * `per_group_size` + key // `group_num`.
通过公式映射一个张量value = key % `group_num` * `per_group_size` + key // `group_num`
Inputs:
- **input** (Tensor) - Input Tensor.
- **per_group_size** (int) - The size of each group.
- **group_num** (int) - The number of group.
- **input** (Tensor) - 输入张量
- **per_group_size** (int) - 每个组的大小
- **group_num** (int) - 组的数量
Outputs:
Tensor, has the same dtype and shape as the `input`.
Tensor具有与 `input` 相同的 dtype 和形状
Supported Platforms:
`CPU`
Examples:
>>> input_x = Tensor(np.array([0, 1, 2, 3, 4, 5, 6, 7]))
>>> per_group_size = 4
@ -125,33 +134,34 @@ class MapUniform(PrimitiveWithCheck):
>>> print(output)
[0, 4, 1, 5, 2, 6, 3, 7]
"""
@prim_attr_register
def __init__(self):
"""init MapUniform"""
"""初始化 MapUniform"""
self.init_prim_io_names(inputs=['input', 'per_group_size', 'group_num'],
outputs=['output'])
def check_dtype(self, input_dtype, per_group_size_dtype, group_num_dtype):
"""检查输入数据类型"""
validator.check_tensor_dtype_valid(
"input", input_dtype, mstype.int_type, self.name)
validator.check_value_type(
'per_group_size', per_group_size_dtype, [mstype.Int], self.name)
validator.check_value_type(
'group_num', group_num_dtype, [mstype.Int], self.name)
class CacheSwapTable(PrimitiveWithCheck):
"""
Delete a hashmap entry,and insert a new key to hashmap, return the key and value of delete entry.
删除一个哈希映射条目并插入一个新键到哈希映射中返回删除条目的键和值
Inputs:
- **cache_table** (Parameter) - The cache table which is on device.
- **swap_cache_idx** (Tensor) - The index of table which need to swap. -1 is skipped.
- **miss_value** (int) - The values which arg going to swap into cache table.
- **cache_table** (Parameter) - 在设备上的缓存表
- **swap_cache_idx** (Tensor) - 需要交换的表索引-1 被跳过
- **miss_value** (int) - 将要交换到缓存表的值
Outputs:
- **old_value** (Tensor) - The values which are swapped out.
- **old_value** (Tensor) - 被交换出去的值
"""
__mindspore_signature__ = (
sig.make_sig('cache_table', sig.sig_rw.RW_WRITE,
@ -159,31 +169,35 @@ class CacheSwapTable(PrimitiveWithCheck):
sig.make_sig('swap_cache_idx', dtype=sig.sig_dtype.T1),
sig.make_sig('miss_value', dtype=sig.sig_dtype.T)
)
@prim_attr_register
def __init__(self):
"""init CacheSwapTable"""
"""初始化 CacheSwapTable"""
self.init_prim_io_names(inputs=['cache_table', 'swap_cache_idx', 'miss_value'],
outputs=['old_value'])
def check_shape(self, cache_table_shape, swap_cache_idx_shape, miss_value_shape):
# 检查cache_table_shape的长度是否为2如果不是则抛出ValueError异常
if len(cache_table_shape) != 2:
raise ValueError(
"cache table shape must be 2, but got %d" % len(cache_table_shape))
# 返回miss_value_shape
return miss_value_shape
def check_dtype(self, cache_table_dtype, swap_cache_idx_dtype, miss_value_dtype):
# 检查swap_cache_idx_dtype是否为mstype.int_type如果不是则抛出ValueError异常
validator.check_tensor_dtype_valid(
"swap_cache_idx", swap_cache_idx_dtype, mstype.int_type, self.name)
# 返回miss_value_dtype
return miss_value_dtype
class MapCacheIdx(PrimitiveWithCheck):
"""
MapCacheIdx merge SearchCacheIdx, CacheSwapHashmap, UpdateCache together.
When input an indices tensor, it will output the cache indices which search in hashmap.
MapCacheIdx SearchCacheIdxCacheSwapHashmap UpdateCache 合并在一起
当输入一个索引张量时它将输出在哈希映射中搜索的缓存索引
"""
__mindspore_signature__ = (
sig.make_sig('hashmap', sig.sig_rw.RW_WRITE,
@ -193,56 +207,69 @@ class MapCacheIdx(PrimitiveWithCheck):
sig.make_sig('emb_max_num', dtype=sig.sig_dtype.T),
sig.make_sig('cache_max_num', dtype=sig.sig_dtype.T)
)
@prim_attr_register
def __init__(self):
"""init MapCacheIdx"""
"""初始化 MapCacheIdx"""
self.init_prim_io_names(inputs=['hashmap', 'indices', 'step', 'emb_max_num', 'offset'],
outputs=['cache_idx', 'old_emb_idx', 'miss_emb_idx', 'swap_cache_idx'])
def __check__(self, hashmap, indices, step, emb_max_num, offset):
# 获取hashmap的形状
hashmap_shape = hashmap['shape']
# 如果hashmap的维度不是2则抛出异常
if len(hashmap_shape) != 2:
raise ValueError("The dimension of 'hashmap' in SearchCacheIdx must be 2, "
"but got %d." % len(hashmap_shape))
# 设置输出的形状
out_shape = (indices['shape'], -1, -1, -1)
# 获取hashmap和indices的数据类型
hashmap_dtype = hashmap['dtype']
indices_dtype = indices['dtype']
# 将数据类型存入字典
args = {"hashmap": hashmap_dtype, "indices": indices_dtype}
# 检查数据类型是否相同且有效
validator.check_tensors_dtypes_same_and_valid(
args, mstype.int_type, self.name)
# 设置输出的数据类型
out_dtype = (hashmap_dtype, hashmap_dtype,
hashmap_dtype, hashmap_dtype)
# 设置输出的字典
out = {'shape': out_shape,
'dtype': out_dtype,
'value': None}
# 如果indices中有max_shape则设置输出的max_shape
if 'max_shape' in indices:
out['max_shape'] = (indices['max_shape'], indices['max_shape'],
indices['max_shape'], indices['max_shape'])
# 否则设置输出的max_shape为indices的形状
else:
out['max_shape'] = (indices['shape'], indices['shape'],
indices['shape'], indices['shape'])
# 如果indices中有min_shape则设置输出的min_shape
if 'min_shape' in indices:
out['min_shape'] = (indices['min_shape'], 0, 0, 0)
# 否则设置输出的min_shape为(0, 0, 0, 0)
else:
out['min_shape'] = (0, 0, 0, 0)
# 返回输出的字典
return out
class DynamicAssign(PrimitiveWithCheck):
"""
Assigns `Parameter` with a value, the `value` can have a dynamic shape.
`Parameter` 与值分配`value` 可以具有动态形状
Inputs:
- **variable** (Parameter) - The `Parameter`.
- **value** (Tensor) - The value to be assigned.
- **variable** (Parameter) - `Parameter`
- **value** (Tensor) - 要分配的值
Outputs:
Tensor, has the same type as original `variable`.
Tensor具有与原始 `variable` 相同的类型
Supported Platforms:
`CPU`
"""
@ -250,41 +277,42 @@ class DynamicAssign(PrimitiveWithCheck):
sig.make_sig('variable', sig.sig_rw.RW_WRITE, dtype=sig.sig_dtype.T),
sig.make_sig('value', dtype=sig.sig_dtype.T)
)
@prim_attr_register
def __init__(self):
self.init_prim_io_names(inputs=['ref', 'value'], outputs=['output'])
def check_dtype(self, variable, value):
# 检查变量是否为mstype.type_refkey
if variable != mstype.type_refkey:
# 检查变量是否为mstype.number_type类型
validator.check_tensor_dtype_valid(
"variable", variable, mstype.number_type, self.name)
# 检查value是否为mstype.number_type类型
validator.check_scalar_or_tensor_types_same(
{"value": value}, mstype.number_type, self.name)
class PadAndShift(PrimitiveWithCheck):
"""
Pad a tensor with -1, and shift with a length.
-1 填充张量并按长度进行移位
Inputs:
- **input_x** (Tensor) - The input Tensor, which will be copied
to `output`.
- **cum_sum_arr** (Tensor) - The last value of cum_sum_arr is
the pad length of output tensor, cum_sum_arr[shift_idx] is
the start to shift, and cum_sum_arr[shift_idx+1] is the end.
- **shift_idx** (Int) - The idx of cum_sum_arr.
if use python, PadAndShift is:
- **input_x** (Tensor) - 输入张量将被复制到 `output`
- **cum_sum_arr** (Tensor) - cum_sum_arr 的最后一个值是输出张量的填充长度
cum_sum_arr[shift_idx] 是开始移位cum_sum_arr[shift_idx+1] 是结束
- **shift_idx** (Int) - cum_sum_arr 的索引
如果使用 PythonPadAndShift
output = [-1] * cum_sum_arr[-1]
start = cum_sum_arr[shift_idx]
end = cum_sum_arr[shift_idx + 1]
output[start:end] = input_x[:(end-start)]
Outputs:
Tensor, has the same type as original `variable`.
Tensor具有与原始 `variable` 相同的类型
Supported Platforms:
`CPU`
Examples:
>>> input_x = Tensor(np.array([9, 13, -1, -1, -1, -1, -1, -1]), mstype.int32)
>>> cum_sum_arr = Tensor(np.array([0, 3, 5]), mstype.int32)
@ -296,11 +324,14 @@ class PadAndShift(PrimitiveWithCheck):
"""
@prim_attr_register
def __init__(self):
# 初始化输入输出名称
self.init_prim_io_names(
inputs=['input_x', 'cum_sum_arr', 'shift_idx'], outputs=['output'])
def check_shape(self, input_x_shape, cum_sum_arr_shape, shift_idx_shape):
# 检查输入形状
return input_x_shape
def check_dtype(self, input_x_dtype, cum_sum_arr_dtype, shift_idx_dtype):
return input_x_dtype
# 检查输入数据类型
return input_x_dtype

@ -12,39 +12,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Operators for TensorArray."""
import mindspore as ms
from ..._checkparam import Validator as validator
from ..._checkparam import Rel
from ...common import dtype as mstype
from ..primitive import prim_attr_register, PrimitiveWithInfer, Primitive
class TensorArray(PrimitiveWithInfer):
r"""
TensorArrayCreate used to create a TensorArray and return an unique handle.
.. warning::
This is an experimental prototype that is subject to change and/or deletion.
Args:
dtype (mindspore.dtype): the data type in the TensorArray.
element_shape (tuple[int]): the shape of each tensor in a TensorArray.
dynamic_size (bool): If true the TensorArray can increase the size. Default: True.
size (int): The size of the TensorArray if dynamic_size = False.
name (string): the name of this TensorArray. Default: "TA".
Inputs:
None.
Outputs:
- **output** (Tensor[mindspore.int64]) - an unique handle binded to the TensorArray.
Supported Platforms:
``GPU`` ``CPU``
Examples:
>>> import mindspore
>>> import mindspore.ops as ops
@ -55,6 +55,7 @@ class TensorArray(PrimitiveWithInfer):
"""
@prim_attr_register
def __init__(self, dtype, element_shape, dynamic_size=True, size=0, name="TA"):
"""初始化TensorArray类设置参数和属性."""
validator.check_type_name("dtype", dtype, mstype.number_type + (mstype.bool_,), self.name)
validator.check_int(size, 0, Rel.GE, "size", self.name)
self.add_prim_attr('dtype', dtype)
@ -63,32 +64,34 @@ class TensorArray(PrimitiveWithInfer):
self.add_prim_attr('size', size)
self.add_prim_attr('side_effect_mem', True)
self.add_prim_attr('name', name)
def infer_shape(self):
"""推断输出形状."""
return ()
def infer_dtype(self):
"""推断输出数据类型."""
return mstype.int64
class TensorArrayWrite(PrimitiveWithInfer):
r"""
TensorArrayWrite used to write tensor into a created TensorArray.
.. warning::
This is an experimental prototype that is subject to change and/or deletion.
Inputs:
- **index** (Tensor[int64]) - The position to write.
- **value** (Tensor) - The value to add into the TensorArray.
- **handle** (Tensor[int64]) - The handle pointed to the TensorArray.
Outputs:
None.
Supported Platforms:
``GPU`` ``CPU``
Examples:
>>> import mindspore
>>> import mindspore.ops as ops
@ -99,39 +102,42 @@ class TensorArrayWrite(PrimitiveWithInfer):
"""
@prim_attr_register
def __init__(self):
"""初始化TensorArrayWrite类."""
self.add_prim_attr('side_effect_mem', True)
def infer_shape(self, handle_shape, index_shape, value_shape):
"""推断输出形状."""
return ()
def infer_dtype(self, handle_type, index_type, value_type):
"""推断输出数据类型."""
validator.check_type_name("handle", handle_type, (ms.int64), self.name)
validator.check_type_name("index", index_type, (int, ms.int64), self.name)
validator.check_type_name("value", value_type, mstype.number_type + (mstype.bool_,), self.name)
return mstype.int64
class TensorArrayRead(PrimitiveWithInfer):
r"""
TensorArrayRead used to read tensor from a created TensorArray by the given index.
.. warning::
This is an experimental prototype that is subject to change and/or deletion.
Args:
dtype (mindspore.dtype): the data type in the TensorArray.
element_shape (tuple[int]): the shape of each tensor in a TensorArray.
Inputs:
- **index** (Tensor[int64]) - The position to read.
- **handle** (mindspore.int64) - The handle pointed to the TensorArray.
Outputs:
- **output** (Tensor) - the value in position index.
Supported Platforms:
``GPU`` ``CPU``
Examples:
>>> import mindspore
>>> import mindspore.ops as ops
@ -146,38 +152,41 @@ class TensorArrayRead(PrimitiveWithInfer):
"""
@prim_attr_register
def __init__(self, dtype, element_shape):
"""初始化TensorArrayRead类设置参数和属性."""
validator.check_type_name("dtype", dtype, mstype.number_type + (mstype.bool_,), self.name)
self.add_prim_attr('dtype', dtype)
self.add_prim_attr('element_shape', element_shape)
self.add_prim_attr('side_effect_mem', True)
self.dtype = dtype
self.shape = element_shape
def infer_shape(self, handle_shape, index_shape):
"""推断输出形状."""
return self.shape
def infer_dtype(self, handle_type, index_type):
"""推断输出数据类型."""
validator.check_type_name("handle", handle_type, (ms.int64), self.name)
validator.check_type_name("index", index_type, (int, ms.int64), self.name)
return self.dtype
class TensorArrayClose(PrimitiveWithInfer):
r"""
TensorArrayClose used to close the created TensorArray. The resources in TensorArray will be deleted.
.. warning::
This is an experimental prototype that is subject to change and/or deletion.
Inputs:
- **handle** (mindspore.int64) - The handle pointed to the TensorArray.
Outputs:
None.
Supported Platforms:
``GPU`` ``CPU``
Examples:
>>> import mindspore
>>> import mindspore.ops as ops
@ -188,32 +197,35 @@ class TensorArrayClose(PrimitiveWithInfer):
"""
@prim_attr_register
def __init__(self):
"""初始化TensorArrayClose类."""
self.add_prim_attr('side_effect_mem', True)
def infer_shape(self, handle_shape):
"""推断输出形状."""
return ()
def infer_dtype(self, handle_type):
"""推断输出数据类型."""
validator.check_type_name("handle", handle_type, (ms.int64), self.name)
return mstype.int64
class TensorArrayClear(PrimitiveWithInfer):
r"""
TensorArrayClear used to reset the created TensorArray. The instance of TensorArray is still aviliable.
.. warning::
This is an experimental prototype that is subject to change and/or deletion.
Inputs:
- **handle** (mindspore.int64) - The handle pointed to the TensorArray.
Outputs:
None.
Supported Platforms:
``GPU`` ``CPU``
Examples:
>>> import mindspore
>>> import mindspore.ops as ops
@ -224,36 +236,39 @@ class TensorArrayClear(PrimitiveWithInfer):
"""
@prim_attr_register
def __init__(self):
"""初始化TensorArrayClear类."""
self.add_prim_attr('side_effect_mem', True)
def infer_shape(self, handle_shape):
"""推断输出形状."""
return ()
def infer_dtype(self, handle_type):
"""推断输出数据类型."""
validator.check_type_name("handle", handle_type, (ms.int64), self.name)
return mstype.int64
class TensorArrayStack(Primitive):
r"""
TensorArrayStack used to stack the tensors in a created TensorArray into one tensor.
.. warning::
This is an experimental prototype that is subject to change and/or deletion.
Args:
dtype (mindspore.dtype): the data type in the TensorArray.
element_shape (tuple[int]): the shape of each tensor in a TensorArray.
Inputs:
- **handle** (mindspore.int64) - The handle pointed to the TensorArray.
Outputs:
- **output** (Tensor) - the stacked value from the TensorArray.
Supported Platforms:
``GPU`` ``CPU``
Examples:
>>> import mindspore
>>> import mindspore.ops as ops
@ -269,31 +284,31 @@ class TensorArrayStack(Primitive):
"""
@prim_attr_register
def __init__(self, dtype, element_shape, dynamic_size, size):
"""Initialize TensorArrayStack"""
"""初始化TensorArrayStack类设置参数和属性."""
self.init_prim_io_names(inputs=[''], outputs=['output'])
self.add_prim_attr('dtype', dtype)
self.add_prim_attr('element_shape', element_shape)
self.add_prim_attr('is_dynamic_shape', dynamic_size)
self.add_prim_attr('size', size)
self.add_prim_attr('side_effect_mem', True)
class TensorArraySize(PrimitiveWithInfer):
r"""
TensorArraySize used to get the logical size of the created TensorArray.
.. warning::
This is an experimental prototype that is subject to change and/or deletion.
Inputs:
- **handle** (mindspore.int64) - The handle pointed to the TensorArray.
Outputs:
- **output** (Tensor[mindspore.int64]) - the logical size of the TensorArray.
Supported Platforms:
``GPU`` ``CPU``
Examples:
>>> import mindspore
>>> import mindspore.ops as ops
@ -304,34 +319,37 @@ class TensorArraySize(PrimitiveWithInfer):
"""
@prim_attr_register
def __init__(self):
"""初始化TensorArraySize类."""
self.add_prim_attr('side_effect_mem', True)
def infer_shape(self, handle_shape):
"""推断输出形状."""
return ()
def infer_dtype(self, handle_type):
"""推断输出数据类型."""
validator.check_type_name("handle", handle_type, (ms.int64), self.name)
return mstype.int64
class TensorArrayGather(PrimitiveWithInfer):
r"""
TensorArrayGather used to gather specified elements from the created TensorArray.
.. warning::
This is an experimental prototype that is subject to change and/or deletion.
Args:
dtype (mindspore.dtype): the data type in the TensorArray.
element_shape (tuple[int]): the shape of each tensor in a TensorArray.
Inputs:
- **handle** (mindspore.int64) - The handle pointed to the TensorArray.
- **indices** (mindspore.int32) - The locations of the gathered elements.
Outputs:
- **output** (Tensor) - The gathered value from the TensorArray.
Examples:
>>> import mindspore
>>> import mindspore.ops as ops
@ -344,17 +362,20 @@ class TensorArrayGather(PrimitiveWithInfer):
"""
@prim_attr_register
def __init__(self, dtype, element_shape):
"""初始化TensorArrayGather类设置参数和属性."""
self.init_prim_io_names(inputs=['handle', 'indices'], outputs=['value'])
self.add_prim_attr("side_effect_mem", True)
self.dtype = dtype
self.element_shape = element_shape
def infer_shape(self, handle, indices):
"""推断输出形状."""
if len(indices) != 1:
return ValueError("indices dimension should be equal to 1")
return [indices[0]] + list(self.element_shape)
def infer_dtype(self, handle, indices):
"""推断输出数据类型."""
validator.check_type_name("handle", handle, (ms.int64), self.name)
validator.check_type_name("indices", indices, (ms.int32), self.name)
return self.dtype
return self.dtype

@ -30,10 +30,12 @@ class AllGatherCell(Cell):
def __init__(self, group):
super(AllGatherCell, self).__init__(auto_prefix=False)
# 创建AllGather操作对象
self.allgather = AllGather(group)
@ms_function()
def construct(self, x):
# 执行AllGather操作
x = self.allgather(x)
return x
@ -50,10 +52,12 @@ class SaveOptShardCkptCell(Cell):
"""
def __init__(self, group):
super(SaveOptShardCkptCell, self).__init__(auto_prefix=False)
# 创建AllGather操作对象
self.allgather1 = AllGather(group)
self.allgather2 = AllGather()
def construct(self, x):
# 执行AllGather操作
x = self.allgather1(x)
x = self.allgather2(x)
@ -64,11 +68,14 @@ def get_allgather_cell(group, need_merge_twice=False):
"""Get AllGatherCell object."""
global _allgather_cell
if need_merge_twice:
# 如果需要两次合并则创建SaveOptShardCkptCell对象
_allgather_cell = SaveOptShardCkptCell(group)
else:
if group:
# 如果有指定的设备组则创建AllGatherCell对象
_allgather_cell = AllGatherCell(group)
else:
# 否则创建AllGatherCell对象使用全局通信组
_allgather_cell = AllGatherCell(GlobalComm.WORLD_COMM_GROUP)
return _allgather_cell
@ -77,4 +84,5 @@ def destroy_allgather_cell():
"""Destroy AllGatherCell object."""
global _allgather_cell
if _allgather_cell:
# 销毁AllGatherCell对象
_allgather_cell = None

@ -22,185 +22,171 @@ from ..common import dtype as mstype
from .utils_const import _type_convert, _raise_value_error, _callable_const, _super_check, pack
from ..ops.composite import GradOperation
grad = GradOperation(get_all=False, get_by_list=False, sens_param=False)
_eps_net = ops.Eps()
grad = GradOperation(get_all=False, get_by_list=False, sens_param=False) # 定义一个求梯度操作,设置为只求第一个参数的梯度,不通过列表获取,且不使用敏感参数
_eps_net = ops.Eps() # 定义一个计算数值精度的操作
def _convert_64_to_32(tensor):
def _convert_64_to_32(tensor): # 定义一个函数将输入的tensor从float64或int64类型转换为float32或int32类型
"""Convert Tensor with float64/int64 types to float32/int32."""
if tensor.dtype == mstype.float64:
return tensor.astype("float32")
if tensor.dtype == mstype.int64:
return tensor.astype("int32")
return tensor
if tensor.dtype == mstype.float64: # 如果tensor的数据类型是float64
return tensor.astype("float32") # 将其转换为float32类型
if tensor.dtype == mstype.int64: # 如果tensor的数据类型是int64
return tensor.astype("int32") # 将其转换为int32类型
return tensor # 如果不是以上两种类型则直接返回原tensor
def _to_tensor(*args, dtype=None):
def _to_tensor(*args, dtype=None): # 定义一个函数将输入的参数转换为tensor
"""Returns each input as Tensor"""
res = ()
for arg in args:
if isinstance(arg, (int, float, bool, list, tuple)):
arg = _type_convert(Tensor, arg)
if dtype is None:
arg = _convert_64_to_32(arg)
else:
arg = arg.astype(dtype)
elif not isinstance(arg, Tensor):
_raise_value_error("Expect input to be array like.")
res += (arg,)
if len(res) == 1:
return res[0]
return res
def _to_scalar(arr):
res = () # 初始化一个空元组用于存储结果
for arg in args: # 遍历每一个输入参数
if isinstance(arg, (int, float, bool, list, tuple)): # 如果参数是整数、浮点数、布尔值、列表或元组
arg = _type_convert(Tensor, arg) # 将其转换为Tensor类型
if dtype is None: # 如果没有指定dtype
arg = _convert_64_to_32(arg) # 调用_convert_64_to_32函数进行类型转换
else: # 如果指定了dtype
arg = arg.astype(dtype) # 将tensor转换为指定的dtype
elif not isinstance(arg, Tensor): # 如果参数不是Tensor类型
_raise_value_error("Expect input to be array like.") # 抛出错误,提示输入应为数组形式
res += (arg,) # 将转换后的tensor添加到结果元组中
if len(res) == 1: # 如果结果元组中只有一个元素
return res[0] # 直接返回该元素
return res # 否则返回整个元组
def _to_scalar(arr): # 定义一个函数将输入的Tensor或ndarray转换为标量值
"""Convert a scalar Tensor or ndarray to a scalar."""
if isinstance(arr, (int, float, bool)):
return arr
if isinstance(arr, Tensor):
if arr.shape:
return arr
return arr.asnumpy().item()
raise ValueError("{} are not supported.".format(type(arr)))
def _eps(x):
return _eps_net(x[(0,) * x.ndim])
def _safe_normalize(x, threshold=None):
if isinstance(arr, (int, float, bool)): # 如果输入参数是整数、浮点数或布尔值
return arr # 直接返回该参数
if isinstance(arr, Tensor): # 如果输入参数是Tensor类型
if arr.shape: # 如果tensor的形状不是空的即不是标量
return arr # 返回整个tensor
return arr.asnumpy().item() # 如果是标量将其转换为numpy数组并返回标量值
raise ValueError("{} are not supported.".format(type(arr))) # 如果输入参数不是以上两种类型,抛出错误,提示不支持该类型
def _eps(x): # 定义一个函数计算输入tensor的数值精度
return _eps_net(x[(0,) * x.ndim]) # 使用_ops.Eps操作计算数值精度x[(0,) * x.ndim]确保输入的是一个标量
def _safe_normalize(x, threshold=None): # 定义一个函数对输入的tensor进行归一化如果归一化结果非常小则设置为零
"""Normalize method that cast very small results to zero."""
x_sum2 = F.reduce_sum(F.pows(x, 2.0))
norm = F.pows(x_sum2, 1. / 2.0)
if threshold is None:
if x.dtype in (mstype.float32, mstype.float64):
# pick the first element of x to get the eps
x_sum2 = F.reduce_sum(F.pows(x, 2.0)) # 计算tensor元素平方的和
norm = F.pows(x_sum2, 1. / 2.0) # 计算上述和的平方根得到norm
if threshold is None: # 如果没有指定threshold
if x.dtype in (mstype.float32, mstype.float64): # 如果tensor的dtype是float32或float64
# pick the first element of x to get the eps # 获取eps来作为threshold
threshold = _eps(x)
else:
threshold = 0
use_norm = greater(norm, threshold)
x_norm = x / norm
normalized_x = where(use_norm, x_norm, zeros_like(x))
norm = where(use_norm, norm, zeros_like(norm))
return normalized_x, norm
def sparse_dot(a, b):
else: # 如果tensor的dtype不是float32或float64
threshold = 0 # 设置threshold为0
use_norm = greater(norm, threshold) # 比较norm和threshold得到一个布尔mask
x_norm = x / norm # 使用norm对tensor进行归一化
normalized_x = where(use_norm, x_norm, zeros_like(x)) # 如果norm大于threshold则使用归一化后的tensor否则使用零
norm = where(use_norm, norm, zeros_like(norm)) # 如果norm大于threshold则保留norm否则使用零
return normalized_x, norm # 返回归一化后的tensor及其对应的norm
def sparse_dot(a, b): # 定义一个函数计算稀疏矩阵CSRTensor与向量generic Tensor的点积
"""Returns the dot product of CSRTensor and generic Tensor(vector)."""
b_aligned = F.reshape(b, (b.shape[0], -1))
res = F.csr_mv(a, b_aligned)
res = F.reshape(res, a.shape[:-1] + b.shape[1:])
return res
b_aligned = F.reshape(b, (b.shape[0], -1)) # 将向量b重塑为(b.shape[0], -1)的形状,使其可以与稀疏矩阵相乘
res = F.csr_mv(a, b_aligned) # 使用csr_mv操作计算稀疏矩阵a与向量b_aligned的点积
res = F.reshape(res, a.shape[:-1] + b.shape[1:]) # 将计算结果重新塑形为a.shape[:-1] + b.shape[1:]的形状
return res # 返回结果
def _normalize_matvec(f):
def _normalize_matvec(f): # 定义一个函数,对输入的矩阵或向量进行归一化处理
"""Normalize an argument for computing matrix-vector products."""
if isinstance(f, Tensor):
return F.partial(dot, f)
if isinstance(f, CSRTensor):
return F.partial(sparse_dot, f)
return f
if isinstance(f, Tensor): # 如果输入参数是Tensor类型
return F.partial(dot, f) # 返回一个带有矩阵参数f的dot函数的部分应用
if isinstance(f, CSRTensor): # 如果输入参数是CSRTensor类型
return F.partial(sparse_dot, f) # 返回一个带有稀疏矩阵参数f的sparse_dot函数的部分应用
def _norm(x, ord_=None):
if ord_ == mnp.inf:
res = mnp.max(mnp.abs(x))
else:
res = mnp.sqrt(mnp.sum(x ** 2))
return res
return f # 如果输入参数不是上述两种类型,则直接返回原参数
def _norm(x, ord_=None): # 定义一个函数计算输入tensor的范数
if ord_ == mnp.inf: # 如果ord_为无穷大实际为最大值
res = mnp.max(mnp.abs(x)) # 返回tensor绝对值的最大值
else: # 如果ord_不是无穷大
res = mnp.sqrt(mnp.sum(x ** 2)) # 返回tensor元素平方和的平方根即L2范数
return res # 返回结果
def _nd_transpose(a):
dims = a.ndim
if dims < 2:
_raise_value_error("to do _nd_transpose for input a's ndim is not greater or equal to 2d, which is invalid.")
axes = ops.make_range(0, dims)
axes = axes[:-2] + (axes[-1],) + (axes[-2],)
return ops.transpose(a, axes)
def _nd_transpose(a): # 定义一个函数对输入的tensor进行转置最后一个维度与倒数第二个维度互换
dims = a.ndim # 获取tensor的维度数
if dims < 2: # 如果tensor的维度小于2
_raise_value_error("to do _nd_transpose for input a's ndim is not greater or equal to 2d, which is invalid.") # 抛出错误提示输入tensor的维度应大于等于2
axes = ops.make_range(0, dims) # 生成一个从0到tensor维度数的序列
axes = axes[:-2] + (axes[-1],) + (axes[-2],) # 将序列中的倒数第二个和最后一个元素互换位置
return ops.transpose(a, axes) # 使用transpose操作对tensor进行转置
def _value_check(func_name, arg1, arg2, arg_name='', attr_name='', op="in", fmt="attr", msg=None): # 定义一个函数,用于检查输入参数的值是否符合预期
return _super_check(pack(arg1, arg2), (func_name, arg_name, attr_name), op, fmt, msg, True) # 调用_super_check函数进行检查
def _value_check(func_name, arg1, arg2, arg_name='', attr_name='', op="in", fmt="attr", msg=None):
return _super_check(pack(arg1, arg2), (func_name, arg_name, attr_name), op, fmt, msg, True)
def _type_check(func_name, arg1, arg2, arg_name='', op="isinstance", fmt="type", msg=None): # 定义一个函数,用于检查输入参数的类型是否符合预期
return _super_check(pack(arg1, arg2), (func_name, arg_name), op, fmt, msg, False) # 调用_super_check函数进行检查
def _type_check(func_name, arg1, arg2, arg_name='', op="isinstance", fmt="type", msg=None):
return _super_check(pack(arg1, arg2), (func_name, arg_name), op, fmt, msg, False)
def _mstype_check(func_name, arg, arg_mstype, arg_name='a'):
return _super_check((F.typeof(arg), arg_mstype), pack(arg, arg_mstype, func_name, arg_name), "isinstance", "mstype",
def _mstype_check(func_name, arg, arg_mstype, arg_name='a'): # 定义一个函数用于检查输入参数的mstype是否符合预期
return _super_check((F.typeof(arg), arg_mstype), pack(arg, arg_mstype, func_name, arg_name), "isinstance", "mstype", # 调用_super_check函数进行检查
None, False)
def _dtype_check(func_name, arg, arg_dtype, arg_name='a'): # 定义一个函数,用于检查输入参数的数据类型是否符合预期
return _super_check((F.dtype(arg), arg_dtype), (func_name, arg_name, "data type"), "in", "attr", # 调用_super_check函数进行检查
None, False)
def _dtype_check(func_name, arg, arg_dtype, arg_name='a'):
return _super_check((F.dtype(arg), arg_dtype), (func_name, arg_name, "data type"), "in", "attr", None, False)
def _square_check(func_name, arg, arg_name='a'):
arg_shape = arg.shape
_super_check((len(arg_shape), 2), (func_name, arg_name, 'dimension'), '==', 'attr', None, True)
_super_check(arg_shape, (func_name, arg_name), '==', 'square', None, True)
return arg
def _solve_check(func_name, arg1, arg2, arg1_name='a', arg2_name='b', sparse=False):
arg1_shape, arg1_dtype = arg1.shape, F.dtype(arg1)
arg2_shape, arg2_dtype = arg2.shape, F.dtype(arg2)
_square_check(func_name, arg1, arg1_name)
_super_check((len(arg2_shape), (1, 2)), (func_name, arg2_name, 'dimension'), 'in', 'attr', None, True)
_super_check((arg1_shape, arg2_shape), (func_name, arg1_name, arg2_name, sparse), 'solve', 'solve', None, True)
_super_check((arg1_dtype, arg2_dtype), (func_name, arg1_name, arg2_name, 'data type'), '==', 'match', None, False)
return arg1, arg2
def _sparse_check(func_name, a, m, b, x0):
def _square_check(func_name, arg, arg_name='a'): # 定义一个函数,用于检查输入参数是否为方阵
arg_shape = arg.shape # 获取输入参数的形状
_super_check((len(arg_shape), 2), (func_name, arg_name, 'dimension'), '==', 'attr', None, True) # 检查输入参数的维度是否为2
_super_check(arg_shape, (func_name, arg_name), '==', 'square', None, True) # 检查输入参数的形状是否为方阵
return arg # 返回检查后的参数
def _solve_check(func_name, arg1, arg2, arg1_name='a', arg2_name='b', sparse=False): # 定义一个函数,用于在求解线性方程组时检查输入参数
arg1_shape, arg1_dtype = arg1.shape, F.dtype(arg1) # 获取第一个参数的形状和数据类型
arg2_shape, arg2_dtype = arg2.shape, F.dtype(arg2) # 获取第二个参数的形状和数据类型
_square_check(func_name, arg1, arg1_name) # 检查第一个参数是否为方阵
_super_check((len(arg2_shape), (1, 2)), (func_name, arg2_name, 'dimension'), 'in', 'attr', None, True) # 检查第二个参数的维度是否为1或2
_super_check((arg1_shape, arg2_shape), (func_name, arg1_name, arg2_name, sparse), 'solve', 'solve', None, True) # 检查第一个参数和第二个参数的形状是否可以用于求解线性方程组
_super_check((arg1_dtype, arg2_dtype), (func_name, arg1_name, arg2_name, 'data type'), '==', 'match', None, False) # 检查第一个参数和第二个参数的数据类型是否匹配
return arg1, arg2 # 返回检查后的两个参数
def _sparse_check(func_name, a, m, b, x0): # 定义一个函数用于在稀疏求解器如cg, bicgstab和gmres中检查输入参数
"""Used for cg, bicgstab and gmres method."""
def _check_right(arg, arg_name):
if arg is None:
return mnp.zeros_like(b) # x0 same as b
def _check_right(arg, arg_name): # 定义一个内部函数用于检查右侧参数b或x0
if arg is None: # 如果参数为None
return mnp.zeros_like(b) # x0 same as b # 返回与b形状相同元素为零的tensor
# Type
_mstype_check(func_name, arg, mstype.tensor_type, arg_name)
_mstype_check(func_name, arg, mstype.tensor_type, arg_name) # 检查参数的mstype是否为tensor_type
# DType
_dtype_check(func_name, arg, [mstype.int32, mstype.int64, mstype.float32, mstype.float64], arg_name)
_dtype_check(func_name, arg, [mstype.int32, mstype.int64, mstype.float32, mstype.float64], arg_name) # 检查参数的数据类型是否在指定的类型列表中
# Shape
if (arg.ndim != 1 and arg.ndim != 2) or (arg.ndim == 2 and arg.shape[1] != 1):
_raise_value_error("For: '", func_name, "', the shape of '", arg_name,
if (arg.ndim != 1 and arg.ndim != 2) or (arg.ndim == 2 and arg.shape[1] != 1): # 检查参数的形状是否为(N,)或(N, 1)
_raise_value_error("For: '", func_name, "', the shape of '", arg_name, # 如果不满足条件,抛出错误
"' should be like (N,) or (N, 1), bug got ", arg.shape, ".")
return arg
return arg # 返回检查后的参数
b = _check_right(b, 'b')
x0 = _check_right(x0, 'x0')
b = _check_right(b, 'b') # 检查参数b
x0 = _check_right(x0, 'x0') # 检查参数x0
def _check_left(arg, arg_name):
if arg is None:
return lambda x: x # identity function
def _check_left(arg, arg_name): # 定义一个内部函数用于检查左侧参数a或m
if arg is None: # 如果参数为None
return lambda x: x # identity function # 返回一个恒等函数
# Type
_mstype_check(func_name, arg, [mstype.function_type, mstype.tensor_type, mstype.csr_tensor_type], arg_name)
if _callable_const(F.typeof(arg)):
return arg
_mstype_check(func_name, arg, [mstype.function_type, mstype.tensor_type, mstype.csr_tensor_type], arg_name) # 检查参数的mstype是否为function_type, tensor_type或csr_tensor_type
if _callable_const(F.typeof(arg)): # 如果参数是一个可调用的常量(即函数)
return arg # 返回该参数
# DType
if isinstance(arg, CSRTensor):
_dtype_check(func_name, arg.indptr, [mstype.int32], arg_name)
_dtype_check(func_name, arg.indices, [mstype.int32], arg_name)
_dtype_check(func_name, arg.values, [mstype.float32], arg_name)
else:
_dtype_check(func_name, arg, [mstype.int32, mstype.int64, mstype.float32, mstype.float64], arg_name)
if isinstance(arg, CSRTensor): # 如果参数是CSRTensor类型
_dtype_check(func_name, arg.indptr, [mstype.int32], arg_name) # 检查CSRTensor的indptr数据类型是否为int32
_dtype_check(func_name, arg.indices, [mstype.int32], arg_name) # 检查CSRTensor的indices数据类型是否为int32
_dtype_check(func_name, arg.values, [mstype.float32], arg_name) # 检查CSRTensor的values数据类型是否为float32
else: # 如果参数不是CSRTensor类型
_dtype_check(func_name, arg, [mstype.int32, mstype.int64, mstype.float32, mstype.float64], arg_name) # 检查参数的数据类型是否在指定的类型列表中
# Shape
_solve_check(func_name, arg, b, arg_name, 'b', True)
_solve_check(func_name, arg, x0, arg_name, 'x0', True)
if isinstance(arg, Tensor) and F.dtype(arg) in (mstype.int32, mstype.int64):
arg = F.cast(arg, mstype.float64)
return arg
a = _check_left(a, 'A')
m = _check_left(m, 'M')
b = b.flatten()
x0 = x0.flatten()
if F.dtype(b) in (mstype.int32, mstype.int64):
b = F.cast(b, mstype.float64)
x0 = F.cast(x0, mstype.float64)
return a, m, b, x0
_solve_check(func_name, arg, b, arg_name, 'b', True) # 检查参数a和b的形状是否可以用于求解线性方程组
_solve_check(func_name, arg, x0, arg_name, 'x0', True) # 检查参数a和x0的形状是否可以用于求解线性方程组
if isinstance(arg, Tensor) and F.dtype(arg) in (mstype.int32, mstype.int64): # 如果参数是Tensor类型且数据类型为int32或int64
arg = F.cast(arg, mstype.float64) # 将其转换为float64类型
return arg # 返回检查后的参数
a = _check_left(a, 'A') # 检查参数a
m = _check_left(m, 'M') # 检查参数m
b = b.flatten() # 将参数b展平为一维的tensor
x0 = x0.flatten() # 将参数x0展平为一维的tensor
if F.dtype(b) in (mstype.int32, mstype.int64): # 如果参数b的数据类型为int32或int64
b = F.cast(b, mstype.float64) # 将其转换为float64类型
x0 = F.cast(x0, mstype.float64) # 将其转换为float64类型
return a, m, b, x0 # 返回检查并转换后的参数

@ -366,31 +366,46 @@ class ModelCheckpoint(Callback):
"""
def __init__(self, prefix='CKP', directory=None, config=None):
# 初始化函数,设置前缀、目录、配置等参数
super(ModelCheckpoint, self).__init__()
# 调用父类的初始化函数
self._latest_ckpt_file_name = ""
# 初始化最新检查点文件名为空字符串
self._init_time = time.time()
# 初始化初始化时间为当前时间
self._last_time = time.time()
# 初始化最后时间时间为当前时间
self._last_time_for_keep = time.time()
# 初始化最后保存时间为当前时间
self._last_triggered_step = 0
# 初始化最后触发的步数为0
# 检查前缀是否为字符串且不包含'/'
if not isinstance(prefix, str) or prefix.find('/') >= 0:
raise ValueError("For 'ModelCheckpoint', the argument 'prefix' "
"for checkpoint file name is invalid, it must be "
"string and does not contain '/', but got {}.".format(prefix))
self._prefix = prefix
# 设置前缀
self._exception_prefix = prefix
# 设置异常前缀
# 如果目录不为空,则创建目录
if directory is not None:
self._directory = _make_directory(directory)
else:
self._directory = _cur_dir
# 否则,使用当前目录
# 如果启用了恢复上下文,则设置检查点路径
if _get_recovery_context("enable_recovery"):
_set_recovery_context(ckpt_path=self._directory)
# 如果config为None则使用默认的CheckpointConfig
if config is None:
self._config = CheckpointConfig()
else:
# 如果config不是CheckpointConfig类型则抛出TypeError异常
if not isinstance(config, CheckpointConfig):
raise TypeError("For 'ModelCheckpoint', the type of argument 'config' should be "
"'CheckpointConfig', "
@ -398,11 +413,17 @@ class ModelCheckpoint(Callback):
self._config = config
# get existing checkpoint files
# 创建CheckpointManager对象
self._manager = CheckpointManager()
# 如果存在相同名称的文件,则更改文件名
self._prefix = _chg_ckpt_file_name_if_same_exist(self._directory, self._prefix)
# 获取配置中的append_dict参数如果没有则设置为空字典
self._append_dict = self._config.append_dict or {}
# 获取append_dict中的epoch_num参数如果没有则设置为0
self._append_epoch_num = self._append_dict["epoch_num"] if "epoch_num" in self._append_dict else 0
# 获取append_dict中的step_num参数如果没有则设置为0
self._append_step_num = self._append_dict["step_num"] if "step_num" in self._append_dict else 0
# 标记是否已经保存了图
self._graph_saved = False
self._need_flush_from_cache = True
@ -413,6 +434,7 @@ class ModelCheckpoint(Callback):
Args:
run_context (RunContext): Context of the train running.
"""
# If the role is PServer, add the role name and rank to the prefix
if _is_role_pserver():
self._prefix = "PServer_" + str(_get_ps_mode_rank()) + "_" + self._prefix
cb_params = run_context.original_args()
@ -423,18 +445,23 @@ class ModelCheckpoint(Callback):
self._last_triggered_step = cb_params.last_save_ckpt_step
cb_params.last_save_ckpt_step = None
# Create the directory if it doesn't exist
_make_directory(self._directory)
# save graph (only once)
if not self._graph_saved:
graph_file_name = os.path.join(self._directory, self._prefix + '-graph.meta')
# If the graph file already exists and the mode is GRAPH_MODE, remove it
if os.path.isfile(graph_file_name) and context.get_context("mode") == context.GRAPH_MODE:
os.remove(graph_file_name)
# Save the graph
_save_graph(cb_params.train_network, graph_file_name)
self._graph_saved = True
# Wait for any asynchronous checkpoint saving threads to finish
thread_list = threading.enumerate()
for thread in thread_list:
if thread.getName() == "asyn_save_ckpt":
thread.join()
# Save the checkpoint
self._save_ckpt(cb_params)
def end(self, run_context):
@ -444,44 +471,63 @@ class ModelCheckpoint(Callback):
Args:
run_context (RunContext): Context of the train running.
"""
# 获取训练的参数
cb_params = run_context.original_args()
# 设置保存最后一个checkpoint的标志为True
_to_save_last_ckpt = True
# 保存最后一个checkpoint
self._save_ckpt(cb_params, _to_save_last_ckpt)
# 获取当前线程列表
thread_list = threading.enumerate()
# 遍历线程列表
for thread in thread_list:
# 如果线程名为"asyn_save_ckpt",则等待该线程结束
if thread.getName() == "asyn_save_ckpt":
thread.join()
# 销毁所有gather cell
destroy_allgather_cell()
def _check_save_ckpt(self, cb_params, force_to_save):
"""Check whether save checkpoint files or not."""
# 如果配置了保存检查点步数且步数大于0
if self._config.save_checkpoint_steps and self._config.save_checkpoint_steps > 0:
# 如果当前步数大于等于上次触发保存检查点步数加上保存检查点步数,或者强制保存检查点
if cb_params.cur_step_num >= self._last_triggered_step + self._config.save_checkpoint_steps \
or force_to_save is True:
return True
# 如果配置了保存检查点秒数且秒数大于0
elif self._config.save_checkpoint_seconds and self._config.save_checkpoint_seconds > 0:
# 获取当前时间
self._cur_time = time.time()
# 如果当前时间减去上次时间大于保存检查点秒数,或者强制保存检查点
if (self._cur_time - self._last_time) > self._config.save_checkpoint_seconds or force_to_save:
# 更新上次时间
self._last_time = self._cur_time
return True
# 返回False
return False
def _save_ckpt(self, cb_params, force_to_save=False):
"""Save checkpoint files."""
# 如果当前步骤数等于最后触发的步骤数,则返回
if cb_params.cur_step_num == self._last_triggered_step:
return
# if param is cache enable, flush data from cache to host before save_ckpt
# 如果需要从缓存中刷新数据则调用_flush_from_cache方法
if self._need_flush_from_cache:
self._flush_from_cache(cb_params)
# 检查是否需要保存检查点如果force_to_save为True则强制保存
save_ckpt = self._check_save_ckpt(cb_params, force_to_save)
# 计算当前步数在epoch中的位置
step_num_in_epoch = int((cb_params.cur_step_num - 1) % cb_params.batch_num + 1)
# 如果需要保存检查点,则创建当前检查点的文件名
if save_ckpt:
cur_ckpoint_file = self._prefix + "-" + str(cb_params.cur_epoch_num) + "_" \
+ str(step_num_in_epoch) + ".ckpt"
@ -489,43 +535,68 @@ class ModelCheckpoint(Callback):
self._manager.update_ckpoint_filelist(self._directory, self._prefix)
# keep checkpoint files number equal max number.
if self._config.keep_checkpoint_max and 0 < self._config.keep_checkpoint_max <= self._manager.ckpoint_num:
# 如果keep_checkpoint_max配置存在且大于0且小于等于当前checkpoint文件数量则删除最旧的checkpoint文件
self._manager.remove_oldest_ckpoint_file()
elif self._config.keep_checkpoint_per_n_minutes and self._config.keep_checkpoint_per_n_minutes > 0:
# 如果keep_checkpoint_per_n_minutes配置存在且大于0则记录当前时间
self._cur_time_for_keep = time.time()
# 如果当前时间与上次记录的时间之差小于keep_checkpoint_per_n_minutes配置的分钟数乘以60则保留每个分钟的一个checkpoint文件
if (self._cur_time_for_keep - self._last_time_for_keep) \
< self._config.keep_checkpoint_per_n_minutes * 60:
self._manager.keep_one_ckpoint_per_minutes(self._config.keep_checkpoint_per_n_minutes,
self._cur_time_for_keep)
# generate the new checkpoint file and rename it.
# 定义全局变量_save_dir并将其赋值为self._directory
global _save_dir
_save_dir = self._directory
# 获取当前checkpoint文件的路径
cur_file = os.path.join(self._directory, cur_ckpoint_file)
# 记录当前时间
self._last_time_for_keep = time.time()
# 记录当前触发步数
self._last_triggered_step = cb_params.cur_step_num
# 如果启用了GEGraph Execution
if context.get_context("enable_ge"):
# 设置当前网络
set_cur_net(cb_params.train_network)
# 执行checkpoint图
cb_params.train_network.exec_checkpoint_graph()
# 如果_append_dict中包含"epoch_num"
if "epoch_num" in self._append_dict:
# 将_append_epoch_num加上当前epoch数赋值给"epoch_num"
self._append_dict["epoch_num"] = self._append_epoch_num + cb_params.cur_epoch_num
# 如果_append_dict中包含"step_num"
if "step_num" in self._append_dict:
# 将_append_step_num加上当前step数赋值给"step_num"
self._append_dict["step_num"] = self._append_step_num + cb_params.cur_step_num
# 获取保存的网络如果self._config.saved_network不为None则使用self._config.saved_network否则使用cb_params.train_network
network = self._config.saved_network if self._config.saved_network is not None else cb_params.train_network
# 保存checkpoint
save_checkpoint(network, cur_file, self._config.integrated_save, self._config.async_save,
self._append_dict, self._config.enc_key, self._config.enc_mode)
# 记录最新的checkpoint文件名
self._latest_ckpt_file_name = cur_file
def _flush_from_cache(self, cb_params):
"""Flush cache data to host if tensor is cache enable."""
# 初始化has_cache_params为False
has_cache_params = False
# 获取训练网络中的参数
params = cb_params.train_network.get_parameters()
# 遍历参数
for param in params:
# 如果参数的cache_enable为True
if param.cache_enable:
# 设置has_cache_params为True
has_cache_params = True
# 将参数的Tensor数据从缓存中刷新到主机
Tensor(param).flush_from_cache()
# 如果没有参数的cache_enable为True
if not has_cache_params:
# 设置_need_flush_from_cache为False
self._need_flush_from_cache = False
@property
@ -535,63 +606,88 @@ class ModelCheckpoint(Callback):
class CheckpointManager:
"""Manage checkpoint files according to train_config of checkpoint."""
"""管理检查点文件,根据训练配置进行管理。"""
def __init__(self):
"""初始化检查点管理器,创建空的检查点文件列表。"""
self._ckpoint_filelist = []
@property
def ckpoint_filelist(self):
"""Get all the related checkpoint files managed here."""
"""获取当前管理的所有检查点文件列表。"""
return self._ckpoint_filelist
@property
def ckpoint_num(self):
"""Get the number of the related checkpoint files managed here."""
"""获取当前管理的检查点文件数量。"""
return len(self._ckpoint_filelist)
def update_ckpoint_filelist(self, directory, prefix):
"""Update the checkpoint file list."""
"""更新检查点文件列表,根据目录和前缀筛选符合条件的检查点文件。"""
# 初始化一个空列表用于存储ckpt文件
self._ckpoint_filelist = []
# 获取指定目录下的所有文件
files = os.listdir(directory)
# 遍历所有文件
for filename in files:
# 判断文件是否以指定前缀开头,并且以.ckpt结尾
if os.path.splitext(filename)[-1] == ".ckpt" and filename.startswith(prefix + "-"):
# 获取文件名中间部分
mid_name = filename[len(prefix):-5]
# 判断中间部分是否包含字母
flag = not (True in [char.isalpha() for char in mid_name])
# 如果不包含字母,则将文件路径添加到列表中
if flag:
self._ckpoint_filelist.append(os.path.join(directory, filename))
def remove_ckpoint_file(self, file_name):
"""Remove the specified checkpoint file from this checkpoint manager and also from the directory."""
"""从检查点管理器中移除指定的检查点文件,并从目录中删除该文件。"""
try:
# 修改文件权限为可写
os.chmod(file_name, stat.S_IWRITE)
# 删除文件
os.remove(file_name)
# 从ckpoint文件列表中移除该文件
self._ckpoint_filelist.remove(file_name)
except OSError:
# 捕获OSError异常并记录警告日志
logger.warning("OSError, failed to remove the older ckpt file %s.", file_name)
except ValueError:
# 捕获ValueError异常并记录警告日志
logger.warning("ValueError, failed to remove the older ckpt file %s.", file_name)
def remove_oldest_ckpoint_file(self):
"""Remove the oldest checkpoint file from this checkpoint manager and also from the directory."""
"""移除检查点管理器中最早的检查点文件,并从目录中删除该文件。"""
# 获取所有checkpoint文件并按修改时间排序
ckpoint_files = sorted(self._ckpoint_filelist, key=os.path.getmtime)
# 删除最早修改的checkpoint文件
self.remove_ckpoint_file(ckpoint_files[0])
def keep_one_ckpoint_per_minutes(self, minutes, cur_time):
"""Only keep the latest one ckpt file per minutes, remove other files generated in [last_time, cur_time]."""
"""保留每分钟生成的最新检查点文件,移除在指定时间范围内生成的其他文件。"""
# 定义一个空列表,用于存储需要删除的文件
del_list = []
# 定义一个空字符串,用于存储最旧的文件名
oldest_file = ''
# 定义一个变量,用于存储当前时间
oldest_time = cur_time
# 遍历_ckpoint_filelist中的文件
for ck_file in self._ckpoint_filelist:
# 获取文件的修改时间
modify_time = os.path.getmtime(ck_file)
# 如果当前时间减去文件的修改时间小于60*minutes则将文件添加到del_list中
if cur_time - modify_time < 60 * minutes:
del_list.append(ck_file)
# 如果文件的修改时间小于oldest_time则更新oldest_time和oldest_file
if modify_time < oldest_time:
oldest_time = modify_time
oldest_file = ck_file
# 遍历del_list中的文件
for mv_file in del_list:
# 如果文件是最旧的文件,则跳过
if mv_file == oldest_file:
continue
self.remove_ckpoint_file(mv_file)
# 调用remove_ckpoint_file方法删除文件
self.remove_ckpoint_file(mv_file)

@ -256,36 +256,53 @@ class DatasetHelper:
"""
def __init__(self, dataset, dataset_sink_mode=True, sink_size=-1, epoch_num=1):
# 检查dataset_sink_mode是否为布尔值
dataset_sink_mode = Validator.check_bool(dataset_sink_mode)
# 检查sink_size是否为整数
Validator.check_is_int(sink_size)
# 如果sink_size小于-1或者等于0抛出异常
if sink_size < -1 or sink_size == 0:
raise ValueError("The 'sink_size' must be -1 or positive, but got sink_size {}.".format(sink_size))
# 如果sink_size等于-1则将其设置为dataset的dataset_size
if sink_size == -1:
sink_size = dataset.get_dataset_size()
# 如果dataset_sink_mode为True则根据不同的设备类型选择不同的迭代器
if dataset_sink_mode:
# 如果启用了GE则使用GE的迭代器
if context.get_context("enable_ge"):
iterclass = _DatasetIterGE
else:
# 如果当前模式为GRAPH_MODE则根据角色选择不同的迭代器
if context.get_context("mode") == context.GRAPH_MODE:
# 如果当前角色为调度器或者参数服务器,则使用参数服务器的迭代器
if _is_role_sched() or _is_role_pserver():
iterclass = _DatasetIterPSServer
# 如果当前角色为工作节点并且是参数服务器模式,则使用参数服务器工作节点的迭代器
elif _is_role_worker() and _is_ps_mode():
iterclass = _DatasetIterPSWork
# 如果当前设备类型为Ascend或者GPU则使用多线程循环的迭代器
elif (context.get_context("device_target") == "Ascend") or \
(context.get_context("device_target") == "GPU"):
iterclass = _DatasetIterMSLoopSink
# 如果当前设备类型为CPU则抛出异常因为CPU不支持数据集下沉模式
elif context.get_context("device_target") == "CPU":
raise RuntimeError("Currently dataset sink mode is not supported when the device "
"target is CPU, please set dataset sink mode to False.")
# 如果当前模式不是GRAPH_MODE则使用PyNative的迭代器
else:
iterclass = _DatasetIterPyNative
# 创建迭代器
self.iter = iterclass(dataset, sink_size, epoch_num)
# 如果dataset_sink_mode为False则使用普通的迭代器
else:
# 如果不是分布式训练则使用_DatasetIterNormal类
iterclass = _DatasetIterNormal
# 初始化迭代器
self.iter = iterclass(dataset, epoch_num=epoch_num)
def __iter__(self):
# 返回self.iter的迭代器
return self.iter.__iter__()
# A temp solution for loop sink. Delete later
@ -301,6 +318,7 @@ class DatasetHelper:
>>>
>>> types, shapes = dataset_helper.types_shapes()
"""
# 从当前配置的dataset中获取类型和形状
return self.iter.types_shapes()
def sink_size(self):
@ -316,18 +334,22 @@ class DatasetHelper:
>>> # if sink_size==-1, then will return the full size of source dataset.
>>> sink_size = dataset_helper.sink_size()
"""
# 返回迭代器的接收缓冲区大小
return self.iter.get_sink_size()
def stop_send(self):
"""Stop send data about data sink."""
# 停止发送关于数据接收器的数据
self.iter.stop_send()
def release(self):
"""Free up resources about data sink."""
# 释放数据接收器的资源
self.iter.release()
def continue_send(self):
"""Continue to send data to device at the beginning of epoch."""
# 在每个epoch的开始处继续向设备发送数据
self.iter.continue_send()
def _reset(self, step):
@ -339,6 +361,7 @@ class DatasetHelper:
In sink mode, it returns the types and shapes of the current data.
Generally, it works in dynamic shape scenarios.
"""
# 返回迭代器的数据信息
return self.iter.get_data_info()
def dynamic_min_max_shapes(self):
@ -355,6 +378,7 @@ class DatasetHelper:
>>>
>>> min_shapes, max_shapes = dataset_helper.dynamic_min_max_shapes()
"""
# 返回self.iter的dynamic_min_max_shapes方法
return self.iter.dynamic_min_max_shapes()
@ -362,20 +386,27 @@ class _DatasetIter:
"""Base iter for dataset helper"""
def __init__(self, dataset, sink_size, epoch_num):
# 初始化函数传入数据集、sink大小和epoch数量
self.dataset = dataset
self.sink_size = sink_size
self.sink_count = self.get_sink_count(dataset)
# 如果数据集没有__transfer_dataset__属性
if not hasattr(dataset, '__transfer_dataset__'):
# 如果数据集有__loop_size__属性
if hasattr(dataset, '__loop_size__'):
# PS mode does not support loop sink and need get the real sink size.
# 如果不是worker角色或者不是ps模式则设置sink_size为dataset的循环大小
if not (_is_role_worker() and _is_ps_mode()):
self.sink_size = dataset.__loop_size__
# 如果sink_size为1sink_count为1dataset的大小不为1并且设备目标为Ascend则创建数据信息队列
create_data_info_queue = (sink_size == 1 and self.sink_count == 1 and dataset.get_dataset_size() != 1
and context.get_context("device_target") == "Ascend")
# 执行数据图并将sink_size和create_data_info_queue作为参数传入
dataset.__transfer_dataset__ = _exec_datagraph(dataset, self.sink_size,
create_data_info_queue=create_data_info_queue)
# 如果dataset没有__no_send__属性则发送数据
if not hasattr(dataset, '__no_send__'):
_send_data(dataset, epoch_num)
else:
@ -384,33 +415,48 @@ class _DatasetIter:
_cell_graph_executor.set_queue_name(dataset.__transfer_dataset__.queue_name)
_send_data_no_flag(dataset, epoch_num)
# 获取dataset的stop_send方法
self.stop_send = dataset.__transfer_dataset__.stop_send
# 获取dataset的release方法
self.release = dataset.__transfer_dataset__.release
# 获取dataset的continue_send方法
self.continue_send = dataset.__transfer_dataset__.continue_send
# 获取dataset的get_data_info方法
self.get_data_info = dataset.__transfer_dataset__.get_data_info
# 获取dataset的dynamic_min_max_shapes属性
self.dynamic_min_max_shapes = dataset.dynamic_min_max_shapes
# 获取dataset的数据类型和数据形状
self.dataset_types, self.dataset_shapes = _get_types_and_shapes(dataset)
# 如果dataset的__transfer_dataset__属性中有_reset方法则获取该_reset方法
if hasattr(dataset.__transfer_dataset__, "_reset"):
self._reset = dataset.__transfer_dataset__._reset # pylint: disable=W0212
def __iter__(self):
# 初始化索引为0
self.index = 0
# 返回self
return self
# 迭代器的下一项
def __next__(self):
# 如果索引大于等于sink_count抛出StopIteration异常
if self.index >= self.sink_count:
raise StopIteration()
# 索引加1
self.index += 1
# 返回op()的返回值
return self.op()
def types_shapes(self):
"""
Return the types and shapes of the dataset. The type and shape of each data in the dataset
should be consistent.
返回数据集的类型和形状数据集中每个数据的类型和形状应该是一致的
"""
return self.dataset_types, self.dataset_shapes
def get_sink_count(self, dataset):
"""
获取数据集的sink次数
:param dataset: 数据集对象
:return: sink次数
"""
sink_count = 1
if hasattr(dataset, '__loop_size__'):
loop_size = dataset.__loop_size__
@ -421,7 +467,10 @@ class _DatasetIter:
return sink_count
def get_sink_size(self):
"""get sink_size to device"""
"""
获取设备的sink大小
:return: sink大小
"""
sink_size = 1
if hasattr(self.dataset, '__loop_size__'):
sink_size = self.dataset.__loop_size__

@ -23,47 +23,59 @@ from setuptools import setup, find_packages
from setuptools.command.egg_info import egg_info
from setuptools.command.build_py import build_py
# 获取环境变量
backend_policy = os.getenv('BACKEND_POLICY')
device_target = os.getenv('BACKEND_TARGET')
commit_id = os.getenv('COMMIT_ID').replace("\n", "")
package_name = os.getenv('MS_PACKAGE_NAME').replace("\n", "")
build_path = os.getenv('BUILD_PATH')
# 获取当前文件路径
pwd = os.path.dirname(os.path.realpath(__file__))
# 获取包目录路径
pkg_dir = os.path.join(build_path, 'package')
def _read_file(filename):
"""读取文件内容"""
with open(os.path.join(pwd, filename), encoding='UTF-8') as f:
return f.read()
# 读取版本号
version = _read_file('version.txt').replace("\n", "")
# 读取README.md文件内容
readme = _read_file('README.md')
def _write_version(file):
"""写入版本号"""
file.write("__version__ = '{}'\n".format(version))
def _write_config(file):
"""写入后端策略"""
file.write("__backend__ = '{}'\n".format(backend_policy))
def _write_commit_file(file):
"""写入commit_id"""
file.write("__commit_id__ = '{}'\n".format(commit_id))
def _write_package_name(file):
"""写入包名"""
file.write("__package_name__ = '{}'\n".format(package_name))
def _write_device_target(file):
"""写入设备目标"""
file.write("__device_target__ = '{}'\n".format(device_target))
def build_dependencies():
"""generate python file"""
# 生成version.py文件
version_file = os.path.join(pkg_dir, 'mindspore', 'version.py')
with open(version_file, 'w') as f:
_write_version(f)
@ -72,6 +84,7 @@ def build_dependencies():
with open(version_file, 'w') as f:
_write_version(f)
# 生成default_config.py文件
config_file = os.path.join(pkg_dir, 'mindspore', 'default_config.py')
with open(config_file, 'w') as f:
_write_config(f)
@ -80,6 +93,7 @@ def build_dependencies():
with open(config_file, 'w') as f:
_write_config(f)
# 向default_config.py文件中追加device_target
target = os.path.join(pkg_dir, 'mindspore', 'default_config.py')
with open(target, 'a') as f:
_write_device_target(f)
@ -88,6 +102,7 @@ def build_dependencies():
with open(target, 'a') as f:
_write_device_target(f)
# 向default_config.py文件中追加package_name
package_info = os.path.join(pkg_dir, 'mindspore', 'default_config.py')
with open(package_info, 'a') as f:
_write_package_name(f)
@ -96,6 +111,7 @@ def build_dependencies():
with open(package_info, 'a') as f:
_write_package_name(f)
# 生成.commit_id文件
commit_file = os.path.join(pkg_dir, 'mindspore', '.commit_id')
with open(commit_file, 'w') as f:
_write_commit_file(f)
@ -145,16 +161,24 @@ def update_permissions(path):
Args:
path (str): Target directory path.
"""
# 判断操作系统是否为Windows
if platform.system() == "Windows":
return
# 遍历目标目录下的所有文件和文件夹
for dirpath, dirnames, filenames in os.walk(path):
# 遍历文件夹
for dirname in dirnames:
# 获取文件夹的完整路径
dir_fullpath = os.path.join(dirpath, dirname)
# 更新文件夹的权限
os.chmod(dir_fullpath, stat.S_IREAD | stat.S_IWRITE |
stat.S_IEXEC | stat.S_IRGRP | stat.S_IXGRP)
# 遍历文件
for filename in filenames:
# 获取文件的完整路径
file_fullpath = os.path.join(dirpath, filename)
# 更新文件的权限
os.chmod(file_fullpath, stat.S_IREAD)
@ -163,7 +187,9 @@ class EggInfo(egg_info):
def run(self):
super().run()
# 获取egg-info目录的路径
egg_info_dir = os.path.join(pkg_dir, 'mindspore.egg-info')
# 更新egg-info目录的权限
update_permissions(egg_info_dir)
@ -172,41 +198,64 @@ class BuildPy(build_py):
def run(self):
super().run()
# 获取build目录下的lib/mindspore目录的路径
mindspore_dir = os.path.join(pkg_dir, 'build', 'lib', 'mindspore')
# 更新lib/mindspore目录的权限
update_permissions(mindspore_dir)
# 获取build目录下的lib/mindspore/_akg目录的路径
mindspore_dir = os.path.join(pkg_dir, 'build', 'lib', 'mindspore', '_akg')
# 更新lib/mindspore/_akg目录的权限
update_permissions(mindspore_dir)
# 设置包的名称
setup(
name=package_name,
# 设置包的版本
version=version,
# 设置包的作者
author='The MindSpore Authors',
# 设置包的作者邮箱
author_email='contact@mindspore.cn',
# 设置包的网址
url='https://www.mindspore.cn',
# 设置包的下载网址
download_url='https://github.com/mindspore-ai/mindspore/tags',
# 设置包的源代码网址
project_urls={
'Sources': 'https://github.com/mindspore-ai/mindspore',
# 设置包的问题追踪网址
'Issue Tracker': 'https://github.com/mindspore-ai/mindspore/issues',
},
# 设置包的描述
description='MindSpore is a new open source deep learning training/inference '
'framework that could be used for mobile, edge and cloud scenarios.',
# 读取readme文件作为包的详细描述
long_description=readme,
# 设置详细描述的格式
long_description_content_type="text/markdown",
# 查找包中的所有模块
packages=find_packages(),
# 设置包的数据
package_data=package_data,
# 包含包中的所有数据
include_package_data=True,
# 设置自定义的命令类
cmdclass={
'egg_info': EggInfo,
'build_py': BuildPy,
},
# 设置包的入口点
entry_points={
'console_scripts': [
'cache_admin=mindspore.dataset.engine.cache_admin:main',
],
},
# 设置包的Python版本要求
python_requires='>=3.7',
# 设置包的依赖
install_requires=required_package,
# 设置包的分类器
classifiers=[
'Development Status :: 4 - Beta',
'Environment :: Console',
@ -223,6 +272,8 @@ setup(
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules',
],
# 设置包的许可证
license='Apache 2.0',
# 设置包的关键词
keywords='mindspore machine learning',
)

Loading…
Cancel
Save