Shortcuts

Source code for bundled_program.core

# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import ctypes
import typing
from typing import Dict, List, Sequence, Type

import executorch.bundled_program.schema as bp_schema
import executorch.exir.schema as core_schema

import torch
import torch.fx
from executorch.bundled_program.config import ConfigValue, MethodTestSuite

from executorch.bundled_program.version import BUNDLED_PROGRAM_SCHEMA_VERSION
from executorch.exir._serialize import _serialize_pte_binary

from executorch.exir.tensor import get_scalar_type, scalar_type_enum, TensorSpec

# Maps each program value schema class that is accepted as a method input to the
# Python type a bundled test input must have for that input slot.
# assert_valid_bundle rejects any program input whose schema class is not a key
# of this table, and compares type(test_input) against the mapped value.
# pyre-ignore
supported_program_type_table: Dict[Type[core_schema.KernelTypes], ConfigValue] = {
    core_schema.Tensor: torch.Tensor,
    core_schema.Int: int,
    core_schema.Double: float,
    core_schema.Bool: bool,
}


def emit_bundled_tensor(
    spec: TensorSpec, bundled_values: List[bp_schema.Value]
) -> None:
    """Append the tensor described by `spec` to `bundled_values`.

    The tensor is stored as a `bp_schema.Value` wrapping a `bp_schema.Tensor`
    that carries the scalar type, sizes, dim order and a copy of the raw
    storage bytes.

    Args:
        spec: Tensor spec providing dtype, shape, dim_order, allocated_memory
            and storage of the tensor to emit.
        bundled_values: List that receives the new value (mutated in place).
    """
    # QuantizedSchema in tensor has been deprecated and may not be used anymore,
    # so it is not emitted here.

    # A spec with no allocated memory is emitted with empty data.
    tensor_data: bytes = b""
    if spec.allocated_memory != 0:
        storage = typing.cast(torch.UntypedStorage, spec.storage)
        # View the storage as a char array of exactly nbytes() bytes, then
        # copy it out into an immutable bytes object.
        # NOTE(review): dereferencing data_ptr() presumably requires the
        # storage to live in host-accessible memory - confirm with callers.
        byte_array_type = ctypes.c_char * storage.nbytes()
        raw_bytes = ctypes.cast(
            storage.data_ptr(), ctypes.POINTER(byte_array_type)
        ).contents
        tensor_data = bytes(raw_bytes)

    bundled_tensor = bp_schema.Tensor(
        scalar_type=scalar_type_enum(spec.dtype),
        sizes=spec.shape,
        data=tensor_data,
        dim_order=list(spec.dim_order),
    )
    bundled_values.append(bp_schema.Value(val=bundled_tensor))


def emit_prim(val: ConfigValue, bundled_values: List[bp_schema.Value]) -> None:
    """Append a primitive test value to `bundled_values`.

    Args:
        val: The primitive to emit; must be exactly an int, bool or float.
        bundled_values: List that receives the new `bp_schema.Value`
            (mutated in place).

    Raises:
        AssertionError: If `val` is not exactly an int, bool or float.
    """
    # Exact type comparison (not isinstance) is deliberate: bool is a subclass
    # of int, and the two must map to different schema values.
    if type(val) == int:
        bundled_values.append(bp_schema.Value(val=bp_schema.Int(int_val=val)))
    elif type(val) == bool:
        bundled_values.append(bp_schema.Value(val=bp_schema.Bool(bool_val=val)))
    elif type(val) == float:
        bundled_values.append(bp_schema.Value(val=bp_schema.Double(double_val=val)))
    else:
        # Raise explicitly instead of `assert 0`: assert statements are removed
        # under `python -O`, which would silently drop the unsupported value.
        raise AssertionError("Unsupported primitive type received.")


