diff --git a/backends/nxp/aten_passes/decompose_split_to_slices_pass.py b/backends/nxp/aten_passes/decompose_split_to_slices_pass.py index 695466dd96b..d79397481b2 100644 --- a/backends/nxp/aten_passes/decompose_split_to_slices_pass.py +++ b/backends/nxp/aten_passes/decompose_split_to_slices_pass.py @@ -2,6 +2,7 @@ # # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. + from typing import Optional, TypeAlias import torch @@ -10,6 +11,9 @@ from torch.fx.passes.infra.pass_base import PassBase, PassResult +SlicesArgs: TypeAlias = tuple[list[int], list[int], int] + + class DecomposeSplitToSlicesPass(PassBase): """ The `split` operator returns multiple tensors by partitioning `x` along `dim`. Each partitioning can be done @@ -103,8 +107,7 @@ def _create_slice_node(self, *slice_args) -> Node: return slice_node - SlicesArgs: TypeAlias = tuple[list[int], list[int], int] - + # noinspection PyMethodMayBeStatic def _get_slices_args(self, split_node: Node) -> SlicesArgs: split_nodes_chunks = split_node.meta["val"] dim = 0 if len(split_node.args) < 3 else split_node.args[2] diff --git a/backends/nxp/tests/generic_tests/test_decompose_split_to_slices.py b/backends/nxp/tests/generic_tests/test_decompose_split_to_slices.py index 8e5ebf53fba..de4e684405c 100644 --- a/backends/nxp/tests/generic_tests/test_decompose_split_to_slices.py +++ b/backends/nxp/tests/generic_tests/test_decompose_split_to_slices.py @@ -6,30 +6,22 @@ import numpy as np import pytest import torch + from executorch.backends.nxp.aten_passes.neutron_aten_pass_manager import ( DecomposeSplitToSlicesPass, NeutronAtenPassManager, SplitGRUBasedOnNumLayers, ) - -from executorch.backends.nxp.backend.edge_program_converter import ( - EdgeProgramToIRConverter, -) -from executorch.backends.nxp.tests.executorch_pipeline import ( - neutron_target_spec, - to_quantized_edge_program, -) -from executorch.backends.nxp.tests.executors import ( - convert_run_compare, - graph_contains_any_of_ops, -) +from executorch.backends.nxp.tests.executorch_pipeline import neutron_target_spec +from executorch.backends.nxp.tests.executors import graph_contains_any_of_ops +from executorch.backends.nxp.tests.graph_verifier import DetailedGraphVerifier from executorch.backends.nxp.tests.models import ( GRUModel, SplitWithSections, SplitWithSize, ) -from executorch.exir.dialects._ops import ops as exir_ops -from torch.export import ExportedProgram +from executorch.backends.nxp.tests.nsys_testing import lower_run_compare +from executorch.backends.nxp.tests.ops_aliases import SliceCopy @pytest.fixture(autouse=True) @@ -38,273 +30,258 @@ def reseed_model_per_test_run(): np.random.seed(23) -@pytest.mark.parametrize( - "input_shape, split_size, dim", - [ - pytest.param((8,), 3, 0, id="1D."), - pytest.param((4, 8), 5, 1, id="2D."), - ], -) -def test_decompose_split_with_size(mocker, input_shape, split_size, dim): - model = SplitWithSize(split_size, dim) - example_input = torch.rand(input_shape) - - exir_program_aten = torch.export.export(model, (example_input,)).module() +class TestDecomposeSplit: - # Check "aten.split.Tensor" is present - assert graph_contains_any_of_ops( - exir_program_aten.graph, [torch.ops.aten.split.Tensor] - ) - outputs_before = [o.detach().numpy() for o in exir_program_aten(example_input)] - - # Apply the optimization. - NeutronAtenPassManager(neutron_target_spec, [DecomposeSplitToSlicesPass()])( - exir_program_aten - ) - - # Make sure no "Split" is in the model. - assert not graph_contains_any_of_ops( - exir_program_aten.graph, + @pytest.mark.parametrize( + "input_shape, split_size, dim", [ - torch.ops.aten.split.Tensor, - torch.ops.aten.split.default, - torch.ops.aten.split_with_sizes.default, + pytest.param((8,), 3, 0, id="1D"), + pytest.param((4, 8), 5, 1, id="2D"), + pytest.param((3, 5, 7), 2, -1, id="3D (no num_macs multiples)"), + pytest.param((7, 3, 5, 2), 3, 0, id="4D (no num_macs multiples)"), + pytest.param((2, 3, 2, 11, 3), 5, -2, id="5D (no num_macs multiples)"), ], ) - - # Check correct placement of slices - nodes = list(exir_program_aten.graph.nodes) - slices_count = input_shape[dim] // split_size - # Slice nodes start appearing at index 1 - slices_start_idx = 1 - - for i in range(0, slices_count): - assert nodes[slices_start_idx + i].target == torch.ops.aten.slice.Tensor - - outputs_after = [o.detach().numpy() for o in exir_program_aten(example_input)] - - # Make sure the model still produces the exact same output. - assert len(outputs_before) == len(outputs_after) - - for i in range(len(outputs_before)): - assert np.allclose(outputs_before[i], outputs_after[i]) - - -@pytest.mark.parametrize( - "input_shape, sections, dim", - [ - pytest.param((8,), [5, 3], 0, id="1D."), - pytest.param((4, 8), [3, 3, 2], 1, id="2D."), - ], -) -def test_decompose_split_with_section(mocker, input_shape, sections, dim): - model = SplitWithSections(sections, dim) - example_input = torch.rand(input_shape) - - exir_program_aten = torch.export.export(model, (example_input,)).module() - - # Check "aten.split_with_sizes" is present - assert graph_contains_any_of_ops( - exir_program_aten.graph, [torch.ops.aten.split_with_sizes.default] - ) - outputs_before = [o.detach().numpy() for o in exir_program_aten(example_input)] - - # Apply the optimization. - NeutronAtenPassManager(neutron_target_spec, [DecomposeSplitToSlicesPass()])( - exir_program_aten - ) - - # Make sure no "Split" is in the model. - assert not graph_contains_any_of_ops( - exir_program_aten.graph, + def test__size(self, input_shape, split_size, dim): + model = SplitWithSize(split_size, dim) + example_input = torch.rand(input_shape) + + exir_program_aten = torch.export.export(model, (example_input,)).module() + + # Check "aten.split.Tensor" is present + assert graph_contains_any_of_ops( + exir_program_aten.graph, [torch.ops.aten.split.Tensor] + ) + outputs_before = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Apply the optimization. + NeutronAtenPassManager(neutron_target_spec, [DecomposeSplitToSlicesPass()])( + exir_program_aten + ) + + # Make sure no "Split" is in the model. + assert not graph_contains_any_of_ops( + exir_program_aten.graph, + [ + torch.ops.aten.split.Tensor, + torch.ops.aten.split.default, + torch.ops.aten.split_with_sizes.default, + ], + ) + + # Check correct placement of slices + nodes = list(exir_program_aten.graph.nodes) + slices_count = input_shape[dim] // split_size + # Slice nodes start appearing at index 1 + slices_start_idx = 1 + + for i in range(0, slices_count): + assert nodes[slices_start_idx + i].target == torch.ops.aten.slice.Tensor + + outputs_after = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Make sure the model still produces the exact same output. + assert len(outputs_before) == len(outputs_after) + + for i in range(len(outputs_before)): + assert np.allclose(outputs_before[i], outputs_after[i]) + + @pytest.mark.parametrize( + "input_shape, sections, dim", [ - torch.ops.aten.split.Tensor, - torch.ops.aten.split.default, - torch.ops.aten.split_with_sizes.default, + pytest.param((8,), [5, 3], 0, id="1D"), + pytest.param((4, 8), [3, 3, 2], 1, id="2D"), + pytest.param((3, 5, 7), [4, 3], -1, id="3D (no num_macs multiples)"), + pytest.param((7, 3, 5, 2), [3, 3, 1], 0, id="4D (no num_macs multiples)"), + pytest.param( + (2, 3, 2, 11, 3), [4, 4, 3], -2, id="5D (no num_macs multiples)" + ), ], ) - - # Check correct placement of slices - nodes = list(exir_program_aten.graph.nodes) - slices_count = len(sections) - # Slice nodes start appearing at index 1 - slices_start_idx = 1 - - for i in range(0, slices_count): - assert nodes[slices_start_idx + i].target == torch.ops.aten.slice.Tensor - - outputs_after = [o.detach().numpy() for o in exir_program_aten(example_input)] - - # Make sure the model still produces the exact same output. - assert len(outputs_before) == len(outputs_after) - - for i in range(len(outputs_before)): - assert np.allclose(outputs_before[i], outputs_after[i]) - - -@pytest.mark.parametrize( - "gru_layers", - [ - pytest.param(2, id="2 GRU layers"), - ], -) -def test_decompose_gru_with_split(mocker, gru_layers): - model = GRUModel(gru_layers).eval() - - input_shape = (8, 1, 8) - example_input = (torch.ones(input_shape),) - - exir_program_aten = torch.export.export(model, example_input).module() - - # Apply the pass to split the `aten.gru.input` into multiple instances, which adds a split operator - NeutronAtenPassManager(neutron_target_spec, [SplitGRUBasedOnNumLayers()])( - exir_program_aten - ) - - # Check "aten.split.default" is present - assert graph_contains_any_of_ops( - exir_program_aten.graph, [torch.ops.aten.split.default] - ) - - outputs_before = [o.detach().numpy() for o in exir_program_aten(*example_input)] - - # Apply the optimization. - NeutronAtenPassManager(neutron_target_spec, [DecomposeSplitToSlicesPass()])( - exir_program_aten - ) - - # Make sure no "Split" is in the model. - assert not graph_contains_any_of_ops( - exir_program_aten.graph, + def test__section(self, input_shape, sections, dim): + model = SplitWithSections(sections, dim) + example_input = torch.rand(input_shape) + + exir_program_aten = torch.export.export(model, (example_input,)).module() + + # Check "aten.split_with_sizes" is present + assert graph_contains_any_of_ops( + exir_program_aten.graph, [torch.ops.aten.split_with_sizes.default] + ) + outputs_before = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Apply the optimization. + NeutronAtenPassManager(neutron_target_spec, [DecomposeSplitToSlicesPass()])( + exir_program_aten + ) + + # Make sure no "Split" is in the model. + assert not graph_contains_any_of_ops( + exir_program_aten.graph, + [ + torch.ops.aten.split.Tensor, + torch.ops.aten.split.default, + torch.ops.aten.split_with_sizes.default, + ], + ) + + # Check correct placement of slices + nodes = list(exir_program_aten.graph.nodes) + slices_count = len(sections) + # Slice nodes start appearing at index 1 + slices_start_idx = 1 + + for i in range(0, slices_count): + assert nodes[slices_start_idx + i].target == torch.ops.aten.slice.Tensor + + outputs_after = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Make sure the model still produces the exact same output. + assert len(outputs_before) == len(outputs_after) + + for i in range(len(outputs_before)): + assert np.allclose(outputs_before[i], outputs_after[i]) + + @pytest.mark.parametrize( + "gru_layers", [ - torch.ops.aten.split.Tensor, - torch.ops.aten.split.default, - torch.ops.aten.split_with_sizes.default, + pytest.param(2, id="2 GRU layers"), ], ) + def test__with_gru(self, gru_layers): + model = GRUModel(gru_layers).eval() - # Check correct placement of slices - nodes = list(exir_program_aten.graph.nodes) - slices_count = gru_layers - # Slice nodes start appearing at index 10 for gru_layer=2, for gru_layer=3 they start at index 14... - slices_start_idx = 4 * gru_layers + 2 - - for i in range(0, slices_count): - assert nodes[slices_start_idx + i].target == torch.ops.aten.slice.Tensor - - outputs_after = [o.detach().numpy() for o in exir_program_aten(*example_input)] + input_shape = (8, 1, 8) + example_input = (torch.ones(input_shape),) - # Make sure the model still produces the exact same output. - assert len(outputs_before) == len(outputs_after) + exir_program_aten = torch.export.export(model, example_input).module() - for i in range(len(outputs_before)): - assert np.allclose(outputs_before[i], outputs_after[i]) + # Apply the pass to split the `aten.gru.input` into multiple instances, which adds a split operator + NeutronAtenPassManager(neutron_target_spec, [SplitGRUBasedOnNumLayers()])( + exir_program_aten + ) + # Check "aten.split.default" is present + assert graph_contains_any_of_ops( + exir_program_aten.graph, [torch.ops.aten.split.default] + ) -@pytest.mark.parametrize( - "input_shape, size_or_sections, dim", - [ - pytest.param((8, 4), 4, 1, id="2D, one chunk using split size."), - pytest.param( - (8, 4), - 5, - 1, - id="2D, one chunk using split size, chunk size over the limit.", - ), - pytest.param((8, 4), [4], 1, id="2D, one chunk using sections."), - ], -) -def test_decompose_split_with_one_chunk(mocker, input_shape, size_or_sections, dim): - if isinstance(size_or_sections, list): - model = SplitWithSections(size_or_sections, dim) - else: - model = SplitWithSize(size_or_sections, dim) - example_input = torch.rand(input_shape) - - exir_program_aten = torch.export.export(model, (example_input,)).module() - - # Check "aten.split" is present - assert graph_contains_any_of_ops( - exir_program_aten.graph, - [torch.ops.aten.split.Tensor, torch.ops.aten.split_with_sizes.default], - ) - outputs_before = [o.detach().numpy() for o in exir_program_aten(example_input)] + outputs_before = [o.detach().numpy() for o in exir_program_aten(*example_input)] - # Apply the optimization. - NeutronAtenPassManager(neutron_target_spec, [DecomposeSplitToSlicesPass()])( - exir_program_aten - ) + # Apply the optimization. + NeutronAtenPassManager(neutron_target_spec, [DecomposeSplitToSlicesPass()])( + exir_program_aten + ) - # Make sure no "Split" is in the model. - assert not graph_contains_any_of_ops( - exir_program_aten.graph, - [ - torch.ops.aten.split.Tensor, - torch.ops.aten.split.default, - torch.ops.aten.split_with_sizes.default, - ], - ) + # Make sure no "Split" is in the model. + assert not graph_contains_any_of_ops( + exir_program_aten.graph, + [ + torch.ops.aten.split.Tensor, + torch.ops.aten.split.default, + torch.ops.aten.split_with_sizes.default, + ], + ) - # Make sure there are no "aten.slice.Tensor" either. Since the split was done using one chunk, - # slicing is unnecessary - assert not graph_contains_any_of_ops( - exir_program_aten.graph, [torch.ops.aten.slice.Tensor] - ) + # Check correct placement of slices + nodes = list(exir_program_aten.graph.nodes) + slices_count = gru_layers + # Slice nodes start appearing at index 10 for gru_layer=2, for gru_layer=3 they start at index 14... + slices_start_idx = 4 * gru_layers + 2 - outputs_after = [o.detach().numpy() for o in exir_program_aten(example_input)] + for i in range(0, slices_count): + assert nodes[slices_start_idx + i].target == torch.ops.aten.slice.Tensor - # Make sure the model still produces the exact same output. - assert len(outputs_before) == len(outputs_after) + outputs_after = [o.detach().numpy() for o in exir_program_aten(*example_input)] - for i in range(len(outputs_before)): - assert np.allclose(outputs_before[i], outputs_after[i]) + # Make sure the model still produces the exact same output. + assert len(outputs_before) == len(outputs_after) + for i in range(len(outputs_before)): + assert np.allclose(outputs_before[i], outputs_after[i]) -@pytest.mark.parametrize( - "input_shape, size_or_sections, dim", - [ - pytest.param((8, 24), [16, 8], 1, id="2D, with sections."), - pytest.param((8, 24), 3, 1, id="2D, with sizes."), - ], -) -def test_decompose_gru_with_split_full_pipeline( - mocker, input_shape, size_or_sections, dim -): - converter_spy = mocker.spy(EdgeProgramToIRConverter, "convert_program") - - if isinstance(size_or_sections, list): - model = SplitWithSections(size_or_sections, dim) - else: - model = SplitWithSize(size_or_sections, dim) - - # Run conversion - edge_program = to_quantized_edge_program(model, input_shape).exported_program() - - # Make sure no "Split" is in the model. - assert not graph_contains_any_of_ops( - edge_program.graph, + @pytest.mark.parametrize( + "input_shape, size_or_sections, dim", [ - torch.ops.aten.split.Tensor, - torch.ops.aten.split.default, - torch.ops.aten.split_with_sizes.default, + pytest.param((3, 4), 4, 1, id="2D, one chunk using split size"), + pytest.param( + (5, 4), + 5, + 1, + id="2D, one chunk using split size, chunk size over the limit", + ), + pytest.param((8, 4), [4], 1, id="2D, one chunk using sections"), ], ) - - # Capture generated model - neutron_ir_model = converter_spy.spy_return[0] - exported_program: ExportedProgram = converter_spy.call_args.args[1] - - # Make sure "edge.aten.slice_copy.Tensor" is in the model. - assert graph_contains_any_of_ops( - exported_program.graph, - [exir_ops.edge.aten.slice_copy.Tensor], - ) - - example_input = (np.random.random(input_shape).astype(np.float32) * 50).astype( - np.int8 - ) - convert_run_compare( - exported_program, - input_data=example_input, - tfl_model=neutron_ir_model, + def test__one_chunk(self, input_shape, size_or_sections, dim): + if isinstance(size_or_sections, list): + model = SplitWithSections(size_or_sections, dim) + else: + model = SplitWithSize(size_or_sections, dim) + example_input = torch.rand(input_shape) + + exir_program_aten = torch.export.export(model, (example_input,)).module() + + # Check "aten.split" is present + assert graph_contains_any_of_ops( + exir_program_aten.graph, + [torch.ops.aten.split.Tensor, torch.ops.aten.split_with_sizes.default], + ) + outputs_before = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Apply the optimization. + NeutronAtenPassManager(neutron_target_spec, [DecomposeSplitToSlicesPass()])( + exir_program_aten + ) + + # Make sure no "Split" is in the model. + assert not graph_contains_any_of_ops( + exir_program_aten.graph, + [ + torch.ops.aten.split.Tensor, + torch.ops.aten.split.default, + torch.ops.aten.split_with_sizes.default, + ], + ) + + # Make sure there are no "aten.slice.Tensor" either. Since the split was done using one chunk, + # slicing is unnecessary + assert not graph_contains_any_of_ops( + exir_program_aten.graph, [torch.ops.aten.slice.Tensor] + ) + + outputs_after = [o.detach().numpy() for o in exir_program_aten(example_input)] + + # Make sure the model still produces the exact same output. + assert len(outputs_before) == len(outputs_after) + + for i in range(len(outputs_before)): + assert np.allclose(outputs_before[i], outputs_after[i]) + + @pytest.mark.parametrize( + "input_shape, size_or_sections, dim", + [ + pytest.param((8, 24), [16, 8], 1, id="2D, with sections"), + pytest.param( + (3, 5, 7), [3, 3, 1], -1, id="3D, with sections, no num_macs_multiples" + ), + pytest.param((8, 24), 3, 1, id="2D, with sizes"), + pytest.param((3, 5, 7), 4, -1, id="3D, with sizes, no num_macs_multiples"), + ], ) + def test__full_pipeline(self, mocker, request, input_shape, size_or_sections, dim): + if isinstance(size_or_sections, list): + model = SplitWithSections(size_or_sections, dim) + num_slices = len(size_or_sections) + else: + model = SplitWithSize(size_or_sections, dim) + num_slices = input_shape[dim] // size_or_sections + int( + input_shape[dim] % size_or_sections != 0 + ) + + graph_verifier = DetailedGraphVerifier( + mocker, + expected_delegated_ops={SliceCopy: num_slices}, + expected_non_delegated_ops={}, + ) + lower_run_compare(model, input_shape, graph_verifier, request)