Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion backends/nxp/backend/edge_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,10 @@ def output_quantization_type(
│ <returned type>
"""
users = list(node.users)
if len(users) == 1:
if output_index is None:
# Basic QDQ case (without getitem nodes).
if not _is_quantize(quantize_node := users[0]):
# Broken QDQ schema.
return None

else: # Multiple users
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import operator

import numpy as np
import torch

from executorch.backends.nxp.backend.edge_helper import try_get_arg
from executorch.backends.nxp.backend.ir.converter.conversion import (
Expand Down Expand Up @@ -73,32 +74,54 @@ def _is_supported_on_target(
MaxPool2DWithIndicesConverter._get_node_args(node)
)

output_shape = node.meta["val"][0].shape # Shape of the main output (index 0)
if output_shape[0] != 1:
# /neutron-converter/src/OperatorC/MaxPoolPlugin.cpp?at=NEUTRON_SOFTWARE_2.2.2#106
return False

# Neutron only has a restriction on `stride_h`. `stride_w` is not restricted.
stride_h = stride[0]
if stride_h not in (1, 2):
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#901
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#923
return False

channels = output_shape[1]
if channels % neutron_target_spec.get_num_macs() != 0:
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#903
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#925
return False

if any(pad > kernel_dim for pad, kernel_dim in zip(padding, kernel_size)):
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#904-907
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#926-929

# Cannot be tested as PyTorch crashes in this case. It requires the padding to be at most half of the
# effective kernel size, which is an even stricter requirement than what Neutron imposes.
# https://github.com/pytorch/pytorch/blob/449b1768410104d3ed79d3bcfe4ba1d65c7f22c0/torch/_meta_registrations.py#L4483-L4489
return False
if custom_delegation_options.use_new_flow_neutron_c:
# Requirements specified by the new Neutron flow documentation.

supported_types = [torch.int8, torch.uint8]
if not NodeConverter.uses_quantization_type_for_io(
node, supported_types, [0], [0]
):
return False

maximum_supported_kernel_size = 4096
# If there is no padding, Neutron allows maximum stride of 4096. Otherwise, it's 32. But the converter
# always inserts a `Pad` operator to add the padding, so the `MaxPool` never pads its input itself, so
# 4096 is always the limit. And similarly, the `MaxPool` input padding limitation does not apply either.
maximum_supported_stride = 4096

if any(k > maximum_supported_kernel_size for k in kernel_size):
return False
if any(s > maximum_supported_stride for s in stride):
return False

else:
# Shape of the main output (index 0)
output_shape = node.meta["val"][0].shape
if output_shape[0] != 1:
# /neutron-converter/src/OperatorC/MaxPoolPlugin.cpp?at=NEUTRON_SOFTWARE_2.2.2#106
return False

# Neutron only has a restriction on `stride_h`. `stride_w` is not restricted.
stride_h = stride[0]
if stride_h not in (1, 2):
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#901
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#923
return False

channels = output_shape[1]
if channels % neutron_target_spec.get_num_macs() != 0:
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#903
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#925
return False

if any(pad > kernel_dim for pad, kernel_dim in zip(padding, kernel_size)):
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#904-907
# /neutron-library/src/utils/NeutronLibraryInterrogation.cpp?at=refs%2Ftags%2FNEUTRON_SOFTWARE_2.2.2#926-929

# Cannot be tested as PyTorch crashes in this case. It requires the padding to be at most half of the
# effective kernel size, which is an even stricter requirement than what Neutron imposes.
# https://github.com/pytorch/pytorch/blob/449b1768410104d3ed79d3bcfe4ba1d65c7f22c0/torch/_meta_registrations.py#L4483-L4489
return False

return True

Expand Down
188 changes: 183 additions & 5 deletions backends/nxp/tests/graph_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,85 @@

import abc
import re
from collections import defaultdict
from copy import deepcopy
from dataclasses import dataclass
from typing import Union
from typing import Callable, Union

from executorch.backends.nxp.neutron_partitioner import (
NeutronPartitioner,
NXP_DELEGATION_TAG,
)
from executorch.backends.nxp.tests.ops_aliases import (
DequantizePerChannel,
DequantizePerTensor,
QuantizePerChannel,
QuantizePerTensor,
)

from executorch.exir.dialects.edge._ops import EdgeOpOverload

from pytest_mock import MockerFixture

from torch.fx import Node
from torch.fx.graph import Graph


@dataclass
class NonDelegatedNode:
"""Represents an expected non-delegated node in the graph.

:param node_name: The name of the node to check for
:param num_occurrences: Expected number of occurrences. If None, just verifies that at least one exists
"""

node_name: str
num_occurrences: Union[int, None] = None


class GraphVerifier(abc.ABC):
    """Abstract base class for graph verification strategies."""

    @abc.abstractmethod
    def verify_graph(self, graph: Graph):
        """Verifies the graph meets expected criteria.

        :param graph: The FX graph to verify
        :raises AssertionError: If the graph does not meet expectations
        """
        pass


class BaseGraphVerifier(GraphVerifier):
"""Graph verifier base class. Checks for number of delegated nodes and number of selected expected nodes."""
"""Graph verifier base class. Checks for number of delegated nodes and number of selected expected nodes.

This verifier performs the following checks:
- The total number of delegated call nodes matches expectations
- Specific non-delegated nodes appear with the expected frequency
- No unexpected aten nodes are present in the graph
"""

def __init__(
self,
exp_num_delegate_call_nodes: int,
exp_non_delegated_nodes: list[NonDelegatedNode] = None,
):
"""Initializes the BaseGraphVerifier.

:param exp_num_delegate_call_nodes: Expected number of delegated nodes
:param exp_non_delegated_nodes: List of expected non-delegated nodes to verify
"""
self.exp_non_delegated_nodes = (
exp_non_delegated_nodes if exp_non_delegated_nodes is not None else []
)
self.exp_num_delegate_call_nodes = exp_num_delegate_call_nodes

def check_num_delegated_nodes(self, num_dlg_nodes):
"""Checks that the number of delegated nodes matches expectations.

:param num_dlg_nodes: Actual number of delegated nodes
:raises AssertionError: If the count doesn't match expectations
"""
assert not (
num_dlg_nodes < self.exp_num_delegate_call_nodes
), f"Number of delegated nodes decreased from {self.exp_num_delegate_call_nodes} to {num_dlg_nodes}."
Expand All @@ -49,6 +92,11 @@ def check_num_delegated_nodes(self, num_dlg_nodes):
), f"Number of delegated nodes increased from {self.exp_num_delegate_call_nodes} to {num_dlg_nodes}."

def verify_graph(self, graph):
"""Verifies the graph meets delegation and node presence expectations.

:param graph: The FX graph to verify
:raises AssertionError: If verification fails
"""
nodes = list(graph.nodes)

# Check for specific non delegated nodes
Expand Down Expand Up @@ -84,3 +132,133 @@ def verify_graph(self, graph):
assert (
not unexpected_aten_fn_nodes
), f"Graphs contains unexpected aten nodes:\n{unexpected_aten_fn_nodes}."


# Type alias for operators - can be either EdgeOpOverload or any callable (e.g., operator.getitem).
Operator = EdgeOpOverload | Callable


class DetailedGraphVerifier(GraphVerifier):
    """Graph verifier that checks for exact delegated and non-delegated operators.

    This verifier captures a snapshot of the graph immediately after partitioning and verifies
    that specific operators were delegated/non-delegated the expected number of times. It uses
    mocker to intercept the partition() call and create a deep copy of the nodes before they
    can be modified. Quantization/dequantization operators are ignored by default as they are
    typically not the focus of delegation verification.
    """

    # Quantize/dequantize operators are excluded from the counts unless the caller
    # overrides `ops_to_ignore` in the constructor.
    default_ops_to_ignore = {
        QuantizePerTensor,
        QuantizePerChannel,
        DequantizePerTensor,
        DequantizePerChannel,
    }

    def __init__(
        self,
        mocker: MockerFixture,
        *,
        expected_delegated_ops: dict[Operator, int],
        expected_non_delegated_ops: dict[Operator, int],
        ops_to_ignore: set[Operator] | None = None,
    ):
        """Initializes the DetailedGraphVerifier and patches NeutronPartitioner.partition() to capture node state.

        :param mocker: Pytest mocker fixture for intercepting the partition method
        :param expected_delegated_ops: Dictionary mapping operators to their expected delegation count
        :param expected_non_delegated_ops: Dictionary mapping operators to their expected non-delegation count
        :param ops_to_ignore: Set of operators to ignore during verification. Defaults to quantization ops
        """
        self.expected_delegated_ops = expected_delegated_ops
        self.expected_non_delegated_ops = expected_non_delegated_ops

        self.ops_to_ignore = ops_to_ignore or self.default_ops_to_ignore

        # We need to use mocker to capture a copy of the nodes returned by NeutronPartitioner.partition() to access
        # their partition tag. The nodes in the returned graph may be modified after partition() returns, so we
        # capture a deep copy immediately when the method completes.
        self.captured_partitioned_nodes: list[Node] | None = None

        # Store original partition method for the wrapper.
        # Note: pytest-mock automatically restores the original method after the test completes,
        # so manual cleanup is not required.
        original_partition_method = NeutronPartitioner.partition

        def partition_wrapper(self_, exported_program):
            """Wraps NeutronPartitioner.partition() to capture a snapshot of nodes after partitioning.

            :param self_: The NeutronPartitioner instance
            :param exported_program: The ExportedProgram being partitioned
            :return: The PartitionResult from the original partition method
            """
            result = original_partition_method(self_, exported_program)
            # Capture a deep copy of the nodes with their metadata.
            # This ensures we have the exact state immediately after partitioning,
            # before any subsequent transformations modify the graph.
            self.captured_partitioned_nodes = list(
                deepcopy(exported_program.graph.nodes)
            )
            return result

        # Patch the partition method to intercept and capture results.
        mocker.patch.object(NeutronPartitioner, "partition", partition_wrapper)

    def verify_graph(self, graph):
        """Verifies that operators were delegated/non-delegated as expected by comparing actual counts against expectations.

        :param graph: The FX graph to verify (not directly used; we use captured nodes instead)
        :raises AssertionError: If the NeutronPartitioner wasn't used or if delegation doesn't match expectations
        """
        assert (
            self.captured_partitioned_nodes is not None
        ), "The NeutronPartitioner was not used. Cannot access delegated nodes."

        delegated_ops = defaultdict(int)
        non_delegated_ops = defaultdict(int)

        for node in self.captured_partitioned_nodes:
            # Only process call_function nodes with a target
            if not hasattr(node, "target") or node.op != "call_function":
                continue

            # Skip operators we're configured to ignore (e.g., quantization ops)
            if node.target in self.ops_to_ignore:
                continue

            # Check if the node was tagged for delegation during partitioning
            if NXP_DELEGATION_TAG in node.meta:
                delegated_ops[node.target] += 1
            else:
                non_delegated_ops[node.target] += 1

        # All ops which were either expected to be delegated, or were actually delegated.
        all_delegated_ops = list(set(self.expected_delegated_ops).union(delegated_ops))

        # All ops which were either expected to be non-delegated, or were actually non-delegated.
        all_non_delegated_ops = list(
            set(self.expected_non_delegated_ops).union(non_delegated_ops)
        )

        message = ""

        # Check delegated operators
        for op in all_delegated_ops:
            expected_count = self.expected_delegated_ops.get(op, 0)
            real_count = delegated_ops.get(op, 0)
            op_name = op.name() if hasattr(op, "name") else str(op)
            if expected_count != real_count:
                message += f"\t`{op_name}` was delegated {real_count} times instead of the expected {expected_count} times.\n"

        # Check non-delegated operators
        for op in all_non_delegated_ops:
            expected_count = self.expected_non_delegated_ops.get(op, 0)
            real_count = non_delegated_ops.get(op, 0)
            op_name = op.name() if hasattr(op, "name") else str(op)
            if expected_count != real_count:
                message += f"\t`{op_name}` was NON-delegated {real_count} times instead of the expected {expected_count} times.\n"

        if message:
            raise AssertionError(
                "Some operators were not delegated as expected:\n" + message
            )
Loading
Loading