def get_program_input(
    program: core_schema.Program, plan_idx: int, input_idx: int
) -> core_schema.KernelTypes:
    """Return the schema value of one input of one execution plan.

    Args:
        program: The program to inspect.
        plan_idx: Index into `program.execution_plan`.
        input_idx: Index into that plan's `inputs` list.

    Returns:
        The `.val` of the plan value that the selected input refers to.
    """
    plan = program.execution_plan[plan_idx]
    plan_values = plan.values
    # plan.inputs holds indices into plan.values.
    return plan_values[plan.inputs[input_idx]].val


def get_program_output(
    program: core_schema.Program, plan_idx: int, output_idx: int
) -> core_schema.KernelTypes:
    """Return the schema value of one output of one execution plan.

    Args:
        program: The program to inspect.
        plan_idx: Index into `program.execution_plan`.
        output_idx: Index into that plan's `outputs` list.

    Returns:
        The `.val` of the plan value that the selected output refers to.
    """
    plan = program.execution_plan[plan_idx]
    plan_values = plan.values
    # plan.outputs holds indices into plan.values.
    return plan_values[plan.outputs[output_idx]].val


def get_input_dtype(
    program: core_schema.Program, plan_idx: int, input_idx: int
) -> torch.dtype:
    """Return the torch dtype of a tensor input of an execution plan.

    The selected program input is expected to expose a `scalar_type`
    attribute, which is converted with `get_scalar_type`.

    Args:
        program: The program to inspect.
        plan_idx: Index into `program.execution_plan`.
        input_idx: Index into that plan's `inputs` list.
    """
    program_input = get_program_input(program, plan_idx, input_idx)
    # pyre-fixme[16]: for now all inputs and outputs are assumed to be of tensor type. Support multiple datatypes in the future.
    return get_scalar_type(program_input.scalar_type)


def get_input_type(program: core_schema.Program, plan_idx: int, input_idx: int) -> type:
    """Return the Python primitive type matching a primitive program input.

    Args:
        program: The program to inspect.
        plan_idx: Index into `program.execution_plan`.
        input_idx: Index into that plan's `inputs` list.

    Returns:
        `int`, `bool` or `float` for an Int, Bool or Double input respectively.

    Raises:
        KeyError: If the input's schema class is not Int, Bool or Double.
    """
    primitive_type_by_schema = {
        core_schema.Int: int,
        core_schema.Bool: bool,
        core_schema.Double: float,
    }
    program_input = get_program_input(program, plan_idx, input_idx)
    # pyre-fixme[6]: Incompatible parameter type [6]: In call `dict.__getitem__`, for 1st positional only parameter
    # expected `Type[Union[core_schema.Bool, core_schema.Double, core_schema.Int]]` but got `Type[Union[core_schema.Bool, core_schema.Double, core_schema.Int, core_schema.Tensor, BoolList, DoubleList,
    # IntList, Null, OptionalTensorList, String, TensorList]]`.
    return primitive_type_by_schema[type(program_input)]


def get_output_dtype(
    program: core_schema.Program, plan_idx: int, output_idx: int
) -> torch.dtype:
    """Return the torch dtype of a tensor output of an execution plan.

    The selected program output is expected to expose a `scalar_type`
    attribute, which is converted with `get_scalar_type`.

    Args:
        program: The program to inspect.
        plan_idx: Index into `program.execution_plan`.
        output_idx: Index into that plan's `outputs` list.
    """
    program_output = get_program_output(program, plan_idx, output_idx)
    # pyre-ignore[16]: for now all outputs are assumed to be of tensor type.
    return get_scalar_type(program_output.scalar_type)


