You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1885 lines
68 KiB

5 months ago
"""Functions to verify exported ONNX model is functionally equivalent to original PyTorch model.
ONNX Runtime is required, and is used as the ONNX backend for export verification.
"""
from __future__ import annotations
import contextlib
import copy
import dataclasses
import datetime
import difflib
import enum
import functools
import io
import itertools
import os
import tempfile
import warnings
from typing import (
Any,
Callable,
Collection,
Dict,
FrozenSet,
List,
Mapping,
Optional,
Sequence,
Set,
Tuple,
Union,
)
import numpy as np
import torch
import torch._C._onnx as _C_onnx
from torch import _C
from torch.onnx import _constants, _experimental, _exporter_states, utils
from torch.onnx._globals import GLOBALS
from torch.onnx._internal import _beartype, onnx_proto_utils
from torch.types import Number
_ORT_PROVIDERS = ("CPUExecutionProvider",)
_NumericType = Union[Number, torch.Tensor, np.ndarray]
_ModelType = Union[torch.nn.Module, torch.jit.ScriptModule]
_InputArgsType = Union[torch.Tensor, Tuple[Any, ...]]
_InputKwargsType = Mapping[str, Any]
_OutputsType = Union[Sequence[_NumericType], Sequence]
class OnnxBackend(enum.Enum):
"""Enum class for ONNX backend used for export verification."""
REFERENCE = "ONNXReferenceEvaluator"
ONNX_RUNTIME_CPU = "CPUExecutionProvider"
ONNX_RUNTIME_CUDA = "CUDAExecutionProvider"
@dataclasses.dataclass
class VerificationOptions:
"""Options for ONNX export verification.
Attributes:
flatten: If True, unpack nested list/tuple/dict inputs into a flattened list of
Tensors for ONNX. Set this to False if nested structures are to be preserved
for ONNX, which is usually the case with exporting ScriptModules. Default True.
ignore_none: Whether to ignore None type in torch output, which is usually the
case with tracing. Set this to False, if torch output should keep None type,
which is usually the case with exporting ScriptModules. Default to True.
check_shape: Whether to check the shapes between PyTorch and ONNX Runtime outputs
are exactly the same. Set this to False to allow output shape broadcasting.
Default to True.
check_dtype: Whether to check the dtypes between PyTorch and ONNX Runtime outputs
are consistent. Default to True.
backend: ONNX backend for verification. Default to OnnxBackend.ONNX_RUNTIME_CPU.
rtol: relative tolerance in comparison between ONNX and PyTorch outputs.
atol: absolute tolerance in comparison between ONNX and PyTorch outputs.
remained_onnx_input_idx: If provided, only the specified inputs will be passed
to the ONNX model. Supply a list when there are unused inputs in the model.
Since unused inputs will be removed in the exported ONNX model, supplying
all inputs will cause an error on unexpected inputs. This parameter tells
the verifier which inputs to pass into the ONNX model.
acceptable_error_percentage: acceptable percentage of element mismatches in comparison.
It should be a float of value between 0.0 and 1.0.
"""
flatten: bool = True
ignore_none: bool = True
check_shape: bool = True
check_dtype: bool = True
backend: OnnxBackend = OnnxBackend.ONNX_RUNTIME_CPU
rtol: float = 1e-3
atol: float = 1e-7
remained_onnx_input_idx: Optional[Sequence[int]] = None
acceptable_error_percentage: Optional[float] = None
@_beartype.beartype
def _flatten_tuples(elem):
flattened = []
for t in elem:
if isinstance(t, tuple):
flattened.extend(_flatten_tuples(t))
else:
flattened.append(t)
return flattened
# TODO(justinchuby): Add type checking by narrowing down the return type when input is None
def _to_numpy(elem) -> Union[list, np.ndarray]:
if isinstance(elem, torch.Tensor):
if elem.requires_grad:
return elem.detach().cpu().numpy()
else:
return elem.cpu().numpy()
elif isinstance(elem, (list, tuple)):
return [_to_numpy(inp) for inp in elem]
elif isinstance(elem, (bool, int, float)):
return np.array(elem)
elif isinstance(elem, dict):
flattened = []
for k in elem:
flattened.extend([_to_numpy(k), _to_numpy(elem[k])])
return flattened
return elem
@_beartype.beartype
def _inline_flatten_list(inputs, res_list) -> list:
for i in inputs:
res_list.append(i) if not isinstance(
i, (list, tuple)
) else _inline_flatten_list(i, res_list)
return res_list
@_beartype.beartype
def _unpack_to_numpy(values, cast_onnx_accepted=True) -> list:
value_unpacked = []
for value in values:
value_unpacked.extend(
utils.unpack_quantized_tensor(value, cast_onnx_accepted=cast_onnx_accepted)
)
return [_to_numpy(v) for v in value_unpacked]
@_beartype.beartype
def _run_onnx(onnx_session, inputs) -> _OutputsType:
kw_inputs = {}
if inputs and isinstance(inputs[-1], dict):
kw_inputs = inputs[-1]
inputs = inputs[:-1]
inputs = _unpack_to_numpy(_flatten_tuples(inputs))
ort_inputs = {}
for input_name, input in kw_inputs.items():
ort_inputs[input_name] = _to_numpy(input)
inputs = _to_numpy(inputs)
if hasattr(onnx_session, "get_inputs"):
# onnxruntime.InferenceSession
input_names = [i.name for i in onnx_session.get_inputs()]
elif hasattr(onnx_session, "input_names"):
# onnx.reference.ReferenceEvaluator
input_names = onnx_session.input_names
else:
raise ValueError(f"Unknown ONNX backend type: {type(onnx_session)}.")
for i, input in enumerate(inputs):
if i == len(input_names) or input_names[i] in ort_inputs:
raise ValueError(
f"got too many positional inputs. inputs: {inputs}. kw_inputs: {kw_inputs}. "
f"input names: {input_names}."
)
ort_inputs[input_names[i]] = input
onnx_outs = onnx_session.run(None, ort_inputs)
return onnx_outs
@_beartype.beartype
def _ort_session(
model: Union[str, io.BytesIO], ort_providers: Sequence[str] = _ORT_PROVIDERS
):
try:
import onnxruntime # type: ignore[import]
except ImportError as e:
raise ImportError("onnxruntime is required for export verification.") from e
if ort_providers is None:
ort_providers = _ORT_PROVIDERS
session_options = onnxruntime.SessionOptions()
# suppress ort warnings.
# 0:Verbose, 1:Info, 2:Warning. 3:Error, 4:Fatal. Default is 2.
session_options.log_severity_level = 3
ort_session = onnxruntime.InferenceSession(
model if isinstance(model, str) else model.getvalue(),
session_options,
providers=ort_providers,
)
return ort_session
@_beartype.beartype
def _onnx_reference_evaluator_session(model: Union[str, io.BytesIO]):
try:
import onnx
from onnx import reference as onnx_reference # type: ignore[attr-defined]
except ImportError as exc:
raise ImportError("onnx >= 1.13 is required for reference evaluator.") from exc
proto = (
onnx.load(model) # type: ignore[attr-defined]
if isinstance(model, str)
else onnx.load_model_from_string(model.getvalue()) # type: ignore[attr-defined]
)
onnx_session = onnx_reference.ReferenceEvaluator(proto)
return onnx_session
@_beartype.beartype
def _onnx_backend_session(model: Union[str, io.BytesIO], backend: OnnxBackend):
if backend == OnnxBackend.REFERENCE:
onnx_session = _onnx_reference_evaluator_session(model)
elif backend in {OnnxBackend.ONNX_RUNTIME_CPU, OnnxBackend.ONNX_RUNTIME_CUDA}:
onnx_session = _ort_session(model, (backend.value,))
else:
raise ValueError(f"Unsupported backend: {backend}")
return onnx_session
@_beartype.beartype
def _compare_onnx_pytorch_outputs_in_np(
onnx_outs: _OutputsType,
pt_outs: _OutputsType,
options: VerificationOptions,
):
assert len(onnx_outs) == len(
pt_outs
), f"Number of outputs differ ONNX runtime: ({len(onnx_outs)}) PyTorch: ({len(pt_outs)})"
acceptable_error_percentage = options.acceptable_error_percentage
if acceptable_error_percentage and (
acceptable_error_percentage > 1.0 or acceptable_error_percentage < 0.0
):
raise ValueError(
"If set, acceptable_error_percentage should be between 0.0 and 1.0"
)
for ort_out, pt_out in zip(onnx_outs, pt_outs):
try:
# TODO: Remove `check_shape` option once every shape inconsistent issue is addressed.
if not options.check_shape:
# Allow different but broadcastable output shapes.
ort_out, pt_out = np.broadcast_arrays(ort_out, pt_out)
torch.testing.assert_close(
ort_out,
pt_out,
rtol=options.rtol,
atol=options.atol,
check_dtype=options.check_dtype,
equal_nan=True,
)
except AssertionError as e:
if acceptable_error_percentage:
error_percentage = 1 - np.sum(
np.isclose(ort_out, pt_out, rtol=options.rtol, atol=options.atol)
) / np.prod(ort_out.shape)
if error_percentage <= acceptable_error_percentage:
warnings.warn(
f"Suppressed AssertionError:\n{e}.\n"
f"Error percentage {error_percentage} "
f"within acceptable range {acceptable_error_percentage}."
)
continue
if ort_out.dtype == np.uint8 or ort_out.dtype == np.int8:
warnings.warn("ONNX output is quantized")
if pt_out.dtype == np.uint8 or pt_out.dtype == np.int8:
warnings.warn("PyTorch output is quantized")
raise
@_beartype.beartype
def _compare_onnx_pytorch_outputs(
onnx_outs: _OutputsType,
pt_outs: Any,
options: VerificationOptions,
):
"""
Compare ONNX and PyTorch outputs.
Args:
onnx_outs: outputs from ONNX backend.
pt_outs: outputs from PyTorch.
options: options for verification.
Raises:
AssertionError: if outputs from ONNX model and PyTorch model are not
equal up to specified precision.
ValueError: if arguments provided are invalid.
"""
if options.ignore_none:
# torch.jit._flatten filters None type
pt_outs, _ = torch.jit._flatten(pt_outs)
else:
pt_outs = _inline_flatten_list([pt_outs], [])
pt_outs_np = _unpack_to_numpy(pt_outs, cast_onnx_accepted=False)
onnx_outs = _inline_flatten_list(onnx_outs, [])
_compare_onnx_pytorch_outputs_in_np(onnx_outs, pt_outs_np, options)
@_beartype.beartype
def _prepare_input_for_pytorch(args, kwargs):
"""Prepare input for PyTorch model execution.
Any future changes/formatting to the input before dispatching to the PyTorch
model should be made in this function.
Args:
args: positional arguments for PyTorch model forward method.
kwargs: keyword arguments for PyTorch model forward method.
Returns:
args: positional arguments for PyTorch model forward method.
kwargs: keyword arguments for PyTorch model forward method.
"""
if isinstance(args, (torch.Tensor, dict)):
args = (args,)
# In-place operators will update input tensor data as well.
# Thus inputs are replicated before every forward call.
args = copy.deepcopy(args)
if kwargs:
kwargs = copy.deepcopy(kwargs)
else:
kwargs = {}
return args, kwargs
@_beartype.beartype
def _prepare_input_for_export(args, kwargs):
"""Prepare input for ONNX model export.
Any future changes/formatting to the input before dispatching to the
:func:`torch.onnx.export` api should be made in this function.
Args:
args: positional arguments for PyTorch model forward method.
kwargs: keyword arguments for PyTorch model forward method.
Returns:
onnx_inputs: positional arguments for ONNX model export, as `args` in
:func:`torch.onnx.export`.
"""
args, kwargs = _prepare_input_for_pytorch(args, kwargs)
if not kwargs and len(args) > 0 and isinstance(args[-1], dict):
onnx_inputs = args + ({},)
elif kwargs:
onnx_inputs = args + (kwargs,)
else:
onnx_inputs = args
return onnx_inputs
@_beartype.beartype
def _prepare_input_for_onnx(
args, kwargs, remained_onnx_input_idx: Optional[Sequence[int]], flatten: bool
):
"""Prepare input for ONNX model execution in ONNX backend.
Any future changes/formatting to the input before dispatching to the ONNX backend
run should be made in this function.
Args:
args: positional arguments for PyTorch model forward method.
kwargs: keyword arguments for PyTorch model forward method.
remained_onnx_input_idx: indices of inputs to be used for ONNX model execution.
flatten: whether to flatten the input before dispatching to the ONNX model execution.
Returns:
onnx_inputs: positional arguments for ONNX model execution in ONNX backend.
"""
onnx_inputs = _prepare_input_for_export(args, kwargs)
if flatten:
onnx_inputs, _ = torch.jit._flatten(onnx_inputs)
elif onnx_inputs and onnx_inputs[-1] == {}:
# Handle empty kwargs (normally removed by flatten).
onnx_inputs = onnx_inputs[:-1]
if remained_onnx_input_idx is not None:
return [onnx_inputs[i] for i in remained_onnx_input_idx]
else:
return onnx_inputs
@_beartype.beartype
def _try_clone_model(model):
"""Used for preserving original model in case forward mutates model states."""
try:
return copy.deepcopy(model)
except Exception:
warnings.warn(
"Failed to clone model. Model state might be mutated during verification."
)
return model
@_beartype.beartype
def _compare_onnx_pytorch_model(
pt_model: _ModelType,
onnx_model_f: Union[str, io.BytesIO],
input_args: _InputArgsType,
input_kwargs: Optional[_InputKwargsType],
additional_test_inputs: Optional[Sequence[_InputArgsType]],
options: VerificationOptions,
):
"""Compare outputs from ONNX model runs with outputs from PyTorch model runs.
Args:
pt_model: PyTorch model.
onnx_model_f: ONNX model file path or file-like object.
input_args: positional arguments for PyTorch model forward method.
input_kwargs: keyword arguments for PyTorch model forward method.
additional_test_inputs: additional positional arguments for PyTorch model
forward method.
options: options for verification.
Raises:
AssertionError: if outputs from ONNX model and PyTorch model are not
equal up to specified precision.
"""
onnx_session = _onnx_backend_session(onnx_model_f, options.backend)
@_beartype.beartype
def compare_onnx_pytorch_model_with_input(input_args, input_kwargs):
pt_args, pt_kwargs = _prepare_input_for_pytorch(input_args, input_kwargs)
# TODO: remove this and treat mutating model separately. See #77679
pt_model_copy = _try_clone_model(pt_model)
pt_outs = pt_model_copy(*pt_args, **pt_kwargs)
onnx_inputs = _prepare_input_for_onnx(
input_args, input_kwargs, options.remained_onnx_input_idx, options.flatten
)
onnx_outs = _run_onnx(onnx_session, onnx_inputs)
_compare_onnx_pytorch_outputs(
onnx_outs=onnx_outs,
pt_outs=pt_outs,
options=options,
)
compare_onnx_pytorch_model_with_input(input_args, input_kwargs)
if additional_test_inputs:
for test_input_args in additional_test_inputs:
compare_onnx_pytorch_model_with_input(test_input_args, {})
class _GraphDiff:
"""A class to represent the difference between two graphs."""
@_beartype.beartype
def __init__(self, graph_a: _C.Graph, graph_b: _C.Graph):
"""Construct a _GraphDiff object.
Args:
graph_a (_C.Graph): First graph to compare.
graph_b (_C.Graph): Second graph to compare.
"""
self.graph_a = graph_a
self.graph_b = graph_b
@_beartype.beartype
def __str__(self):
"""See function :func:`diff_report`."""
return self.diff_report()
@_beartype.beartype
def _indent(self, lines: str) -> str:
return "\n".join(["\t" + line for line in lines.splitlines()])
@_beartype.beartype
def diff_report(self) -> str:
"""Return a string representation of the graph difference.
The report shows the first pair of nodes that diverges. It also shows the source
location of the pair of nodes.
Returns:
graph_diff_report (str): A string representation of the graph difference.
"""
graph_a = self.graph_a
graph_b = self.graph_b
graph_a_str = str(graph_a)
graph_b_str = str(graph_b)
if graph_a_str == graph_b_str:
return ""
graph_diff = difflib.ndiff(
graph_a_str.splitlines(True), graph_b_str.splitlines(True)
)
graph_diff_report = ["Graph diff:", self._indent("".join(graph_diff))]
for node_a, node_b in itertools.zip_longest(graph_a.nodes(), graph_b.nodes()):
if str(node_a) != str(node_b):
graph_diff_report.append("First diverging operator:")
node_diff = difflib.ndiff(
str(node_a).splitlines(True), str(node_b).splitlines(True)
)
source_printout = ["node diff:", self._indent("".join(node_diff))]
stack_a = node_a.sourceRange() if node_a else None
if stack_a:
source_printout.extend(
["Former source location:", self._indent(str(stack_a))]
)
stack_b = node_b.sourceRange() if node_b else None
if stack_b:
source_printout.extend(
["Latter source location:", self._indent(str(stack_b))]
)
graph_diff_report.extend(source_printout)
break
return "\n".join(graph_diff_report)
@_beartype.beartype
def _check_graph_diff(
model: Union[torch.nn.Module, torch.jit.ScriptModule],
test_input_groups: Sequence[Tuple[Tuple[Any, ...], Mapping[str, Any]]],
export_options: _experimental.ExportOptions,
model_to_graph_func: Callable[
[
torch.nn.Module,
Tuple[Any, ...],
Mapping[str, Any],
_experimental.ExportOptions,
],
_C.Graph,
],
) -> str:
"""Check if graph produced by `model_to_graph_func` is the same across `test_input_groups`.
Args:
model: See :func:`check_export_model_diff`.
test_input_groups: See :func:`check_export_model_diff`.
export_options: See :func:`check_export_model_diff`.
model_to_graph_func: A function to convert a PyTorch model to a JIT IR graph.
Returns:
graph_diff_report (str): A string representation of the graph difference.
"""
if len(test_input_groups) < 2:
raise ValueError("Need at least two groups of test inputs to compare.")
ref_jit_graph = None
for args, kwargs in test_input_groups:
jit_graph = model_to_graph_func(model, args, kwargs, export_options)
if ref_jit_graph is None:
ref_jit_graph = jit_graph
continue
graph_diff_report = _GraphDiff(ref_jit_graph, jit_graph).diff_report()
if graph_diff_report:
return graph_diff_report
return ""
@_beartype.beartype
def _traced_graph_from_model(
model: Union[torch.nn.Module, torch.jit.ScriptModule],
args: Tuple[Any, ...],
kwargs: Mapping[str, Any],
export_options: _experimental.ExportOptions,
) -> _C.Graph:
"""As part of the ONNX export steps, create a traced JIT graph from a PyTorch model.
Args:
model: See :func:`check_export_model_diff`.
args: See :func:`check_export_model_diff`.
kwargs: See :func:`check_export_model_diff`.
export_options: See :func:`check_export_model_diff`.
Returns:
jit_graph (_C.Graph): A traced JIT graph.
"""
training = export_options.training
verbose = export_options.verbose
with utils.exporter_context(model, training, verbose):
export_inputs = _prepare_input_for_export(args, kwargs)
model = utils._pre_trace_quant_model(model, export_inputs)
jit_graph, _, _, _ = utils._create_jit_graph(model, export_inputs)
return jit_graph
@_beartype.beartype
def _onnx_graph_from_model(
model: Union[torch.nn.Module, torch.jit.ScriptModule],
args: Tuple[Any, ...],
kwargs: Mapping[str, Any],
export_options: _experimental.ExportOptions,
) -> _C.Graph:
"""As part of the ONNX export steps, export an ONNX JIT graph from a PyTorch model.
Args:
model: See :func:`check_export_model_diff`.
args: See :func:`check_export_model_diff`.
kwargs: See :func:`check_export_model_diff`.
export_options: See :func:`check_export_model_diff`.
Returns:
onnx_graph (_C.Graph): An ONNX JIT graph.
"""
# TODO: refactor utils.py to remove duplicated code of context setup. See #78834
opset_version = export_options.opset_version
operator_export_type = export_options.operator_export_type
export_modules_as_functions = export_options.export_modules_as_functions
training = export_options.training
verbose = export_options.verbose
dynamic_axes = export_options.dynamic_axes
input_names = export_options.input_names
output_names = export_options.output_names
if opset_version is None:
opset_version = _constants.ONNX_DEFAULT_OPSET
utils._setup_trace_module_map(model, export_modules_as_functions)
if not operator_export_type:
if _C_onnx._CAFFE2_ATEN_FALLBACK:
operator_export_type = _C_onnx.OperatorExportTypes.ONNX_ATEN_FALLBACK
else:
operator_export_type = _C_onnx.OperatorExportTypes.ONNX
GLOBALS.export_onnx_opset_version = opset_version
GLOBALS.operator_export_type = operator_export_type
with utils.exporter_context(model, training, verbose):
do_constant_folding = utils._decide_constant_folding(
export_options.do_constant_folding, operator_export_type, training
)
if dynamic_axes is None:
dynamic_axes = {}
utils._validate_dynamic_axes(dynamic_axes, model, input_names, output_names)
export_inputs = _prepare_input_for_export(args, kwargs)
export_inputs = utils._decide_input_format(model, export_inputs)
onnx_graph, _, _ = utils._model_to_graph(
model,
export_inputs,
verbose,
input_names,
output_names,
operator_export_type,
do_constant_folding,
training=training,
dynamic_axes=dynamic_axes,
)
return onnx_graph
@_beartype.beartype
def _onnx_graph_from_aten_graph(
graph: torch.Graph,
export_options: _experimental.ExportOptions,
params_dict: Optional[Dict[str, Any]] = None,
) -> Tuple[torch.Graph, Dict[str, Any]]:
if params_dict is None:
params_dict = {}
operator_export_type = export_options.operator_export_type
dynamic_axes = export_options.dynamic_axes or {}
input_names = export_options.input_names
training = export_options.training
do_constant_folding = export_options.do_constant_folding
opset_version = export_options.opset_version or _constants.ONNX_DEFAULT_OPSET
GLOBALS.export_onnx_opset_version = opset_version
GLOBALS.operator_export_type = operator_export_type
do_constant_folding = utils._decide_constant_folding(
do_constant_folding, operator_export_type, training
)
# TODO: Below is doing aten graph to onnx. It should be abstracted as a
# function in torch/onnx/utils.py.
graph = graph.copy()
graph = utils._optimize_graph(
graph,
operator_export_type,
params_dict=params_dict,
dynamic_axes=dynamic_axes,
input_names=input_names,
)
if training is None or training == _C_onnx.TrainingMode.EVAL:
params_dict = torch._C._jit_pass_onnx_eval_peephole(graph, params_dict)
if (
do_constant_folding
and opset_version >= _constants.ONNX_CONSTANT_FOLDING_MIN_OPSET
):
params_dict = _C._jit_pass_onnx_constant_fold(graph, params_dict, opset_version)
_C._jit_pass_dce_allow_deleting_nodes_with_side_effects(graph)
if GLOBALS.onnx_shape_inference:
_C._jit_pass_onnx_graph_shape_type_inference(graph, params_dict, opset_version)
params_dict = _C._jit_pass_onnx_eliminate_unused_items(graph, params_dict)
# For ONNX opset < 9, constants only have three data types: float16, float, double.
# In this pass transform constants of other data types to float/double + cast operator.
if opset_version < 9:
_C._jit_pass_onnx_cast_all_constant_to_floating(graph)
params_dict = _C._jit_pass_filter_non_tensor_arguments(params_dict)
_C._jit_decay_packed_param_input_types(graph)
_C._jit_pass_dce_allow_deleting_nodes_with_side_effects(graph)
if export_options.verbose:
print("ONNX graph: ", graph)
return graph, params_dict
@_beartype.beartype
def _onnx_proto_from_onnx_graph(
onnx_graph: torch.Graph,
export_options: _experimental.ExportOptions,
params_dict: Dict[str, Any],
) -> Tuple[bytes, Mapping[str, bytes]]:
opset_version = export_options.opset_version or _constants.ONNX_DEFAULT_OPSET
dynamic_axes = export_options.dynamic_axes or {}
operator_export_type = export_options.operator_export_type
val_keep_init_as_ip = utils._decide_keep_init_as_input(
export_options.keep_initializers_as_inputs,
operator_export_type,
opset_version,
)
val_add_node_names = utils._decide_add_node_names(True, operator_export_type)
custom_opsets = export_options.custom_opsets or {}
proto, export_map, _, _ = onnx_graph._export_onnx( # type: ignore[attr-defined]
params_dict,
opset_version,
dynamic_axes,
False,
operator_export_type,
not export_options.verbose,
val_keep_init_as_ip,
custom_opsets,
val_add_node_names,
"",
{},
)
return proto, export_map
@_beartype.beartype
def check_export_model_diff(
model: Union[torch.nn.Module, torch.jit.ScriptModule],
test_input_groups: Sequence[Tuple[Tuple[Any, ...], Mapping[str, Any]]],
export_options: Optional[_experimental.ExportOptions] = None,
) -> str:
"""Verify exported model discrepancy between different groups of inputs.
A graph is exported for each group of inputs. The exported graphs are then compared
to each other, and discrepancies of first pair of nodes are reported. This function
first checks the jit graph. If no discrepancies were found, it then checks the onnx
graph.
Unless otherwise specified, the jit/ONNX graph is expected to be the same, regardless
of the inputs used for exporting. A discrepancy implies the graph exported is
not accurate when run on other groups of inputs, which will typically results in
runtime errors or mismatching output.
Args:
model (torch.nn.Module or torch.jit.ScriptModule): The model to be exported.
test_input_groups (Sequence[Tuple[Tuple[Any, ...], Mapping[str, Any]]]): A sequence
of input groups to be used to export the model. Each input group is a pair of
(args, kwargs).
export_options (_experimental.ExportOptions, optional): An _experimental.ExportOptions
object that controls the export behavior.
Returns:
str: A string containing the diff of the exported models.
"""
export_options = (
_experimental.ExportOptions() if export_options is None else export_options
)
jit_diff_report = _check_graph_diff(
model, test_input_groups, export_options, _traced_graph_from_model
)
if jit_diff_report:
return jit_diff_report
return _check_graph_diff(
model, test_input_groups, export_options, _onnx_graph_from_model
)
@_beartype.beartype
def verify(
model: _ModelType,
input_args: _InputArgsType,
input_kwargs: Optional[_InputKwargsType] = None,
do_constant_folding: bool = True,
dynamic_axes: Optional[
Mapping[str, Union[Mapping[int, str], Mapping[str, Sequence[int]]]]
] = None,
input_names: Optional[Sequence[str]] = None,
output_names: Optional[Sequence[str]] = None,
training: _C_onnx.TrainingMode = _C_onnx.TrainingMode.EVAL,
opset_version: Optional[int] = None,
keep_initializers_as_inputs: bool = True,
verbose: bool = False,
fixed_batch_size: bool = False,
use_external_data: bool = False,
additional_test_inputs: Optional[Sequence[_InputArgsType]] = None,
options: Optional[VerificationOptions] = None,
):
"""Verify model export to ONNX against original PyTorch model.
Args:
model (torch.nn.Module or torch.jit.ScriptModule): See :func:`torch.onnx.export`.
input_args (tuple): See :func:`torch.onnx.export`.
input_kwargs (dict): See :func:`torch.onnx.export`.
do_constant_folding (bool, optional): See :func:`torch.onnx.export`.
dynamic_axes (dict, optional): See :func:`torch.onnx.export`.
input_names (list, optional): See :func:`torch.onnx.export`.
output_names (list, optional): See :func:`torch.onnx.export`.
training (torch.onnx.TrainingMode): See :func:`torch.onnx.export`.
opset_version (int, optional): See :func:`torch.onnx.export`.
keep_initializers_as_inputs (bool, optional): See :func:`torch.onnx.export`.
verbose (bool, optional): See :func:`torch.onnx.export`.
fixed_batch_size (bool, optional): Legacy argument, used only by rnn test cases.
use_external_data (bool, optional): Explicitly specify whether to export the
model with external data.
additional_test_inputs (list, optional): List of tuples. Each tuple is a group of
input arguments to test. Currently only *args are supported.
options (_VerificationOptions, optional): A _VerificationOptions object that
controls the verification behavior.
Raises:
AssertionError: if outputs from ONNX model and PyTorch model are not
equal up to specified precision.
ValueError: if arguments provided are invalid.
"""
if options is None:
options = VerificationOptions()
if training == torch.onnx.TrainingMode.TRAINING:
model.train()
elif training == torch.onnx.TrainingMode.EVAL:
model.eval()
with torch.no_grad(), contextlib.ExitStack() as stack:
model_f: Union[str, io.BytesIO] = io.BytesIO()
if use_external_data:
tmpdir_path = stack.enter_context(tempfile.TemporaryDirectory())
model_f = os.path.join(tmpdir_path, "model.onnx")
inputs_for_export = _prepare_input_for_export(input_args, input_kwargs)
# TODO(#77679): remove this and treat mutating model separately.
model_copy = _try_clone_model(model)
utils._export(
model,
inputs_for_export,
model_f,
opset_version=opset_version,
do_constant_folding=do_constant_folding,
keep_initializers_as_inputs=keep_initializers_as_inputs,
dynamic_axes=dynamic_axes,
input_names=input_names,
output_names=output_names,
fixed_batch_size=fixed_batch_size,
training=training,
verbose=verbose,
)
_compare_onnx_pytorch_model(
pt_model=model_copy,
onnx_model_f=model_f,
input_args=input_args,
input_kwargs=input_kwargs,
additional_test_inputs=additional_test_inputs,
options=options,
)
@_beartype.beartype
def verify_aten_graph(
graph: torch.Graph,
input_args: Tuple[Any, ...],
export_options: _experimental.ExportOptions,
params_dict: Optional[Dict[str, Any]] = None,
verification_options: Optional[VerificationOptions] = None,
) -> Tuple[Optional[AssertionError], torch.Graph, _OutputsType, _OutputsType]:
if verification_options is None:
verification_options = VerificationOptions()
if params_dict is None:
params_dict = {}
original_jit_graph = graph
graph = graph.copy()
# Execute aten graph and get reference torch jit outputs.
graph_inputs = list(graph.inputs())
jit_inputs = tuple([arg for arg in input_args if arg is not None])
weights = [params_dict[v.debugName()] for v in graph_inputs[len(jit_inputs) :]]
assert all(w is not None for w in weights)
# TODO: Only copy the argument if mutation is detected in Graph.
jit_inputs = copy.deepcopy(jit_inputs)
jit_input_and_parameters = jit_inputs + tuple(weights)
jit_outs = torch._C._jit_interpret_graph(graph, jit_input_and_parameters) # type: ignore[attr-defined]
if not isinstance(jit_outs, (list, tuple)):
jit_outs = [jit_outs]
# Convert aten graph to onnx graph.
graph, onnx_params_dict = _onnx_graph_from_aten_graph(
graph, export_options, params_dict
)
proto, export_map = _onnx_proto_from_onnx_graph(
graph, export_options, onnx_params_dict
)
model_f: Union[str, io.BytesIO] = io.BytesIO()
export_type = _exporter_states.ExportTypes.PROTOBUF_FILE
onnx_proto_utils._export_file(proto, model_f, export_type, export_map)
# NOTE: Verification is unstable. Try catch to emit information for debugging.
try:
# NOTE: Input might be dce'ed, so we need to remove those from the input args.
new_input_names = {v.debugName() for v in graph.inputs()}
new_input_args = []
for v, arg in zip(original_jit_graph.inputs(), input_args):
if v.debugName() in new_input_names:
new_input_args.append(arg)
input_args = tuple(new_input_args)
onnx_inputs = _prepare_input_for_onnx(
input_args,
{},
verification_options.remained_onnx_input_idx,
verification_options.flatten,
)
onnx_session = _onnx_backend_session(model_f, verification_options.backend)
onnx_outs = _run_onnx(onnx_session, onnx_inputs)
del onnx_session # To free device memory
try:
_compare_onnx_pytorch_outputs(
onnx_outs=onnx_outs,
pt_outs=jit_outs,
options=verification_options,
)
except AssertionError as e:
return e, graph, jit_outs, onnx_outs
return None, graph, jit_outs, onnx_outs
except Exception as e:
print("Unexpected error during verification.")
print("jit graph: ", original_jit_graph)
print("onnx graph: ", graph)
raise e
class GraphInfoPrettyPrinter:
graph_info: Optional[GraphInfo]
upper_printer: Optional[GraphInfoPrettyPrinter]
lower_printer: Optional[GraphInfoPrettyPrinter]
graph_str_lambdas: Mapping[int, str]
connector_str_lambdas: Mapping[int, str]
children_str_lambdas: Mapping[int, str]
def __init__(self, graph_info: Optional[GraphInfo]):
self.graph_info = graph_info
if (
graph_info is not None
and graph_info.upper_graph_info is not None
and graph_info.lower_graph_info is not None
):
self.upper_printer = GraphInfoPrettyPrinter(graph_info.upper_graph_info)
self.lower_printer = GraphInfoPrettyPrinter(graph_info.lower_graph_info)
else:
self.upper_printer = None
self.lower_printer = None
@_beartype.beartype
def _total_rows(self) -> int:
if self.graph_info is None:
return 1
if self.upper_printer and self.lower_printer:
return (
self.upper_printer._total_rows() + self.lower_printer._total_rows() + 1
)
return 2 # Two lines: node count + id.
@_beartype.beartype
def _node_count_segment_str(self) -> str:
if self.graph_info is None:
return "..."
node_count = self.graph_info.essential_node_count()
has_mismatch = self.graph_info.has_mismatch()
error_node_kind = (
f"({self.graph_info.essential_node_kinds().pop()})"
if node_count == 1 and has_mismatch
else ""
)
return f"{node_count} {'X' if has_mismatch else ''} {error_node_kind}"
@_beartype.beartype
def _graph_id_segment_str(self) -> str:
if self.graph_info is None:
return ""
return f"id: {self.graph_info.id}"
@_beartype.beartype
def _max_segment_columns(self) -> int:
return max(
map(len, (self._node_count_segment_str(), self._graph_id_segment_str()))
)
@_beartype.beartype
def _graph_segment_str_at_line(self, line: int) -> str:
"""Get the string representation of the graph segment at the given line."""
if line == 0:
result_str = self._node_count_segment_str()
result_str += " " * (self._max_segment_columns() - len(result_str))
return result_str
if line == 1:
result_str = self._graph_id_segment_str()
result_str += " " * (self._max_segment_columns() - len(result_str))
return result_str
if 0 <= line < self._total_rows():
return " " * self._max_segment_columns()
return ""
@_beartype.beartype
def _connector_segment_str_at_line(self, line: int) -> str:
"""Get the connector segment string at the given line."""
if self.upper_printer is None and self.lower_printer is None:
return ""
upper_total_rows = self.upper_printer._total_rows() if self.upper_printer else 1
lower_total_rows = self.lower_printer._total_rows() if self.lower_printer else 1
if line == 0:
return " __"
elif line < upper_total_rows + 1:
return " | "
elif line == upper_total_rows + 1:
return " |__"
elif line < upper_total_rows + lower_total_rows + 1:
return " "
return ""
@_beartype.beartype
def _children_str_at_line(self, line: int) -> str:
"""Get the string representation of the children at the given line.
Recursively calls `_str_at_line` on children nodes.
"""
if self.upper_printer is None and self.lower_printer is None:
return ""
upper_total_rows = self.upper_printer._total_rows() if self.upper_printer else 1
lower_total_rows = self.lower_printer._total_rows() if self.lower_printer else 1
if 0 <= line < upper_total_rows:
return (
self.upper_printer._str_at_line(line) if self.upper_printer else "..."
)
elif upper_total_rows < line < upper_total_rows + lower_total_rows + 1:
return (
self.lower_printer._str_at_line(line - upper_total_rows - 1)
if self.lower_printer
else "..."
)
return ""
@_beartype.beartype
def _str_at_line(self, line: int) -> str:
"""Get the string representation of the graph at the given line."""
return (
self._graph_segment_str_at_line(line)
+ self._connector_segment_str_at_line(line)
+ self._children_str_at_line(line)
)
def pretty_print(self):
if self.graph_info is None:
print(None)
return
# Print tree.
print(" Tree: ".center(80, "="))
total_rows = self._total_rows()
for line in range(total_rows):
print(self._str_at_line(line).rstrip())
if self.graph_info.has_mismatch():
# Summarize leaf subgraphs with mismatch.
print(" Mismatch leaf subgraphs: ".center(80, "="))
print(
[
graph_info.id
for graph_info in self.graph_info.all_mismatch_leaf_graph_info()
]
)
# Summarize node kinds with mismatch.
mismatch_node_kinds: Dict[str, int] = {}
for graph_info in self.graph_info.all_mismatch_leaf_graph_info():
node_kinds = graph_info.essential_node_kinds()
if len(node_kinds) == 1:
node_kind = node_kinds.pop()
mismatch_node_kinds[node_kind] = (
mismatch_node_kinds.get(node_kind, 0) + 1
)
print(" Mismatch node kinds: ".center(80, "="))
print(mismatch_node_kinds)
else:
print(" No mismatch found. ".center(80, "="))
class OnnxTestCaseRepro:
def __init__(self, repro_dir):
self.repro_dir = repro_dir
self.proto, self.inputs, self.outputs = onnx_proto_utils.load_test_case(
repro_dir
)
@classmethod
@_beartype.beartype
def create_test_case_repro(
cls, proto: bytes, inputs, outputs, dir: str, name: Optional[str] = None
):
"""Create a repro under "{dir}/test_{name}" for an ONNX test case.
The test case contains the model and the inputs/outputs data. The directory
structure is as follows:
dir
test_<name>
model.onnx
test_data_set_0
input_0.pb
input_1.pb
output_0.pb
output_1.pb
Args:
proto: ONNX model proto.
inputs: Inputs to the model.
outputs: Outputs of the model.
dir: Directory to save the repro.
name: Name of the test case. If not specified, a name based on current time
will be generated.
Returns:
Path to the repro.
"""
if name is None:
name = datetime.datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")
return onnx_proto_utils.export_as_test_case(
proto,
_to_numpy(inputs),
_to_numpy(outputs),
name,
dir,
)
@_beartype.beartype
def validate(self, options: VerificationOptions):
"""Run the ONNX test case with options.backend, and compare with the expected outputs.
Args:
options: Options for validation.
Raise:
AssertionError: if outputs from options.backend and expected outputs are not
equal up to specified precision.
"""
onnx_session = _onnx_backend_session(io.BytesIO(self.proto), options.backend)
run_outputs = onnx_session.run(None, self.inputs)
if hasattr(onnx_session, "get_outputs"):
output_names = [o.name for o in onnx_session.get_outputs()]
elif hasattr(onnx_session, "output_names"):
output_names = onnx_session.output_names
else:
raise ValueError(f"Unknown onnx session type: {type(onnx_session)}")
expected_outs = [self.outputs[name] for name in output_names]
_compare_onnx_pytorch_outputs_in_np(run_outputs, expected_outs, options)
@dataclasses.dataclass
class GraphInfo:
"""GraphInfo contains validation information of a TorchScript graph and its converted ONNX graph."""
graph: torch.Graph
input_args: Tuple[Any, ...]
params_dict: Dict[str, Any]
export_options: _experimental.ExportOptions = dataclasses.field(
default_factory=_experimental.ExportOptions
)
mismatch_error: Optional[AssertionError] = dataclasses.field(
default=None, init=False
)
pt_outs: Optional[Sequence[_NumericType]] = dataclasses.field(
default=None, init=False
)
upper_graph_info: Optional[GraphInfo] = dataclasses.field(default=None, init=False)
lower_graph_info: Optional[GraphInfo] = dataclasses.field(default=None, init=False)
id: str = dataclasses.field(default="")
_onnx_graph: Optional[torch.Graph] = dataclasses.field(init=False, default=None)
_EXCLUDED_NODE_KINDS: FrozenSet[str] = frozenset(
{"prim::Constant", "prim::ListConstruct", "aten::ScalarImplicit"}
)
def clear(self):
"""Clear states and results of previous verification."""
self.mismatch_error = None
self.pt_outs = None
self._onnx_graph = None
self.upper_graph_info = None
self.lower_graph_info = None
def pretty_print_tree(self):
"""Pretty print `GraphInfo` tree.
Each node represents a subgraph, showing the number of nodes in the subgraph and
a check mark if the subgraph has output mismatch between torch and ONNX.
The id of the subgraph is shown under the node. The `GraphInfo` object for any
subgraph can be retrieved by calling `graph_info.find_partition(id)`.
Example::
==================================== Tree: =====================================
5 X __2 X __1
id: | id: 0 | id: 00
| |
| |__1 X (aten::relu)
| id: 01
|
|__3 X __1
id: 1 | id: 10
|
|__2 X __1 X (aten::relu)
id: 11 | id: 110
|
|__1
id: 111
=========================== Mismatch leaf subgraphs: ===========================
['01', '110']
============================= Mismatch node kinds: =============================
{'aten::relu': 2}
"""
GraphInfoPrettyPrinter(self).pretty_print()
def pretty_print_mismatch(self, graph: bool = False):
"""Pretty print details of the mismatch between torch and ONNX.
Args:
graph: If True, print the ATen JIT graph and ONNX graph.
"""
print(f" Mismatch info for graph partition {self.id}: ".center(80, "="))
if graph:
print(" ATen JIT graph ".center(80, "="))
# TODO: A more compact graph printer.
# * Drop stride, grad, device information.
# * Show source location on a separate line.
print(self.graph)
if self._onnx_graph is not None:
print(" ONNX graph ".center(80, "="))
print(self._onnx_graph)
if self.has_mismatch():
print(" Mismatch error ".center(80, "="))
print(self.mismatch_error)
else:
print(" No mismatch ".center(80, "="))
@_beartype.beartype
def has_mismatch(self) -> bool:
"""Return True if the subgraph has output mismatch between torch and ONNX."""
return self.mismatch_error is not None
@_beartype.beartype
def essential_node_count(self) -> int:
"""Return the number of nodes in the subgraph excluding those in `_EXCLUDED_NODE_KINDS`."""
return sum(
1 for n in self.graph.nodes() if n.kind() not in self._EXCLUDED_NODE_KINDS
)
@_beartype.beartype
def essential_node_kinds(self) -> Set[str]:
"""Return the set of node kinds in the subgraph excluding those in `_EXCLUDED_NODE_KINDS`."""
return {
n.kind()
for n in self.graph.nodes()
if n.kind() not in self._EXCLUDED_NODE_KINDS
}
@_beartype.beartype
def all_mismatch_leaf_graph_info(self) -> List["GraphInfo"]:
"""Return a list of all leaf `GraphInfo` objects that have mismatch."""
if not self.has_mismatch():
return []
no_mismatch_children = (
self.upper_graph_info is None or not self.upper_graph_info.has_mismatch()
) and (
self.lower_graph_info is None or not self.lower_graph_info.has_mismatch()
)
if no_mismatch_children:
return [self]
results = []
if self.upper_graph_info is not None:
results += self.upper_graph_info.all_mismatch_leaf_graph_info()
if self.lower_graph_info is not None:
results += self.lower_graph_info.all_mismatch_leaf_graph_info()
return results
@_beartype.beartype
def find_partition(self, id: str) -> Optional["GraphInfo"]:
"""Find the `GraphInfo` object with the given id."""
if id == self.id:
return self
current_length = len(self.id)
if len(id) > current_length:
if id[current_length] == "0" and self.upper_graph_info is not None:
return self.upper_graph_info.find_partition(id)
elif id[current_length] == "1" and self.lower_graph_info is not None:
return self.lower_graph_info.find_partition(id)
return None
@_beartype.beartype
def export_repro(
self, repro_dir: Optional[str] = None, name: Optional[str] = None
) -> str:
"""Export the subgraph to ONNX along with the input/output data for repro.
The repro directory will contain the following files::
dir
test_<name>
model.onnx
test_data_set_0
input_0.pb
input_1.pb
output_0.pb
output_1.pb
Args:
repro_dir: The directory to export the repro files to. Defaults to current
working directory if None.
name: An optional name for the test case folder: "test_{name}".
Returns:
The path to the exported repro directory.
"""
if repro_dir is None:
repro_dir = os.getcwd()
repro_dir = os.path.join(repro_dir, "onnx_debug")
onnx_graph, onnx_params_dict = _onnx_graph_from_aten_graph(
self.graph, self.export_options, self.params_dict
)
proto, _ = _onnx_proto_from_onnx_graph(
onnx_graph, self.export_options, onnx_params_dict
)
return OnnxTestCaseRepro.create_test_case_repro(
proto, self.input_args, self.pt_outs, repro_dir, name
)
@_beartype.beartype
def _graph_partition_pivot(self) -> int:
"""Find the pivot index to partition the graph.
The pivot is the node that splits the graph into two parts. Each part should
have the similar amount of nodes, excluding non essential ops, defined in
`_EXCLUDED_NODE_KINDS`, such as `prim::Constant`.
If the graph has an odd number of nodes, the upper part will have one more node.
If the graph does not have any node that can be partitioned, return -1.
Returns:
The index of the pivot node.
"""
included_node_indices = [
i
for i, n in enumerate(self.graph.nodes())
if n.kind() not in self._EXCLUDED_NODE_KINDS
]
half_idx = len(included_node_indices) // 2 - 1
if half_idx >= 0 and len(included_node_indices) > half_idx:
return included_node_indices[half_idx] + 1
return -1
@_beartype.beartype
def _partition_upper_graph(self) -> torch.Graph:
pivot = self._graph_partition_pivot()
if pivot == -1:
return torch.Graph()
graph = self.graph.copy() # Copy to not mutate parent graph.
original_outputs = list(graph.outputs())
def _process_bridge_value_for_upper(
new_outputs: List[torch.Value], bridge_value: torch.Value
) -> torch.Value:
# Add bridge values as upper graph outputs.
new_outputs.append(bridge_value)
return bridge_value
new_outputs: List[torch.Value] = []
process_bridge_value_for_upper = functools.partial(
_process_bridge_value_for_upper, new_outputs
)
_, dropped_nodes, complete_upper_nodes_set, _ = self._partition_nodes(
graph, pivot, process_bridge_value_for_upper
)
for _ in enumerate(original_outputs):
graph.eraseOutput(0)
for output in new_outputs:
graph.registerOutput(output)
for node in reversed(dropped_nodes):
node.destroy()
for i, input in reversed(list(enumerate(list(graph.inputs())))):
if (
not _has_uses_by_nodes(input, complete_upper_nodes_set)
and input not in new_outputs
):
try:
graph.eraseInput(i)
except RuntimeError as e:
print(input, graph)
raise e
return graph
@_beartype.beartype
def _partition_lower_graph(self) -> torch.Graph:
pivot = self._graph_partition_pivot()
if pivot == -1:
return torch.Graph()
graph = self.graph.copy() # Copy to not mutate parent graph.
original_outputs = list(graph.outputs())
original_inputs = list(graph.inputs())
new_outputs = []
def _process_bridge_value_for_lower(
graph: torch.Graph, bridge_value: torch.Value
) -> torch.Value:
# Add bridge values as lower graph inputs.
new_input = graph.addInput()
bridge_value.replaceAllUsesWith(new_input)
new_input.copyMetadata(bridge_value)
return new_input
process_bridge_value_for_lower = functools.partial(
_process_bridge_value_for_lower, graph
)
upper_nodes, lower_nodes, _, complete_lower_nodes_set = self._partition_nodes(
graph, pivot, process_bridge_value_for_lower
)
for output in original_outputs:
if _produced_by(output, lower_nodes):
new_outputs.append(output)
for _ in enumerate(original_outputs):
graph.eraseOutput(0)
for output in new_outputs:
graph.registerOutput(output)
for input in original_inputs:
if _has_uses_by_nodes(input, complete_lower_nodes_set):
new_input = graph.addInput()
input.replaceAllUsesWith(new_input)
new_input.copyMetadata(input)
for node in reversed(upper_nodes):
if node not in complete_lower_nodes_set:
try:
node.destroy()
except RuntimeError as e:
print(node, graph)
raise e
for _ in original_inputs:
graph.eraseInput(0)
return graph
@_beartype.beartype
def _partition_node(
self,
node: torch.Node,
complete_upper_nodes_set: Set[torch.Node],
complete_lower_nodes_set: Set[torch.Node],
original_graph_outputs: Set[torch.Value],
covered_bridge_values: Set[torch.Value],
process_bridge_value: Callable[[torch.Value], torch.Value],
):
if node in complete_lower_nodes_set:
return
if (
_node_has_uses_by(node, complete_lower_nodes_set)
and node.kind() in self._EXCLUDED_NODE_KINDS
):
complete_lower_nodes_set.update(_all_nodes([node]))
for input in node.inputs():
if input in covered_bridge_values:
continue
self._partition_node(
input.node(),
complete_upper_nodes_set,
complete_lower_nodes_set,
original_graph_outputs,
covered_bridge_values,
process_bridge_value,
)
else:
for output in node.outputs():
if output in covered_bridge_values:
continue
if (
_has_uses_by_nodes(output, complete_lower_nodes_set)
or output in original_graph_outputs
):
covered_bridge_values.add(process_bridge_value(output))
@_beartype.beartype
def _partition_nodes(
self,
graph: torch.Graph,
pivot: int,
process_bridge_value: Callable[[torch.Value], torch.Value],
) -> Tuple[List[torch.Node], List[torch.Node], Set[torch.Node], Set[torch.Node]]:
nodes = list(graph.nodes())
upper_nodes = nodes[:pivot]
lower_nodes = nodes[pivot:]
# `upper_nodes` and `complete_upper_nodes_set` differs in that the latter
# recursively contains nodes in subblock of `upper_nodes`.
# The same applies for `lower_nodes` and `complete_lower_nodes_set`.
# With addition that `complete_lower_nodes_set` will include nodes that
# are determined to be copied from `upper_nodes` to `lower_nodes`.
complete_upper_nodes_set = _all_nodes(upper_nodes)
complete_lower_nodes_set = _all_nodes(lower_nodes)
original_graph_outputs = set(graph.outputs())
# Bridge values are values produced from upper graph, and consumed
# by lower graph. These values need to be become upper graph outputs
# and lower graph inputs, to bridge the interaction.
# Start with all graph inputs marked as covered. If any graph input is
# needed by lower graph, just keep it in lower graph inputs later.
covered_bridge_values = set(graph.inputs())
for node in upper_nodes:
self._partition_node(
node,
complete_upper_nodes_set,
complete_lower_nodes_set,
original_graph_outputs,
covered_bridge_values,
process_bridge_value,
)
return (
upper_nodes,
lower_nodes,
complete_upper_nodes_set,
complete_lower_nodes_set,
)
@_beartype.beartype
def _bridge_kwargs(self):
pt_outs = self.pt_outs
graph_outputs = list(self.graph.outputs())
assert pt_outs is not None
assert len(graph_outputs) == len(
pt_outs
), f"{len(graph_outputs)} vs {len(pt_outs)}\nGraph: {self.graph}"
return {v.debugName(): o for v, o in zip(graph_outputs, pt_outs)}
@_beartype.beartype
def _args_and_params_for_partition_graph(
self,
graph: torch.Graph,
bridge_kwargs: Mapping[str, Union[_NumericType, Sequence[_NumericType]]],
full_kwargs: Mapping[str, torch.Tensor],
full_params: Mapping[str, torch.Tensor],
):
input_names = [input.debugName() for input in graph.inputs()]
args = tuple(bridge_kwargs[k] for k in input_names if k in bridge_kwargs)
args += tuple(full_kwargs[k] for k in input_names if k in full_kwargs)
params = {k: full_params[k] for k in input_names if k in full_params}
assert len(args) + len(params) == len(
input_names
), f"{len(args)} + {len(params)} vs {len(input_names)}: {input_names}"
return args, params
@_beartype.beartype
def verify_export(
self, options: VerificationOptions
) -> Tuple[Optional[AssertionError], torch.Graph, _OutputsType, _OutputsType]:
"""
Verify the export from TorchScript IR graph to ONNX.
Export the TorchScript IR graph to ONNX, with the inputs, parameters and export
options recorded in this object. Then verify the exported ONNX graph against
the original TorchScript IR graph under the provided verification options.
Args:
options: The verification options.
Returns:
error: The AssertionError raised during the verification. Returns None if no
error is raised.
onnx_graph: The exported ONNX graph in TorchScript IR format.
onnx_outs: The outputs from running exported ONNX model under the onnx
backend in `options`.
pt_outs: The outputs from running the TorchScript IR graph.
"""
return verify_aten_graph(
self.graph,
input_args=self.input_args,
params_dict=self.params_dict,
export_options=self.export_options,
verification_options=options,
)
@_beartype.beartype
def find_mismatch(
self,
options: Optional[VerificationOptions] = None,
):
"""
Find all mismatches between the TorchScript IR graph and the exported onnx model.
Binary searches the model graph to find the minimal subgraph that exhibits the
mismatch. A `GraphInfo` object is created for each subgraph, recording the test
inputs and export options, as well as the validation results.
Args:
options: The verification options.
"""
self.clear()
if options is None:
options = VerificationOptions()
if self.export_options.verbose:
print(self.graph)
if len(list(self.graph.outputs())) == 0:
return
assert len(self.input_args) + len(self.params_dict) == len(
list(self.graph.inputs())
), (
f"Number of graph inputs({len(list(self.graph.inputs()))}) does not match "
f"the provided tensor arguments({len(self.input_args)} + {len(self.params_dict)})."
)
self.mismatch_error, self._onnx_graph, self.pt_outs, _ = self.verify_export(
options
)
if self.mismatch_error is None:
# No mismatch found in graph.
return
if self.essential_node_count() <= 1:
# Reached leaf node, no more partitioning.
return
full_kwargs = {
k.debugName(): v for k, v in zip(self.graph.inputs(), self.input_args)
}
full_params = self.params_dict
upper_graph = self._partition_upper_graph()
upper_args, upper_params = self._args_and_params_for_partition_graph(
upper_graph, {}, full_kwargs, full_params
)
self.upper_graph_info = GraphInfo(
upper_graph,
upper_args,
upper_params,
self.export_options,
id=self.id + "0",
)
self.upper_graph_info.find_mismatch(options)
bridge_kwargs = self.upper_graph_info._bridge_kwargs()
lower_graph = self._partition_lower_graph()
lower_args, lower_params = self._args_and_params_for_partition_graph(
lower_graph, bridge_kwargs, full_kwargs, full_params
)
self.lower_graph_info = GraphInfo(
lower_graph,
lower_args,
lower_params,
self.export_options,
id=self.id + "1",
)
self.lower_graph_info.find_mismatch(options)
@_beartype.beartype
def _all_nodes(nodes: Collection[torch.Node]) -> Set[torch.Node]:
all_nodes = set(nodes)
for n in nodes:
for b in n.blocks():
all_nodes.update(_all_nodes(list(b.nodes())))
return all_nodes
@_beartype.beartype
def _has_uses_by_nodes(value: torch.Value, nodes: Collection[torch.Node]) -> bool:
if any(use.user in nodes for use in value.uses()):
return True
return False
@_beartype.beartype
def _node_has_uses_by(node: torch.Node, nodes: Collection[torch.Node]) -> bool:
for output in node.outputs():
if _has_uses_by_nodes(output, nodes):
return True
return False
@_beartype.beartype
def _produced_by(value: torch.Value, nodes: Collection[torch.Node]) -> bool:
return value.node() in nodes
@_beartype.beartype
def find_mismatch(
model: Union[torch.nn.Module, torch.jit.ScriptModule],
input_args: Tuple[Any, ...],
do_constant_folding: bool = True,
training: _C_onnx.TrainingMode = _C_onnx.TrainingMode.EVAL,
opset_version: Optional[int] = None,
keep_initializers_as_inputs: bool = True,
verbose: bool = False,
options: Optional[VerificationOptions] = None,
) -> GraphInfo:
r"""Find all mismatches between the original model and the exported model.
Experimental. The API is subject to change.
This tool helps debug the mismatch between the original PyTorch model and exported
ONNX model. It binary searches the model graph to find the minimal subgraph that
exhibits the mismatch.
Args:
model: The model to be exported.
input_args: The input arguments to the model.
do_constant_folding: Same as `do_constant_folding` in :func:`torch.onnx.export`.
training: Same as `training` in :func:`torch.onnx.export`.
opset_version: Same as `opset_version` in :func:`torch.onnx.export`.
keep_initializers_as_inputs: Same as `keep_initializers_as_inputs` in :func:`torch.onnx.export`.
verbose: Same as `verbose` in :func:`torch.onnx.export`.
options: The options for the mismatch verification.
Returns:
A GraphInfo object that contains the mismatch information.
Example::
>>> import torch
>>> import torch.onnx.verification
>>> torch.manual_seed(0)
>>> opset_version = 15
>>> # Define a custom symbolic function for aten::relu.
>>> # The custom symbolic function is incorrect, which will result in mismatches.
>>> def incorrect_relu_symbolic_function(g, self):
... return self
>>> torch.onnx.register_custom_op_symbolic(
... "aten::relu",
... incorrect_relu_symbolic_function,
... opset_version=opset_version,
... )
>>> class Model(torch.nn.Module):
... def __init__(self):
... super().__init__()
... self.layers = torch.nn.Sequential(
... torch.nn.Linear(3, 4),
... torch.nn.ReLU(),
... torch.nn.Linear(4, 5),
... torch.nn.ReLU(),
... torch.nn.Linear(5, 6),
... )
... def forward(self, x):
... return self.layers(x)
>>> # xdoctest: +REQUIRES(env:TORCH_DOCTEST_ONNX)
>>> graph_info = torch.onnx.verification.find_mismatch(
... Model(),
... (torch.randn(2, 3),),
... opset_version=opset_version,
... )
===================== Mismatch info for graph partition : ======================
================================ Mismatch error ================================
Tensor-likes are not close!
Mismatched elements: 12 / 12 (100.0%)
Greatest absolute difference: 0.2328854203224182 at index (1, 2) (up to 1e-07 allowed)
Greatest relative difference: 0.699536174352349 at index (1, 3) (up to 0.001 allowed)
==================================== Tree: =====================================
5 X __2 X __1
id: | id: 0 | id: 00
| |
| |__1 X (aten::relu)
| id: 01
|
|__3 X __1
id: 1 | id: 10
|
|__2 X __1 X (aten::relu)
id: 11 | id: 110
|
|__1
id: 111
=========================== Mismatch leaf subgraphs: ===========================
['01', '110']
============================= Mismatch node kinds: =============================
{'aten::relu': 2}
"""
if options is None:
options = VerificationOptions()
if opset_version is None:
opset_version = _constants.ONNX_DEFAULT_OPSET
"""From aten graph, do binary search on graph partition to find operator export discrepancy."""
# TODO: Copied from utils.py `export` until `_optimize_graph`.
if training == torch.onnx.TrainingMode.TRAINING:
model.train()
elif training == torch.onnx.TrainingMode.EVAL:
model.eval()
with torch.no_grad():
inputs_for_export = _prepare_input_for_export(input_args, {})
args = utils._decide_input_format(model, inputs_for_export)
model = utils._pre_trace_quant_model(model, args)
graph, params, torch_out, module = utils._create_jit_graph(model, args)
params_dict = utils._get_named_param_dict(graph, params)
utils._apply_friendly_debug_names(graph, params_dict)
graph_info = GraphInfo(
graph,
input_args,
params_dict,
_experimental.ExportOptions(
do_constant_folding=do_constant_folding,
training=training,
opset_version=opset_version,
keep_initializers_as_inputs=keep_initializers_as_inputs,
verbose=verbose,
),
)
graph_info.find_mismatch(options)
graph_info.pretty_print_mismatch()
graph_info.pretty_print_tree()
return graph_info