def assert_valid_bundle(
    program: core_schema.Program,
    method_test_suites: Sequence[MethodTestSuite],
) -> None:
    """Check that the program and method_test_suites match each other.

    Other checks not related to correspondence are done in config.py

    The following is verified, in order:
      * every test suite targets a method that exists in the program;
      * the test suites are sorted in ascending order of method name;
      * every input of a tested method has a supported type and every output
        is a tensor;
      * every test case supplies exactly one input per program input, with a
        matching Python type (and dtype for tensors);
      * every expected output is a tensor whose dtype matches the program
        output at the same position, and there are no more expected outputs
        than program outputs.

    Args:
        program: The program to be bundled.
        method_test_suites: The testcases for specific methods to be bundled.
            Must already be sorted by method name.

    Raises:
        AssertionError: If any of the checks above fails. Note that the checks
            are plain `assert` statements and are therefore skipped when
            Python runs with `-O`.
    """

    method_name_of_program = {e.name for e in program.execution_plan}
    method_name_of_test_suites = {t.method_name for t in method_test_suites}

    # Messages are built from adjacent literals rather than a backslash
    # continuation, which would embed the source indentation in the message.
    assert method_name_of_test_suites.issubset(method_name_of_program), (
        "All method names in bundled config should be found in program.execution_plan, "
        f"but {str(method_name_of_test_suites - method_name_of_program)} does not include."
    )

    # check if method_test_suites has been sorted in ascending alphabetical order of method name.
    for test_suite_id in range(1, len(method_test_suites)):
        assert (
            method_test_suites[test_suite_id - 1].method_name
            <= method_test_suites[test_suite_id].method_name
        ), (
            "The method name of test suite should be sorted in ascending alphabetical "
            f"order of method name, but {test_suite_id-1}-th and {test_suite_id}-th method_test_suite aren't."
        )

    # Check if the inputs' type meet Program's requirement
    for method_test_suite in method_test_suites:

        # Get the index of the first plan with the same name as method_test_suite.
        program_plan_id = next(
            (
                plan_id
                for plan_id, candidate in enumerate(program.execution_plan)
                if candidate.name == method_test_suite.method_name
            ),
            -1,
        )

        # Raise Assertion Error if can not find the method with same method_name as method_test_suite in program.
        assert (
            program_plan_id != -1
        ), f"method_test_suites has testcases for method {method_test_suite.method_name}, but can not find it in the given program. All method names in the program are {', '.join([p.name for p in program.execution_plan])}."

        plan = program.execution_plan[program_plan_id]

        # Check if the type of Program's input is supported
        for index in range(len(plan.inputs)):
            assert (
                type(get_program_input(program, program_plan_id, index))
                in supported_program_type_table
            ), "The type of program's input isn't supported."

        # Check if the type of Program's output is supported
        for index in range(len(plan.outputs)):
            assert (
                type(get_program_output(program, program_plan_id, index))
                == core_schema.Tensor
            ), "Only supports program with output in Tensor type."

        # Check if the I/O sets of each execution plan test match program's requirement.
        for i, test_case in enumerate(method_test_suite.test_cases):
            cur_plan_test_inputs = test_case.inputs
            cur_plan_test_expected_outputs = test_case.expected_outputs

            assert len(plan.inputs) == len(
                cur_plan_test_inputs
            ), "The number of input in each bundled set and Program shall equal, but get {} and {}".format(
                len(plan.inputs),
                len(cur_plan_test_inputs),
            )

            # Check if bundled input in the current execution plan test share same type as input in Program
            for j, test_input in enumerate(cur_plan_test_inputs):
                test_input_type = type(test_input)
                required_type = supported_program_type_table[
                    type(get_program_input(program, program_plan_id, j))
                ]
                assert (
                    test_input_type == required_type
                ), "The type {}-th input in {}-th test set of {}-th execution plan does not meet Program's requirement: expected {} but get {}".format(
                    j,
                    i,
                    program_plan_id,
                    required_type,
                    test_input_type,
                )

                # type of tensor input should match execution plan
                if test_input_type == torch.Tensor:
                    required_dtype = get_input_dtype(program, program_plan_id, j)
                    # pyre-fixme[16]: Undefined attribute [16]: Item `bool` of `typing.Union[bool, float, int, torch._tensor.Tensor]`
                    # has no attribute `dtype`.
                    assert (
                        test_input.dtype == required_dtype
                    ), "The input tensor {} dtype shall be {}, but now is {}".format(
                        test_input,
                        required_dtype,
                        test_input.dtype,
                    )
                elif test_input_type in (
                    int,
                    bool,
                    float,
                ):
                    required_primitive = get_input_type(program, program_plan_id, j)
                    assert (
                        test_input_type == required_primitive
                    ), "The input primitive dtype shall be {}, but now is {}".format(
                        required_primitive,
                        test_input_type,
                    )

            # More expected outputs than program outputs would otherwise surface
            # as a bare IndexError from get_output_dtype in the loop below.
            assert len(cur_plan_test_expected_outputs) <= len(plan.outputs), (
                "The number of expected output in each bundled set shall not exceed the number of "
                f"output in Program, but get {len(cur_plan_test_expected_outputs)} and {len(plan.outputs)}"
            )

            # Check if bundled expected output in the current execution plan test share same type as output in Program
            for j, expected_output in enumerate(cur_plan_test_expected_outputs):
                assert (
                    type(expected_output) == torch.Tensor
                ), "The {}-th expected output shall be a tensor, but now is {}".format(
                    j, type(expected_output)
                )

                required_dtype = get_output_dtype(program, program_plan_id, j)
                # pyre-fixme[16]: Undefined attribute [16]: Item `bool` of `typing.Union[bool, float, int, torch._tensor.Tensor]`
                # has no attribute `dtype`.
                assert (
                    expected_output.dtype == required_dtype
                ), "The label tensor {} dtype shall be {}, but now is {}".format(
                    expected_output,
                    required_dtype,
                    expected_output.dtype,
                )


def create_bundled_program(
    program: core_schema.Program,
    method_test_suites: Sequence[MethodTestSuite],
) -> bp_schema.BundledProgram:
    """Create bp_schema.BundledProgram by bundling the given program and method_test_suites together.

    The test suites are sorted by method name and validated against the
    program with `assert_valid_bundle` before anything is emitted.

    Args:
        program: The program to be bundled.
        method_test_suites: The testcases for certain methods to be bundled.

    Returns:
        The `BundledProgram` variable contains given ExecuTorch program and test cases.

    Raises:
        AssertionError: If validation fails or an expected output is not a
            `torch.Tensor`.
    """

    # assert_valid_bundle requires the suites in ascending method-name order.
    method_test_suites = sorted(method_test_suites, key=lambda x: x.method_name)

    assert_valid_bundle(program, method_test_suites)

    bundled_method_test_suites: List[bp_schema.BundledMethodTestSuite] = []

    # Emit data and metadata of bundled tensor
    for method_test_suite in method_test_suites:
        bundled_test_cases: List[bp_schema.BundledMethodTestCase] = []

        # emit I/O sets for each method test case
        for test_case in method_test_suite.test_cases:
            inputs: List[bp_schema.Value] = []
            expected_outputs: List[bp_schema.Value] = []

            for input_val in test_case.inputs:
                # Exact type check, matching the check in assert_valid_bundle.
                if type(input_val) == torch.Tensor:
                    emit_bundled_tensor(
                        TensorSpec.from_tensor(input_val, const=True),
                        inputs,
                    )
                else:
                    emit_prim(
                        input_val,
                        inputs,
                    )

            for expected_output_tensor in test_case.expected_outputs:
                assert (
                    type(expected_output_tensor) == torch.Tensor
                ), "Only tensor outputs are currently supported."
                emit_bundled_tensor(
                    TensorSpec.from_tensor(expected_output_tensor, const=True),
                    expected_outputs,
                )

            bundled_test_cases.append(
                bp_schema.BundledMethodTestCase(
                    inputs=inputs, expected_outputs=expected_outputs
                )
            )

        # emit the whole execution plan test
        bundled_method_test_suites.append(
            bp_schema.BundledMethodTestSuite(
                method_name=method_test_suite.method_name, test_cases=bundled_test_cases
            )
        )

    program_bytes: bytes = _serialize_pte_binary(program)

    return bp_schema.BundledProgram(
        version=BUNDLED_PROGRAM_SCHEMA_VERSION,
        method_test_suites=bundled_method_test_suites,
        program=program_bytes,
    )

Docs

Access comprehensive developer documentation for PyTorch

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources