diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index 6c2bde9e..378e03fb 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -40,6 +40,8 @@ jobs: python -m pip install flake8 - name: flake8 run: flake8 clvm tests --max-line-length=120 + - name: mypy + run: mypy - name: Test with pytest run: | pytest tests diff --git a/clvm/CLVMObject.py b/clvm/CLVMObject.py index 7586968c..04cffdf0 100644 --- a/clvm/CLVMObject.py +++ b/clvm/CLVMObject.py @@ -1,6 +1,22 @@ +from __future__ import annotations + import typing +class CLVMStorage(typing.Protocol): + # It's not clear if it is possible to express the exclusivity without maybe + # restructuring all the classes, such as having a separate instance for each + # of the atom and pair cases and hinting a union of a protocol of each type. + atom: typing.Optional[bytes] + pair: typing.Optional[PairType] + + +PairType = typing.Tuple[CLVMStorage, CLVMStorage] + + +_T_CLVMObject = typing.TypeVar("_T_CLVMObject", bound="CLVMObject") + + class CLVMObject: """ This class implements the CLVM Object protocol in the simplest possible way, @@ -11,19 +27,26 @@ class CLVMObject: # this is always a 2-tuple of an object implementing the CLVM object # protocol. - pair: typing.Optional[typing.Tuple[typing.Any, typing.Any]] + pair: typing.Optional[PairType] __slots__ = ["atom", "pair"] - def __new__(class_, v): - if isinstance(v, CLVMObject): + @staticmethod + def __new__( + class_: typing.Type[_T_CLVMObject], + v: typing.Union[_T_CLVMObject, bytes, PairType], + ) -> _T_CLVMObject: + if isinstance(v, class_): return v + # mypy does not realize that the isinstance check is type narrowing like this + narrowed_v: typing.Union[bytes, PairType] = v # type: ignore[assignment] + self = super(CLVMObject, class_).__new__(class_) - if isinstance(v, tuple): - if len(v) != 2: - raise ValueError("tuples must be of size 2, cannot create CLVMObject from: %s" % str(v)) - self.pair = v + if isinstance(narrowed_v, tuple): + if len(narrowed_v) != 2: + raise ValueError("tuples must be of size 2, cannot create CLVMObject from: %s" % str(narrowed_v)) + self.pair = narrowed_v self.atom = None else: - self.atom = v + self.atom = narrowed_v self.pair = None return self diff --git a/clvm/EvalError.py b/clvm/EvalError.py index f71f912a..e8e31482 100644 --- a/clvm/EvalError.py +++ b/clvm/EvalError.py @@ -1,4 +1,12 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from clvm.SExp import SExp + + class EvalError(Exception): - def __init__(self, message: str, sexp): + def __init__(self, message: str, sexp: SExp) -> None: super().__init__(message) self._sexp = sexp diff --git a/clvm/SExp.py b/clvm/SExp.py index 9901849c..0f22a99f 100644 --- a/clvm/SExp.py +++ b/clvm/SExp.py @@ -1,9 +1,12 @@ +from __future__ import annotations + import io import typing +import typing_extensions from .as_python import as_python -from .CLVMObject import CLVMObject +from .CLVMObject import CLVMObject, CLVMStorage from .EvalError import EvalError @@ -16,27 +19,28 @@ CastableType = typing.Union[ "SExp", - "CLVMObject", + CLVMStorage, + typing.SupportsBytes, bytes, str, int, None, - list, - typing.Tuple[typing.Any, typing.Any], + typing.Sequence["CastableType"], + typing.Tuple["CastableType", "CastableType"], ] NULL = b"" -def looks_like_clvm_object(o: typing.Any) -> bool: +def looks_like_clvm_object(o: typing.Any) -> typing_extensions.TypeGuard[CLVMStorage]: d = dir(o) return "atom" in d and "pair" in d # this function recognizes some common types and turns them into plain bytes, def convert_atom_to_bytes( - v: typing.Union[bytes, str, int, None, list], + v: typing.Union[bytes, str, int, None, typing.List[typing_extensions.Never], typing.SupportsBytes], ) -> bytes: if isinstance(v, bytes): @@ -55,10 +59,15 @@ def convert_atom_to_bytes( raise ValueError("can't cast %s (%s) to bytes" % (type(v), v)) +ValType = typing.Union["SExp", CastableType] +StackType = typing.List[ValType] + + # returns a clvm-object like object +@typing.no_type_check def to_sexp_type( v: CastableType, -): +) -> CLVMStorage: stack = [v] ops = [(0, None)] # convert @@ -115,6 +124,9 @@ def to_sexp_type( return stack[0] +_T_SExp = typing.TypeVar("_T_SExp", bound="SExp") + + class SExp: """ SExp provides higher level API on top of any object implementing the CLVM @@ -129,86 +141,89 @@ class SExp: elements implementing the CLVM object protocol. Exactly one of "atom" and "pair" must be None. """ - true: "SExp" - false: "SExp" - __null__: "SExp" + true: typing.ClassVar[SExp] + false: typing.ClassVar[SExp] + __null__: typing.ClassVar[SExp] # the underlying object implementing the clvm object protocol atom: typing.Optional[bytes] # this is a tuple of the otherlying CLVMObject-like objects. i.e. not # SExp objects with higher level functions, or None - pair: typing.Optional[typing.Tuple[typing.Any, typing.Any]] + pair: typing.Optional[typing.Tuple[CLVMStorage, CLVMStorage]] - def __init__(self, obj): + def __init__(self, obj: CLVMStorage) -> None: self.atom = obj.atom self.pair = obj.pair # this returns a tuple of two SExp objects, or None - def as_pair(self) -> typing.Tuple["SExp", "SExp"]: + def as_pair(self) -> typing.Optional[typing.Tuple[SExp, SExp]]: pair = self.pair if pair is None: return pair return (self.__class__(pair[0]), self.__class__(pair[1])) # TODO: deprecate this. Same as .atom property - def as_atom(self): + def as_atom(self) -> typing.Optional[bytes]: return self.atom - def listp(self): + def listp(self) -> bool: return self.pair is not None - def nullp(self): + def nullp(self) -> bool: v = self.atom return v is not None and len(v) == 0 - def as_int(self): + def as_int(self) -> int: + if self.atom is None: + raise TypeError("Unable to convert a pair to an int") return int_from_bytes(self.atom) - def as_bin(self): + def as_bin(self) -> bytes: f = io.BytesIO() sexp_to_stream(self, f) return f.getvalue() + # TODO: should be `v: CastableType` @classmethod - def to(class_, v: CastableType) -> "SExp": - if isinstance(v, class_): + def to(cls: typing.Type[_T_SExp], v: typing.Any) -> _T_SExp: + if isinstance(v, cls): return v if looks_like_clvm_object(v): - return class_(v) + return cls(v) # this will lazily convert elements - return class_(to_sexp_type(v)) + return cls(to_sexp_type(v)) - def cons(self, right): + def cons(self: _T_SExp, right: _T_SExp) -> _T_SExp: return self.to((self, right)) - def first(self): + def first(self: _T_SExp) -> _T_SExp: pair = self.pair if pair: return self.__class__(pair[0]) raise EvalError("first of non-cons", self) - def rest(self): + def rest(self: _T_SExp) -> _T_SExp: pair = self.pair if pair: return self.__class__(pair[1]) raise EvalError("rest of non-cons", self) @classmethod - def null(class_): + def null(class_) -> SExp: return class_.__null__ - def as_iter(self): + def as_iter(self: _T_SExp) -> typing.Iterator[_T_SExp]: v = self while not v.nullp(): yield v.first() v = v.rest() - def __eq__(self, other: CastableType): + def __eq__(self, other: object) -> bool: try: - other = self.to(other) + other = self.to(typing.cast(CastableType, other)) to_compare_stack = [(self, other)] while to_compare_stack: s1, s2 = to_compare_stack.pop() @@ -226,7 +241,7 @@ def __eq__(self, other: CastableType): except ValueError: return False - def list_len(self): + def list_len(self) -> int: v = self size = 0 while v.listp(): @@ -234,13 +249,13 @@ def list_len(self): v = v.rest() return size - def as_python(self): + def as_python(self) -> typing.Any: return as_python(self) - def __str__(self): + def __str__(self) -> str: return self.as_bin().hex() - def __repr__(self): + def __repr__(self) -> str: return "%s(%s)" % (self.__class__.__name__, str(self)) diff --git a/clvm/as_python.py b/clvm/as_python.py index 2fd42c64..41ecf9a9 100644 --- a/clvm/as_python.py +++ b/clvm/as_python.py @@ -1,37 +1,61 @@ -def as_python(sexp): - def _roll(op_stack, val_stack): - v1 = val_stack.pop() - v2 = val_stack.pop() - val_stack.append(v1) - val_stack.append(v2) - - def _make_tuple(op_stack, val_stack): - left = val_stack.pop() - right = val_stack.pop() - if right == b"": - val_stack.append([left]) - elif isinstance(right, list): - v = [left] + right - val_stack.append(v) - else: - val_stack.append((left, right)) - - def _as_python(op_stack, val_stack): - t = val_stack.pop() - pair = t.as_pair() - if pair: - left, right = pair - op_stack.append(_make_tuple) - op_stack.append(_as_python) - op_stack.append(_roll) - op_stack.append(_as_python) - val_stack.append(left) - val_stack.append(right) - else: - val_stack.append(t.as_atom()) - - op_stack = [_as_python] - val_stack = [sexp] +from __future__ import annotations + +from typing import Any, Callable, List, Tuple, TYPE_CHECKING, Union + +if TYPE_CHECKING: + from clvm.SExp import SExp + +OpCallable = Callable[["OpStackType", "ValStackType"], None] + +PythonReturnType = Union[bytes, Tuple["PythonReturnType", "PythonReturnType"], List["PythonReturnType"]] + +ValType = Union["SExp", PythonReturnType] +ValStackType = List[ValType] + +OpStackType = List[OpCallable] + + +def _roll(op_stack: OpStackType, val_stack: ValStackType) -> None: + v1 = val_stack.pop() + v2 = val_stack.pop() + val_stack.append(v1) + val_stack.append(v2) + + +MakeTupleValStackType = List[Union[bytes, Tuple[object, object], "MakeTupleValStackType"]] + + +def _make_tuple(op_stack: OpStackType, val_stack: ValStackType) -> None: + left: PythonReturnType = val_stack.pop() # type: ignore[assignment] + right: PythonReturnType = val_stack.pop() # type: ignore[assignment] + if right == b"": + val_stack.append([left]) + elif isinstance(right, list): + v = [left] + right + val_stack.append(v) + else: + val_stack.append((left, right)) + + +def _as_python(op_stack: OpStackType, val_stack: ValStackType) -> None: + t: SExp = val_stack.pop() # type: ignore[assignment] + pair = t.as_pair() + if pair: + left, right = pair + op_stack.append(_make_tuple) + op_stack.append(_as_python) + op_stack.append(_roll) + op_stack.append(_as_python) + val_stack.append(left) + val_stack.append(right) + else: + # we know that t.atom is not None here because the pair is None + val_stack.append(t.atom) # type:ignore[arg-type] + + +def as_python(sexp: SExp) -> Any: + op_stack: OpStackType = [_as_python] + val_stack: ValStackType = [sexp] while op_stack: op_f = op_stack.pop() op_f(op_stack, val_stack) diff --git a/clvm/casts.py b/clvm/casts.py index d142c3a4..1a49886e 100644 --- a/clvm/casts.py +++ b/clvm/casts.py @@ -1,11 +1,11 @@ -def int_from_bytes(blob): +def int_from_bytes(blob: bytes) -> int: size = len(blob) if size == 0: return 0 return int.from_bytes(blob, "big", signed=True) -def int_to_bytes(v): +def int_to_bytes(v: int) -> bytes: byte_count = (v.bit_length() + 8) >> 3 if v == 0: return b"" @@ -17,7 +17,7 @@ def int_to_bytes(v): return r -def limbs_for_int(v): +def limbs_for_int(v: int) -> int: """ Return the number of bytes required to represent this integer. """ diff --git a/clvm/core_ops.py b/clvm/core_ops.py index 850e5ce7..437fc435 100644 --- a/clvm/core_ops.py +++ b/clvm/core_ops.py @@ -1,3 +1,7 @@ +from typing import Tuple, TypeVar + +from typing_extensions import Never + from .EvalError import EvalError from .SExp import SExp @@ -12,7 +16,10 @@ ) -def op_if(args: SExp): +_T_SExp = TypeVar("_T_SExp", bound=SExp) + + +def op_if(args: _T_SExp) -> Tuple[int, _T_SExp]: if args.list_len() != 3: raise EvalError("i takes exactly 3 arguments", args) r = args.rest() @@ -21,38 +28,38 @@ def op_if(args: SExp): return IF_COST, r.first() -def op_cons(args: SExp): +def op_cons(args: _T_SExp) -> Tuple[int, _T_SExp]: if args.list_len() != 2: raise EvalError("c takes exactly 2 arguments", args) return CONS_COST, args.first().cons(args.rest().first()) -def op_first(args: SExp): +def op_first(args: _T_SExp) -> Tuple[int, _T_SExp]: if args.list_len() != 1: raise EvalError("f takes exactly 1 argument", args) return FIRST_COST, args.first().first() -def op_rest(args: SExp): +def op_rest(args: _T_SExp) -> Tuple[int, _T_SExp]: if args.list_len() != 1: raise EvalError("r takes exactly 1 argument", args) return REST_COST, args.first().rest() -def op_listp(args: SExp): +def op_listp(args: SExp) -> Tuple[int, SExp]: if args.list_len() != 1: raise EvalError("l takes exactly 1 argument", args) return LISTP_COST, args.true if args.first().listp() else args.false -def op_raise(args: SExp): +def op_raise(args: SExp) -> Never: if args.list_len() == 1 and not args.first().listp(): raise EvalError("clvm raise", args.first()) else: raise EvalError("clvm raise", args) -def op_eq(args: SExp): +def op_eq(args: SExp) -> Tuple[int, SExp]: if args.list_len() != 2: raise EvalError("= takes exactly 2 arguments", args) a0 = args.first() @@ -60,7 +67,9 @@ def op_eq(args: SExp): if a0.pair or a1.pair: raise EvalError("= on list", a0 if a0.pair else a1) b0 = a0.as_atom() + assert b0 is not None b1 = a1.as_atom() + assert b1 is not None cost = EQ_BASE_COST cost += (len(b0) + len(b1)) * EQ_COST_PER_BYTE return cost, (args.true if b0 == b1 else args.false) diff --git a/clvm/more_ops.py b/clvm/more_ops.py index 7b664dd5..ecabb05b 100644 --- a/clvm/more_ops.py +++ b/clvm/more_ops.py @@ -1,5 +1,6 @@ import hashlib import io +import typing from chia_rs import G1Element, PrivateKey @@ -50,11 +51,16 @@ ) -def malloc_cost(cost, atom: SExp): +_T_SExp = typing.TypeVar("_T_SExp", bound=SExp) + + +def malloc_cost(cost: int, atom: _T_SExp) -> typing.Tuple[int, _T_SExp]: + if atom.atom is None: + raise ValueError("Atom must have a non-None atom attribute") return cost + len(atom.atom) * MALLOC_COST_PER_BYTE, atom -def op_sha256(args: SExp): +def op_sha256(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: cost = SHA256_BASE_COST arg_len = 0 h = hashlib.sha256() @@ -69,23 +75,23 @@ def op_sha256(args: SExp): return malloc_cost(cost, args.to(h.digest())) -def args_as_ints(op_name, args: SExp): +def args_as_ints(op_name: str, args: SExp) -> typing.Iterator[typing.Tuple[int, int]]: for arg in args.as_iter(): - if arg.pair: + if arg.atom is None: raise EvalError("%s requires int args" % op_name, arg) - yield (arg.as_int(), len(arg.as_atom())) + yield (arg.as_int(), len(arg.atom)) -def args_as_int32(op_name, args: SExp): +def args_as_int32(op_name: str, args: SExp) -> typing.Iterator[int]: for arg in args.as_iter(): - if arg.pair: + if arg.atom is None: raise EvalError("%s requires int32 args" % op_name, arg) if len(arg.atom) > 4: raise EvalError("%s requires int32 args (with no leading zeros)" % op_name, arg) yield arg.as_int() -def args_as_int_list(op_name, args, count): +def args_as_int_list(op_name: str, args: SExp, count: int) -> typing.List[typing.Tuple[int, int]]: int_list = list(args_as_ints(op_name, args)) if len(int_list) != count: plural = "s" if count != 1 else "" @@ -93,7 +99,7 @@ def args_as_int_list(op_name, args, count): return int_list -def args_as_bools(op_name, args): +def args_as_bools(op_name: str, args: SExp) -> typing.Iterator[SExp]: for arg in args.as_iter(): v = arg.as_atom() if v == b"": @@ -102,7 +108,7 @@ def args_as_bools(op_name, args): yield args.true -def args_as_bool_list(op_name, args, count): +def args_as_bool_list(op_name: str, args: SExp, count: int) -> typing.List[SExp]: bool_list = list(args_as_bools(op_name, args)) if len(bool_list) != count: plural = "s" if count != 1 else "" @@ -110,7 +116,7 @@ def args_as_bool_list(op_name, args, count): return bool_list -def op_add(args: SExp): +def op_add(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: total = 0 cost = ARITH_BASE_COST arg_size = 0 @@ -122,7 +128,7 @@ def op_add(args: SExp): return malloc_cost(cost, args.to(total)) -def op_subtract(args): +def op_subtract(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: cost = ARITH_BASE_COST if args.nullp(): return malloc_cost(cost, args.to(0)) @@ -138,7 +144,7 @@ def op_subtract(args): return malloc_cost(cost, args.to(total)) -def op_multiply(args): +def op_multiply(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: cost = MUL_BASE_COST operands = args_as_ints("*", args) try: @@ -155,20 +161,21 @@ def op_multiply(args): return malloc_cost(cost, args.to(v)) -def op_divmod(args): +def op_divmod(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: cost = DIVMOD_BASE_COST (i0, l0), (i1, l1) = args_as_int_list("divmod", args, 2) if i1 == 0: raise EvalError("divmod with 0", args.to(i0)) cost += (l0 + l1) * DIVMOD_COST_PER_BYTE q, r = divmod(i0, i1) - q1 = args.to(q) - r1 = args.to(r) - cost += (len(q1.atom) + len(r1.atom)) * MALLOC_COST_PER_BYTE + # since q and r are integers, the atoms will be non-None + q1_atom: bytes = args.to(q).atom # type: ignore[assignment] + r1_atom: bytes = args.to(r).atom # type: ignore[assignment] + cost += (len(q1_atom) + len(r1_atom)) * MALLOC_COST_PER_BYTE return cost, args.to((q, r)) -def op_div(args): +def op_div(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: cost = DIV_BASE_COST (i0, l0), (i1, l1) = args_as_int_list("/", args, 2) if i1 == 0: @@ -183,14 +190,14 @@ def op_div(args): return malloc_cost(cost, args.to(q)) -def op_gr(args): +def op_gr(args: SExp) -> typing.Tuple[int, SExp]: (i0, l0), (i1, l1) = args_as_int_list(">", args, 2) cost = GR_BASE_COST cost += (l0 + l1) * GR_COST_PER_BYTE return cost, args.true if i0 > i1 else args.false -def op_gr_bytes(args: SExp): +def op_gr_bytes(args: SExp) -> typing.Tuple[int, SExp]: arg_list = list(args.as_iter()) if len(arg_list) != 2: raise EvalError(">s takes exactly 2 arguments", args) @@ -200,11 +207,13 @@ def op_gr_bytes(args: SExp): b0 = a0.as_atom() b1 = a1.as_atom() cost = GRS_BASE_COST + if b0 is None or b1 is None: + raise TypeError("Internal error, both operands must not be None") cost += (len(b0) + len(b1)) * GRS_COST_PER_BYTE return cost, args.true if b0 > b1 else args.false -def op_pubkey_for_exp(args): +def op_pubkey_for_exp(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: ((i0, l0),) = args_as_int_list("pubkey_for_exp", args, 1) i0 %= 0x73EDA753299D7D483339D80809A1D80553BDA402FFFE5BFEFFFFFFFF00000001 exponent = PrivateKey.from_bytes(i0.to_bytes(32, "big")) @@ -217,7 +226,7 @@ def op_pubkey_for_exp(args): raise EvalError("problem in op_pubkey_for_exp: %s" % ex, args) -def op_point_add(items: SExp): +def op_point_add(items: _T_SExp) -> typing.Tuple[int, _T_SExp]: cost = POINT_ADD_BASE_COST p = G1Element() @@ -232,18 +241,19 @@ def op_point_add(items: SExp): return malloc_cost(cost, items.to(p)) -def op_strlen(args: SExp): +def op_strlen(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: if args.list_len() != 1: raise EvalError("strlen takes exactly 1 argument", args) a0 = args.first() if a0.pair: raise EvalError("strlen on list", a0) - size = len(a0.as_atom()) + assert a0.atom is not None + size = len(a0.atom) cost = STRLEN_BASE_COST + size * STRLEN_COST_PER_BYTE return malloc_cost(cost, args.to(size)) -def op_substr(args: SExp): +def op_substr(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: arg_count = args.list_len() if arg_count not in (2, 3): raise EvalError("substr takes exactly 2 or 3 arguments", args) @@ -252,6 +262,7 @@ def op_substr(args: SExp): raise EvalError("substr on list", a0) s0 = a0.as_atom() + assert s0 is not None if arg_count == 2: i1, = list(args_as_int32("substr", args.rest())) @@ -266,20 +277,21 @@ def op_substr(args: SExp): return cost, args.to(s) -def op_concat(args: SExp): +def op_concat(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: cost = CONCAT_BASE_COST s = io.BytesIO() for arg in args.as_iter(): if arg.pair: raise EvalError("concat on list", arg) - s.write(arg.as_atom()) + assert arg.atom is not None + s.write(arg.atom) cost += CONCAT_COST_PER_ARG r = s.getvalue() cost += len(r) * CONCAT_COST_PER_BYTE return malloc_cost(cost, args.to(r)) -def op_ash(args): +def op_ash(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: (i0, l0), (i1, l1) = args_as_int_list("ash", args, 2) if l1 > 4: raise EvalError("ash requires int32 args (with no leading zeros)", args.rest().first()) @@ -294,7 +306,7 @@ def op_ash(args): return malloc_cost(cost, args.to(r)) -def op_lsh(args): +def op_lsh(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: (i0, l0), (i1, l1) = args_as_int_list("lsh", args, 2) if l1 > 4: raise EvalError("lsh requires int32 args (with no leading zeros)", args.rest().first()) @@ -302,6 +314,7 @@ def op_lsh(args): raise EvalError("shift too large", args.to(i1)) # we actually want i0 to be an *unsigned* int a0 = args.first().as_atom() + assert a0 is not None i0 = int.from_bytes(a0, "big", signed=False) if i1 >= 0: r = i0 << i1 @@ -312,7 +325,12 @@ def op_lsh(args): return malloc_cost(cost, args.to(r)) -def binop_reduction(op_name, initial_value, args, op_f): +def binop_reduction( + op_name: str, + initial_value: int, + args: _T_SExp, + op_f: typing.Callable[[int, int], int], +) -> typing.Tuple[int, _T_SExp]: total = initial_value arg_size = 0 cost = LOG_BASE_COST @@ -324,37 +342,37 @@ def binop_reduction(op_name, initial_value, args, op_f): return malloc_cost(cost, args.to(total)) -def op_logand(args): - def binop(a, b): +def op_logand(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: + def binop(a: int, b: int) -> int: a &= b return a return binop_reduction("logand", -1, args, binop) -def op_logior(args): - def binop(a, b): +def op_logior(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: + def binop(a: int, b: int) -> int: a |= b return a return binop_reduction("logior", 0, args, binop) -def op_logxor(args): - def binop(a, b): +def op_logxor(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: + def binop(a: int, b: int) -> int: a ^= b return a return binop_reduction("logxor", 0, args, binop) -def op_lognot(args): +def op_lognot(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: (i0, l0), = args_as_int_list("lognot", args, 1) cost = LOGNOT_BASE_COST + l0 * LOGNOT_COST_PER_BYTE return malloc_cost(cost, args.to(~i0)) -def op_not(args): +def op_not(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: (i0,) = args_as_bool_list("not", args, 1) if i0.as_atom() == b"": r = args.true @@ -364,7 +382,7 @@ def op_not(args): return cost, args.to(r) -def op_any(args): +def op_any(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: items = list(args_as_bools("any", args)) cost = BOOL_BASE_COST + len(items) * BOOL_COST_PER_ARG r = args.false @@ -375,7 +393,7 @@ def op_any(args): return cost, args.to(r) -def op_all(args): +def op_all(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: items = list(args_as_bools("all", args)) cost = BOOL_BASE_COST + len(items) * BOOL_COST_PER_ARG r = args.true @@ -386,7 +404,7 @@ def op_all(args): return cost, args.to(r) -def op_softfork(args: SExp): +def op_softfork(args: SExp) -> typing.Tuple[int, SExp]: if args.list_len() < 1: raise EvalError("softfork takes at least 1 argument", args) a = args.first() diff --git a/clvm/op_utils.py b/clvm/op_utils.py index f91806b3..fd5f126f 100644 --- a/clvm/op_utils.py +++ b/clvm/op_utils.py @@ -1,4 +1,17 @@ -def operators_for_dict(keyword_to_atom, op_dict, op_name_lookup=None): +from __future__ import annotations + +import types +from typing import Dict, Optional, TYPE_CHECKING + +if TYPE_CHECKING: + from clvm.operators import OperatorProtocol + + +def operators_for_dict( + keyword_to_atom: Dict[str, bytes], + op_dict: Dict[str, OperatorProtocol], + op_name_lookup: Optional[Dict[str, str]] = None, +) -> Dict[bytes, OperatorProtocol]: if op_name_lookup is None: op_name_lookup = {} @@ -11,7 +24,11 @@ def operators_for_dict(keyword_to_atom, op_dict, op_name_lookup=None): return d -def operators_for_module(keyword_to_atom, mod, op_name_lookup=None): +def operators_for_module( + keyword_to_atom: Dict[str, bytes], + mod: types.ModuleType, + op_name_lookup: Optional[Dict[str, str]] = None, +) -> Dict[bytes, OperatorProtocol]: if op_name_lookup is None: op_name_lookup = {} return operators_for_dict(keyword_to_atom, mod.__dict__, op_name_lookup) diff --git a/clvm/operators.py b/clvm/operators.py index a63e6a88..4b2d834b 100644 --- a/clvm/operators.py +++ b/clvm/operators.py @@ -1,8 +1,9 @@ -from typing import Dict, Tuple +from __future__ import annotations + +from typing import Dict, Iterator, Optional, Protocol, Tuple, Type, TypeVar, overload from . import core_ops, more_ops -from .CLVMObject import CLVMObject from .SExp import SExp from .EvalError import EvalError @@ -65,11 +66,12 @@ } -def args_len(op_name, args): +def args_len(op_name: str, args: SExp) -> Iterator[int]: for arg in args.as_iter(): if arg.pair: raise EvalError("%s requires int args" % op_name, arg) - yield len(arg.as_atom()) + # not a pair so must be an atom + yield len(arg.atom) # type: ignore[arg-type] # unknown ops are reserved if they start with 0xffff @@ -99,7 +101,7 @@ def args_len(op_name, args): # this means that unknown ops where cost_function is 1, 2, or 3, may still be # fatal errors if the arguments passed are not atoms. -def default_unknown_op(op: bytes, args: CLVMObject) -> Tuple[int, CLVMObject]: +def default_unknown_op(op: bytes, args: SExp) -> Tuple[int, SExp]: # any opcode starting with ffff is reserved (i.e. fatal error) # opcodes are not allowed to be empty if len(op) == 0 or op[:2] == b"\xff\xff": @@ -155,6 +157,7 @@ def default_unknown_op(op: bytes, args: CLVMObject) -> Tuple[int, CLVMObject]: if arg.pair: raise EvalError("unknown op on list", arg) cost += CONCAT_COST_PER_ARG + assert arg.atom is not None length += len(arg.atom) cost += length * CONCAT_COST_PER_BYTE @@ -165,29 +168,81 @@ def default_unknown_op(op: bytes, args: CLVMObject) -> Tuple[int, CLVMObject]: return (cost, SExp.null()) -class OperatorDict(dict): +class OperatorProtocol(Protocol): + def __call__(self, args: SExp) -> Tuple[int, SExp]: + ... + + +class UnknownOperatorProtocol(Protocol): + def __call__(self, op: bytes, args: SExp) -> Tuple[int, SExp]: + ... + + +_T_OperatorDict = TypeVar("_T_OperatorDict", bound="OperatorDict") + + +class OperatorDict(Dict[bytes, OperatorProtocol]): """ This is a nice hack that adds `__call__` to a dictionary, so operators can be added dynamically. """ - def __new__(class_, d: Dict, *args, **kwargs): + unknown_op_handler: UnknownOperatorProtocol + quote_atom: bytes + apply_atom: bytes + + @overload + def __new__( + cls: Type[_T_OperatorDict], + d: Dict[bytes, OperatorProtocol], + quote: bytes, + apply: bytes, + unknown_op_handler: UnknownOperatorProtocol = default_unknown_op, + ) -> _T_OperatorDict: + ... + + @overload + def __new__( + cls: Type[_T_OperatorDict], + d: OperatorDict, + quote: Optional[bytes] = None, + apply: Optional[bytes] = None, + unknown_op_handler: UnknownOperatorProtocol = default_unknown_op, + ) -> _T_OperatorDict: + ... + + def __new__( + cls: Type[_T_OperatorDict], + d: Dict[bytes, OperatorProtocol], + quote: Optional[bytes] = None, + apply: Optional[bytes] = None, + unknown_op_handler: UnknownOperatorProtocol = default_unknown_op, + ) -> _T_OperatorDict: """ `quote_atom` and `apply_atom` must be set `unknown_op_handler` has a default implementation We do not check if quote and apply are distinct We do not check if the opcode values for quote and apply exist in the passed-in dict """ - self = super(OperatorDict, class_).__new__(class_, d) - self.quote_atom = kwargs["quote"] if "quote" in kwargs else d.quote_atom - self.apply_atom = kwargs["apply"] if "apply" in kwargs else d.apply_atom - if "unknown_op_handler" in kwargs: - self.unknown_op_handler = kwargs["unknown_op_handler"] + self = super().__new__(cls, d) + + if quote is None: + assert isinstance(d, OperatorDict) + self.quote_atom = d.quote_atom + else: + self.quote_atom = quote + + if apply is None: + assert isinstance(d, OperatorDict) + self.apply_atom = d.apply_atom else: - self.unknown_op_handler = default_unknown_op + self.apply_atom = apply + + self.unknown_op_handler = unknown_op_handler + return self - def __call__(self, op: bytes, arguments: CLVMObject) -> Tuple[int, CLVMObject]: + def __call__(self, op: bytes, arguments: SExp) -> Tuple[int, SExp]: f = self.get(op) if f is None: return self.unknown_op_handler(op, arguments) @@ -199,6 +254,8 @@ def __call__(self, op: bytes, arguments: CLVMObject) -> Tuple[int, CLVMObject]: APPLY_ATOM = KEYWORD_TO_ATOM["a"] OPERATOR_LOOKUP = OperatorDict( - operators_for_module(KEYWORD_TO_ATOM, core_ops, OP_REWRITE), quote=QUOTE_ATOM, apply=APPLY_ATOM + operators_for_module(KEYWORD_TO_ATOM, core_ops, OP_REWRITE), + quote=QUOTE_ATOM, + apply=APPLY_ATOM, ) OPERATOR_LOOKUP.update(operators_for_module(KEYWORD_TO_ATOM, more_ops, OP_REWRITE)) diff --git a/clvm/py.typed b/clvm/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/clvm/run_program.py b/clvm/run_program.py index 20f4b75c..85529add 100644 --- a/clvm/run_program.py +++ b/clvm/run_program.py @@ -1,8 +1,9 @@ -from typing import Any, Callable, List, Tuple +from typing import Callable, List, Optional, Tuple from .CLVMObject import CLVMObject from .EvalError import EvalError -from .SExp import SExp +from .SExp import CastableType, SExp +from .operators import OperatorDict from .costs import ( APPLY_COST, @@ -12,16 +13,18 @@ PATH_LOOKUP_COST_PER_ZERO_BYTE ) -# the "Any" below should really be "OpStackType" but -# recursive types aren't supported by mypy - -OpCallable = Callable[[Any, "ValStackType"], int] +OpCallable = Callable[["OpStackType", "ValStackType"], int] +PreOpCallable = Callable[["OpStackType", "ValStackType"], None] +PreEvalFunction = Callable[[SExp, SExp], Optional[Callable[[SExp], object]]] ValStackType = List[SExp] OpStackType = List[OpCallable] -def to_pre_eval_op(pre_eval_f, to_sexp_f): +def to_pre_eval_op( + pre_eval_f: PreEvalFunction, + to_sexp_f: Callable[[CastableType], SExp], +) -> PreOpCallable: def my_pre_eval_op(op_stack: OpStackType, value_stack: ValStackType) -> None: v = to_sexp_f(value_stack[-1]) context = pre_eval_f(v.first(), v.rest()) @@ -38,7 +41,7 @@ def invoke_context_op( return my_pre_eval_op -def msb_mask(byte): +def msb_mask(byte: int) -> int: byte |= byte >> 1 byte |= byte >> 2 byte |= byte >> 4 @@ -47,15 +50,15 @@ def msb_mask(byte): def run_program( program: CLVMObject, - args: CLVMObject, - operator_lookup: Callable[[bytes, CLVMObject], Tuple[int, CLVMObject]], - max_cost=None, - pre_eval_f=None, -) -> Tuple[int, CLVMObject]: - - program = SExp.to(program) - if pre_eval_f: - pre_eval_op = to_pre_eval_op(pre_eval_f, program.to) + args: SExp, + operator_lookup: OperatorDict, + max_cost: Optional[int] = None, + pre_eval_f: Optional[PreEvalFunction] = None, +) -> Tuple[int, SExp]: + + _program = SExp.to(program) + if pre_eval_f is not None: + pre_eval_op = to_pre_eval_op(pre_eval_f, _program.to) else: pre_eval_op = None @@ -65,6 +68,8 @@ def traverse_path(sexp: SExp, env: SExp) -> Tuple[int, SExp]: if sexp.nullp(): return cost, sexp.null() + if sexp.atom is None: + raise ValueError("Atom must have a non-None atom attribute") b = sexp.atom end_byte_cursor = 0 @@ -126,7 +131,9 @@ def eval_op(op_stack: OpStackType, value_stack: ValStackType) -> int: operator = sexp.first() if operator.pair: - new_operator, must_be_nil = operator.as_pair() + from_as_pair = operator.as_pair() + assert from_as_pair is not None + new_operator, must_be_nil = from_as_pair if new_operator.pair or must_be_nil.atom != b"": raise EvalError("in ((X)...) syntax X must be lone atom", sexp) new_operand_list = sexp.rest() @@ -160,6 +167,7 @@ def apply_op(op_stack: OpStackType, value_stack: ValStackType) -> int: raise EvalError("internal error", operator) op = operator.as_atom() + assert op is not None if op == operator_lookup.apply_atom: if operand_list.list_len() != 2: raise EvalError("apply requires exactly 2 parameters", operand_list) @@ -174,12 +182,12 @@ def apply_op(op_stack: OpStackType, value_stack: ValStackType) -> int: return additional_cost op_stack: OpStackType = [eval_op] - value_stack: ValStackType = [program.cons(args)] + value_stack: ValStackType = [_program.cons(args)] cost: int = 0 while op_stack: f = op_stack.pop() cost += f(op_stack, value_stack) if max_cost and cost > max_cost: - raise EvalError("cost exceeded", program.to(max_cost)) + raise EvalError("cost exceeded", _program.to(max_cost)) return cost, value_stack[-1] diff --git a/clvm/serialize.py b/clvm/serialize.py index cdc3cd30..ad7c3016 100644 --- a/clvm/serialize.py +++ b/clvm/serialize.py @@ -12,15 +12,37 @@ # If the first byte read is one of the following: # 1000 0000 -> 0 bytes : nil # 0000 0000 -> 1 byte : zero (b'\x00') +from __future__ import annotations + import io -from .CLVMObject import CLVMObject +import typing + +from .CLVMObject import CLVMObject, CLVMStorage + + +if typing.TYPE_CHECKING: + from .SExp import CastableType, SExp MAX_SINGLE_BYTE = 0x7F CONS_BOX_MARKER = 0xFF -def sexp_to_byte_iterator(sexp): +T = typing.TypeVar("T") + +ToCLVMStorage = typing.Callable[ + [typing.Union[bytes, typing.Tuple[CLVMStorage, CLVMStorage]]], CLVMStorage +] + +OpCallable = typing.Callable[ + ["OpStackType", "ValStackType", typing.BinaryIO, ToCLVMStorage], None +] + +ValStackType = typing.List[CLVMStorage] +OpStackType = typing.List[OpCallable] + + +def sexp_to_byte_iterator(sexp: CLVMStorage) -> typing.Iterator[bytes]: todo_stack = [sexp] while todo_stack: sexp = todo_stack.pop() @@ -30,10 +52,11 @@ def sexp_to_byte_iterator(sexp): todo_stack.append(pair[1]) todo_stack.append(pair[0]) else: + assert sexp.atom is not None yield from atom_to_byte_iterator(sexp.atom) -def atom_to_byte_iterator(as_atom): +def atom_to_byte_iterator(as_atom: bytes) -> typing.Iterator[bytes]: size = len(as_atom) if size == 0: yield b"\x80" @@ -68,18 +91,23 @@ def atom_to_byte_iterator(as_atom): ] ) else: - raise ValueError("sexp too long %s" % as_atom) + raise ValueError(f"sexp too long {as_atom!r}") yield size_blob yield as_atom -def sexp_to_stream(sexp, f): +def sexp_to_stream(sexp: SExp, f: typing.BinaryIO) -> None: for b in sexp_to_byte_iterator(sexp): f.write(b) -def _op_read_sexp(op_stack, val_stack, f, to_sexp): +def _op_read_sexp( + op_stack: OpStackType, + val_stack: ValStackType, + f: typing.BinaryIO, + to_sexp: ToCLVMStorage, +) -> None: blob = f.read(1) if len(blob) == 0: raise ValueError("bad encoding") @@ -92,15 +120,20 @@ def _op_read_sexp(op_stack, val_stack, f, to_sexp): val_stack.append(_atom_from_stream(f, b, to_sexp)) -def _op_cons(op_stack, val_stack, f, to_sexp): +def _op_cons( + op_stack: OpStackType, + val_stack: ValStackType, + f: typing.BinaryIO, + to_sexp: ToCLVMStorage, +) -> None: right = val_stack.pop() left = val_stack.pop() val_stack.append(to_sexp((left, right))) -def sexp_from_stream(f, to_sexp): - op_stack = [_op_read_sexp] - val_stack = [] +def sexp_from_stream(f: typing.BinaryIO, to_sexp: typing.Callable[["CastableType"], T]) -> T: + op_stack: OpStackType = [_op_read_sexp] + val_stack: ValStackType = [] while op_stack: func = op_stack.pop() @@ -108,7 +141,7 @@ def sexp_from_stream(f, to_sexp): return to_sexp(val_stack.pop()) -def _op_consume_sexp(f): +def _op_consume_sexp(f: typing.BinaryIO) -> typing.Tuple[bytes, int]: blob = f.read(1) if len(blob) == 0: raise ValueError("bad encoding") @@ -118,7 +151,7 @@ def _op_consume_sexp(f): return (_consume_atom(f, b), 0) -def _consume_atom(f, b): +def _consume_atom(f: typing.BinaryIO, b: int) -> bytes: if b == 0x80: return bytes([b]) if b <= MAX_SINGLE_BYTE: @@ -132,10 +165,10 @@ def _consume_atom(f, b): bit_mask >>= 1 size_blob = bytes([ll]) if bit_count > 1: - ll = f.read(bit_count - 1) - if len(ll) != bit_count - 1: + llb = f.read(bit_count - 1) + if len(llb) != bit_count - 1: raise ValueError("bad encoding") - size_blob += ll + size_blob += llb size = int.from_bytes(size_blob, "big") if size >= 0x400000000: raise ValueError("blob too large") @@ -148,7 +181,7 @@ def _consume_atom(f, b): # instead of parsing the input stream, this function pulls out all the bytes # that represent on S-expression tree, and returns them. This is more efficient # than parsing and returning a python S-expression tree. -def sexp_buffer_from_stream(f): +def sexp_buffer_from_stream(f: typing.BinaryIO) -> bytes: ret = io.BytesIO() depth = 1 @@ -160,7 +193,9 @@ def sexp_buffer_from_stream(f): return ret.getvalue() -def _atom_from_stream(f, b, to_sexp): +def _atom_from_stream( + f: typing.BinaryIO, b: int, to_sexp: ToCLVMStorage +) -> CLVMStorage: if b == 0x80: return to_sexp(b"") if b <= MAX_SINGLE_BYTE: @@ -173,10 +208,10 @@ def _atom_from_stream(f, b, to_sexp): bit_mask >>= 1 size_blob = bytes([b]) if bit_count > 1: - b = f.read(bit_count - 1) - if len(b) != bit_count - 1: + bb = f.read(bit_count - 1) + if len(bb) != bit_count - 1: raise ValueError("bad encoding") - size_blob += b + size_blob += bb size = int.from_bytes(size_blob, "big") if size >= 0x400000000: raise ValueError("blob too large") diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 00000000..a77ece31 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,20 @@ +[mypy] +files = clvm,tests,*.py +show_error_codes = True +warn_unused_ignores = True + +disallow_any_generics = True +disallow_subclassing_any = True +disallow_untyped_calls = True +disallow_untyped_defs = True +disallow_incomplete_defs = True +check_untyped_defs = True +disallow_untyped_decorators = True +no_implicit_optional = True +warn_return_any = True +no_implicit_reexport = True +strict_equality = True +warn_redundant_casts = True + +[mypy-blspy.*] +ignore_missing_imports = True diff --git a/setup.py b/setup.py index c95a3721..77587546 100755 --- a/setup.py +++ b/setup.py @@ -5,9 +5,19 @@ with open("README.md", "rt") as fh: long_description = fh.read() -dependencies = ["chia_rs>=0.2.13", "importlib_metadata~=6.11.0"] +dependencies = [ + "chia_rs>=0.2.13", + "importlib_metadata~=6.11.0", + "typing-extensions~=4.0", +] -dev_dependencies = ["clvm_tools>=0.4.4", "pytest", "setuptools"] +dev_dependencies = [ + "clvm_tools>=0.4.4", + "mypy", + "pytest", + "setuptools", + "types-setuptools", +] setup( name="clvm", @@ -40,4 +50,7 @@ "Bug Reports": "https://github.com/Chia-Network/clvm", "Source": "https://github.com/Chia-Network/clvm", }, + package_data={ + "": ["py.typed"], + }, ) diff --git a/tests/as_python_test.py b/tests/as_python_test.py index 6d8de7b7..5c5c0d3d 100644 --- a/tests/as_python_test.py +++ b/tests/as_python_test.py @@ -1,13 +1,15 @@ import unittest -from clvm import SExp +from typing import List, Tuple, Union + from clvm.CLVMObject import CLVMObject +from clvm.SExp import CastableType, SExp from chia_rs import G1Element from clvm.EvalError import EvalError class dummy_class: - def __init__(self): + def __init__(self) -> None: self.i = 0 @@ -22,51 +24,57 @@ def gen_tree(depth: int) -> SExp: H01 = fh("01") H02 = fh("02") +NestedListOfBytes = Union[bytes, List["NestedListOfBytes"]] +NestedTupleOfBytes = Union[bytes, Tuple["NestedTupleOfBytes", "NestedTupleOfBytes"]] + class AsPythonTest(unittest.TestCase): - def check_as_python(self, p): + def check_as_python(self, p: CastableType) -> None: v = SExp.to(p) p1 = v.as_python() self.assertEqual(p, p1) - def test_null(self): + def test_null(self) -> None: self.check_as_python(b"") - def test_embedded_tuples(self): + def test_embedded_tuples(self) -> None: self.check_as_python((b"10", ((b"200", b"300"), b"400"))) - def test_single_bytes(self): + def test_single_bytes(self) -> None: for _ in range(256): self.check_as_python(bytes([_])) - def test_short_lists(self): + def test_short_lists(self) -> None: self.check_as_python(b"") for _ in range(256): for size in range(1, 5): self.check_as_python(bytes([_] * size)) - def test_int(self): + def test_int(self) -> None: v = SExp.to(42) self.assertEqual(v.atom, bytes([42])) - def test_none(self): + def test_none(self) -> None: v = SExp.to(None) self.assertEqual(v.atom, b"") - def test_empty_list(self): + def test_empty_list(self) -> None: v = SExp.to([]) self.assertEqual(v.atom, b"") - def test_list_of_one(self): + def test_list_of_one(self) -> None: v = SExp.to([1]) + assert v.pair is not None self.assertEqual(type(v.pair[0]), CLVMObject) self.assertEqual(type(v.pair[1]), CLVMObject) - self.assertEqual(type(v.as_pair()[0]), SExp) - self.assertEqual(type(v.as_pair()[1]), SExp) + from_as_pair = v.as_pair() + assert from_as_pair is not None + self.assertEqual(type(from_as_pair[0]), SExp) + self.assertEqual(type(from_as_pair[1]), SExp) self.assertEqual(v.pair[0].atom, b"\x01") self.assertEqual(v.pair[1].atom, b"") - def test_g1element(self): + def test_g1element(self) -> None: b = fh( "b3b8ac537f4fd6bde9b26221d49b54b17a506be147347dae5" "d081c0a6572b611d8484e338f3432971a9823976c6a232b" @@ -74,7 +82,7 @@ def test_g1element(self): v = SExp.to(G1Element.from_bytes(b)) self.assertEqual(v.atom, b) - def test_complex(self): + def test_complex(self) -> None: self.check_as_python((b"", b"foo")) self.check_as_python((b"", b"1")) self.check_as_python([b"2", (b"", b"1")]) @@ -83,7 +91,7 @@ def test_complex(self): [b"", b"1", b"2", [b"30", b"40", b"90"], b"600", (b"", b"18")] ) - def test_listp(self): + def test_listp(self) -> None: self.assertEqual(SExp.to(42).listp(), False) self.assertEqual(SExp.to(b"").listp(), False) self.assertEqual(SExp.to(b"1337").listp(), False) @@ -91,35 +99,35 @@ def test_listp(self): self.assertEqual(SExp.to((1337, 42)).listp(), True) self.assertEqual(SExp.to([1337, 42]).listp(), True) - def test_nullp(self): + def test_nullp(self) -> None: self.assertEqual(SExp.to(b"").nullp(), True) self.assertEqual(SExp.to(b"1337").nullp(), False) self.assertEqual(SExp.to((b"", b"")).nullp(), False) - def test_constants(self): + def test_constants(self) -> None: self.assertEqual(SExp.__null__.nullp(), True) self.assertEqual(SExp.null().nullp(), True) self.assertEqual(SExp.true, True) self.assertEqual(SExp.false, False) - def test_list_len(self): + def test_list_len(self) -> None: v = SExp.to(42) for i in range(100): self.assertEqual(v.list_len(), i) v = SExp.to((42, v)) self.assertEqual(v.list_len(), 100) - def test_list_len_atom(self): + def test_list_len_atom(self) -> None: v = SExp.to(42) self.assertEqual(v.list_len(), 0) - def test_as_int(self): + def test_as_int(self) -> None: self.assertEqual(SExp.to(fh("80")).as_int(), -128) self.assertEqual(SExp.to(fh("ff")).as_int(), -1) self.assertEqual(SExp.to(fh("0080")).as_int(), 128) self.assertEqual(SExp.to(fh("00ff")).as_int(), 255) - def test_cons(self): + def test_cons(self) -> None: # list self.assertEqual( SExp.to(H01).cons(SExp.to(H02).cons(SExp.null())).as_python(), @@ -128,73 +136,84 @@ def test_cons(self): # cons-box of two values self.assertEqual(SExp.to(H01).cons(SExp.to(H02).as_python()), (H01, H02)) - def test_string(self): + def test_string(self) -> None: self.assertEqual(SExp.to("foobar").as_atom(), b"foobar") - def test_deep_recursion(self): - d = b"2" + def test_deep_recursion(self) -> None: + d: NestedListOfBytes = b"2" for i in range(1000): d = [d] v = SExp.to(d) for i in range(1000): - self.assertEqual(v.as_pair()[1].as_atom(), SExp.null()) - v = v.as_pair()[0] - d = d[0] + from_as_pair = v.as_pair() + assert from_as_pair is not None + self.assertEqual(from_as_pair[1].as_atom(), SExp.null()) + v = from_as_pair[0] + element = d[0] + assert isinstance(element, (list, bytes)) + d = element self.assertEqual(v.as_atom(), b"2") self.assertEqual(d, b"2") - def test_long_linked_list(self): - d = b"" + def test_long_linked_list(self) -> None: + d: NestedTupleOfBytes = b"" for i in range(1000): d = (b"2", d) v = SExp.to(d) for i in range(1000): - self.assertEqual(v.as_pair()[0].as_atom(), d[0]) - v = v.as_pair()[1] + from_as_pair = v.as_pair() + assert from_as_pair is not None + self.assertEqual(from_as_pair[0].as_atom(), d[0]) + from_as_pair = v.as_pair() + assert from_as_pair is not None + v = from_as_pair[1] + assert isinstance(d, tuple) d = d[1] self.assertEqual(v.as_atom(), SExp.null()) self.assertEqual(d, b"") - def test_long_list(self): + def test_long_list(self) -> None: d = [1337] * 1000 v = SExp.to(d) for i in range(1000 - 1): - self.assertEqual(v.as_pair()[0].as_int(), d[i]) - v = v.as_pair()[1] + from_as_pair = v.as_pair() + assert from_as_pair is not None + self.assertEqual(from_as_pair[0].as_int(), d[i]) + v = from_as_pair[1] self.assertEqual(v.as_atom(), SExp.null()) - def test_invalid_type(self): + def test_invalid_type(self) -> None: with self.assertRaises(ValueError): SExp.to(dummy_class) - def test_invalid_tuple(self): + def test_invalid_tuple(self) -> None: with self.assertRaises(ValueError): SExp.to((dummy_class, dummy_class)) with self.assertRaises(ValueError): SExp.to((dummy_class, dummy_class, dummy_class)) - def test_clvm_object_tuple(self): + def test_clvm_object_tuple(self) -> None: o1 = CLVMObject(b"foo") o2 = CLVMObject(b"bar") self.assertEqual(SExp.to((o1, o2)), (o1, o2)) - def test_first(self): + def test_first(self) -> None: val = SExp.to(1) self.assertRaises(EvalError, lambda: val.first()) val = SExp.to((42, val)) self.assertEqual(val.first(), SExp.to(42)) - def test_rest(self): + def test_rest(self) -> None: val = SExp.to(1) self.assertRaises(EvalError, lambda: val.rest()) val = SExp.to((42, val)) self.assertEqual(val.rest(), SExp.to(1)) - def test_as_iter(self): + def test_as_iter(self) -> None: val = list(SExp.to((1, (2, (3, (4, b""))))).as_iter()) self.assertEqual(val, [1, 2, 3, 4]) @@ -210,7 +229,7 @@ def test_as_iter(self): EvalError, lambda: list(SExp.to((1, (2, (3, (4, 5))))).as_iter()) ) - def test_eq(self): + def test_eq(self) -> None: val = SExp.to(1) self.assertTrue(val == 1) @@ -222,7 +241,7 @@ def test_eq(self): self.assertFalse(val == (1, 2)) self.assertFalse(val == (dummy_class, dummy_class)) - def test_eq_tree(self): + def test_eq_tree(self) -> None: val1 = gen_tree(2) val2 = gen_tree(2) val3 = gen_tree(3) @@ -232,14 +251,14 @@ def test_eq_tree(self): self.assertFalse(val1 == val3) self.assertFalse(val3 == val1) - def test_str(self): + def test_str(self) -> None: self.assertEqual(str(SExp.to(1)), "01") self.assertEqual(str(SExp.to(1337)), "820539") self.assertEqual(str(SExp.to(-1)), "81ff") self.assertEqual(str(gen_tree(1)), "ff820539820539") self.assertEqual(str(gen_tree(2)), "ffff820539820539ff820539820539") - def test_repr(self): + def test_repr(self) -> None: self.assertEqual(repr(SExp.to(1)), "SExp(01)") self.assertEqual(repr(SExp.to(1337)), "SExp(820539)") self.assertEqual(repr(SExp.to(-1)), "SExp(81ff)") diff --git a/tests/bls12_381_test.py b/tests/bls12_381_test.py index f72e3117..114c38d3 100644 --- a/tests/bls12_381_test.py +++ b/tests/bls12_381_test.py @@ -6,7 +6,7 @@ class BLS12_381_Test(unittest.TestCase): - def test_stream(self): + def test_stream(self) -> None: for _ in range(1, 64): p = PrivateKey.from_bytes(_.to_bytes(32, "big")).get_g1() blob = bytes(p) diff --git a/tests/cmds_test.py b/tests/cmds_test.py index 23497a9c..b60d4462 100644 --- a/tests/cmds_test.py +++ b/tests/cmds_test.py @@ -3,6 +3,7 @@ import shlex import sys import unittest +from typing import Callable, Iterable, List, Optional, Tuple import importlib_metadata @@ -14,7 +15,7 @@ REPAIR = os.getenv("REPAIR", 0) -def get_test_cases(path): +def get_test_cases(path: str) -> List[Tuple[str, List[str], str, List[str], str]]: PREFIX = os.path.dirname(__file__) TESTS_PATH = os.path.join(PREFIX, path) paths = [] @@ -45,7 +46,7 @@ def get_test_cases(path): class TestCmds(unittest.TestCase): - def invoke_tool(self, cmd_line): + def invoke_tool(self, cmd_line: str) -> Tuple[Optional[int], str, str]: # capture io stdout_buffer = io.StringIO() @@ -59,7 +60,7 @@ def invoke_tool(self, cmd_line): args = shlex.split(cmd_line) [entry_point] = importlib_metadata.entry_points(group="console_scripts", name=args[0]) - v = entry_point.load()(args) + v: Optional[int] = entry_point.load()(args) sys.stdout = old_stdout sys.stderr = old_stderr @@ -67,8 +68,13 @@ def invoke_tool(self, cmd_line): return v, stdout_buffer.getvalue(), stderr_buffer.getvalue() -def make_f(cmd_lines, expected_output, comments, path): - def f(self): +def make_f( + cmd_lines: List[str], + expected_output: object, + comments: Iterable[str], + path: str, +) -> Callable[[TestCmds], None]: + def f(self: TestCmds) -> None: cmd = "".join(cmd_lines) for c in cmd.split(";"): r, actual_output, actual_stderr = self.invoke_tool(c) @@ -92,7 +98,7 @@ def f(self): return f -def inject(*paths): +def inject(*paths: str) -> None: for path in paths: for idx, (name, i, o, comments, path) in enumerate(get_test_cases(path)): name_of_f = "test_%s" % name @@ -104,7 +110,7 @@ def inject(*paths): inject("unknown-op") -def main(): +def main() -> None: unittest.main() diff --git a/tests/operatordict_test.py b/tests/operatordict_test.py index 0a6e8e87..13594fd4 100644 --- a/tests/operatordict_test.py +++ b/tests/operatordict_test.py @@ -1,29 +1,32 @@ import unittest -from clvm.operators import OperatorDict +from typing import Dict + +from clvm.operators import OperatorProtocol, OperatorDict class OperatorDictTest(unittest.TestCase): - def test_operatordict_constructor(self): + def test_operatordict_constructor(self) -> None: """Constructing should fail if quote or apply are not specified, either by object property or by keyword argument. Note that they cannot be specified in the operator dictionary itself. """ - d = {1: "hello", 2: "goodbye"} - with self.assertRaises(AttributeError): - OperatorDict(d) - with self.assertRaises(AttributeError): - OperatorDict(d, apply=1) - with self.assertRaises(AttributeError): - OperatorDict(d, quote=1) - o = OperatorDict(d, apply=1, quote=2) + # ignoring because apparently it doesn't matter for this test that the types are all wrong + d: Dict[bytes, OperatorProtocol] = {b"\01": "hello", b"\02": "goodbye"} # type: ignore [dict-item] + with self.assertRaises(AssertionError): + OperatorDict(d) # type: ignore[call-overload] + with self.assertRaises(AssertionError): + OperatorDict(d, apply=b"\01") # type: ignore[call-overload] + with self.assertRaises(AssertionError): + OperatorDict(d, quote=b"\01") # type: ignore[call-overload] + o = OperatorDict(d, apply=b"\01", quote=b"\02") print(o) # Why does the constructed Operator dict contain entries for "apply":1 and "quote":2 ? # assert d == o - self.assertEqual(o.apply_atom, 1) - self.assertEqual(o.quote_atom, 2) + self.assertEqual(o.apply_atom, b"\01") + self.assertEqual(o.quote_atom, b"\02") # Test construction from an already existing OperatorDict o2 = OperatorDict(o) - self.assertEqual(o2.apply_atom, 1) - self.assertEqual(o2.quote_atom, 2) + self.assertEqual(o2.apply_atom, b"\01") + self.assertEqual(o2.quote_atom, b"\02") diff --git a/tests/operators_test.py b/tests/operators_test.py index 37707325..5ac7c830 100644 --- a/tests/operators_test.py +++ b/tests/operators_test.py @@ -1,23 +1,24 @@ import unittest +from typing import Tuple from clvm.operators import (OPERATOR_LOOKUP, KEYWORD_TO_ATOM, default_unknown_op, OperatorDict) from clvm.EvalError import EvalError -from clvm import SExp +from clvm.SExp import SExp from clvm.costs import CONCAT_BASE_COST class OperatorsTest(unittest.TestCase): - def setUp(self): + def setUp(self) -> None: self.handler_called = False - def unknown_handler(self, name, args): + def unknown_handler(self, name: bytes, args: SExp) -> Tuple[int, SExp]: self.handler_called = True self.assertEqual(name, b'\xff\xff1337') self.assertEqual(args, SExp.to(1337)) return 42, SExp.to(b'foobar') - def test_unknown_op(self): + def test_unknown_op(self) -> None: self.assertRaises(EvalError, lambda: OPERATOR_LOOKUP(b'\xff\xff1337', SExp.to(1337))) od = OperatorDict(OPERATOR_LOOKUP, unknown_op_handler=lambda name, args: self.unknown_handler(name, args)) cost, ret = od(b'\xff\xff1337', SExp.to(1337)) @@ -25,11 +26,11 @@ def test_unknown_op(self): self.assertEqual(cost, 42) self.assertEqual(ret, SExp.to(b'foobar')) - def test_plus(self): + def test_plus(self) -> None: print(OPERATOR_LOOKUP) self.assertEqual(OPERATOR_LOOKUP(KEYWORD_TO_ATOM['+'], SExp.to([3, 4, 5]))[1], SExp.to(12)) - def test_unknown_op_reserved(self): + def test_unknown_op_reserved(self) -> None: # any op that starts with ffff is reserved, and results in a hard # failure @@ -51,7 +52,7 @@ def test_unknown_op_reserved(self): # the cost is 0xffff00 = 16776960 self.assertEqual(default_unknown_op(b"\x00\xff\xff\x00\x00", SExp.null()), (16776961, SExp.null())) - def test_unknown_ops_last_bits(self): + def test_unknown_ops_last_bits(self) -> None: # The last byte is ignored for no-op unknown ops for suffix in [b"\x3f", b"\x0f", b"\x00", b"\x2c"]: diff --git a/tests/run_program_test.py b/tests/run_program_test.py index d64462ca..78ffb998 100644 --- a/tests/run_program_test.py +++ b/tests/run_program_test.py @@ -5,7 +5,7 @@ class BitTest(unittest.TestCase): - def test_msb_mask(self): + def test_msb_mask(self) -> None: self.assertEqual(msb_mask(0x0), 0x0) self.assertEqual(msb_mask(0x01), 0x01) self.assertEqual(msb_mask(0x02), 0x02) diff --git a/tests/serialize_test.py b/tests/serialize_test.py index 786f1c95..b84deb1c 100644 --- a/tests/serialize_test.py +++ b/tests/serialize_test.py @@ -1,39 +1,38 @@ import io import unittest +from typing import Optional from clvm import to_sexp_f +from clvm.SExp import CastableType from clvm.serialize import (sexp_from_stream, sexp_buffer_from_stream, atom_to_byte_iterator) TEXT = b"the quick brown fox jumps over the lazy dogs" -class InfiniteStream(io.TextIOBase): - def __init__(self, b): - self.buf = b +class InfiniteStream(io.BytesIO): + def read(self, n: Optional[int] = -1) -> bytes: + result = super().read(n) - def read(self, n): - ret = b'' - while n > 0 and len(self.buf) > 0: - ret += self.buf[0:1] - self.buf = self.buf[1:] - n -= 1 - ret += b' ' * n - return ret + if n is not None and n > 0: + fill_needed = n - len(result) + result += b' ' * fill_needed + + return result class LargeAtom: - def __len__(self): + def __len__(self) -> int: return 0x400000001 class SerializeTest(unittest.TestCase): - def check_serde(self, s): + def check_serde(self, s: CastableType) -> None: v = to_sexp_f(s) b = v.as_bin() v1 = sexp_from_stream(io.BytesIO(b), to_sexp_f) if v != v1: - print("%s: %d %s %s" % (v, len(b), b, v1)) + print("%s: %d %r %s" % (v, len(b), b, v1)) breakpoint() b = v.as_bin() v1 = sexp_from_stream(io.BytesIO(b), to_sexp_f) @@ -44,44 +43,49 @@ def check_serde(self, s): buf = sexp_buffer_from_stream(io.BytesIO(b)) self.assertEqual(buf, b) - def test_zero(self): + def test_zero(self) -> None: v = to_sexp_f(b"\x00") self.assertEqual(v.as_bin(), b"\x00") - def test_empty(self): + def test_empty(self) -> None: v = to_sexp_f(b"") self.assertEqual(v.as_bin(), b"\x80") - def test_empty_string(self): + def test_empty_string(self) -> None: self.check_serde(b"") - def test_single_bytes(self): + def test_single_bytes(self) -> None: for _ in range(256): self.check_serde(bytes([_])) - def test_short_lists(self): + def test_short_lists(self) -> None: self.check_serde([]) for _ in range(0, 2048, 8): for size in range(1, 5): self.check_serde([_] * size) - def test_cons_box(self): + def test_cons_box(self) -> None: self.check_serde((None, None)) - self.check_serde((None, [1, 2, 30, 40, 600, (None, 18)])) + a: CastableType = (None, 18) + b: CastableType = [1, 2, 30, 40, 600, a] + c: CastableType = (None, b) + self.check_serde(c) self.check_serde((100, (TEXT, (30, (50, (90, (TEXT, TEXT + TEXT))))))) - def test_long_blobs(self): + def test_long_blobs(self) -> None: text = TEXT * 300 for _, t in enumerate(text): t1 = text[:_] self.check_serde(t1) - def test_blob_limit(self): + def test_blob_limit(self) -> None: with self.assertRaises(ValueError): - for b in atom_to_byte_iterator(LargeAtom()): - print('%02x' % b) + # Specifically substituting another type that is sufficiently similar to + # the expected bytes for this test. + for _ in atom_to_byte_iterator(LargeAtom()): # type: ignore[arg-type] + pass - def test_very_long_blobs(self): + def test_very_long_blobs(self) -> None: for size in [0x40, 0x2000, 0x100000, 0x8000000]: count = size // len(TEXT) text = TEXT * count @@ -91,7 +95,7 @@ def test_very_long_blobs(self): assert len(text) > size self.check_serde(text) - def test_very_deep_tree(self): + def test_very_deep_tree(self) -> None: blob = b"a" for depth in [10, 100, 1000, 10000, 100000]: s = to_sexp_f(blob) @@ -99,7 +103,7 @@ def test_very_deep_tree(self): s = to_sexp_f((s, blob)) self.check_serde(s) - def test_deserialize_empty(self): + def test_deserialize_empty(self) -> None: bytes_in = b'' with self.assertRaises(ValueError): sexp_from_stream(io.BytesIO(bytes_in), to_sexp_f) @@ -107,7 +111,7 @@ def test_deserialize_empty(self): with self.assertRaises(ValueError): sexp_buffer_from_stream(io.BytesIO(bytes_in)) - def test_deserialize_truncated_size(self): + def test_deserialize_truncated_size(self) -> None: # fe means the total number of bytes in the length-prefix is 7 # one for each bit set. 5 bytes is too few bytes_in = b'\xfe ' @@ -117,7 +121,7 @@ def test_deserialize_truncated_size(self): with self.assertRaises(ValueError): sexp_buffer_from_stream(io.BytesIO(bytes_in)) - def test_deserialize_truncated_blob(self): + def test_deserialize_truncated_blob(self) -> None: # this is a complete length prefix. The blob is supposed to be 63 bytes # the blob itself is truncated though, it's less than 63 bytes bytes_in = b'\xbf ' @@ -128,7 +132,7 @@ def test_deserialize_truncated_blob(self): with self.assertRaises(ValueError): sexp_buffer_from_stream(io.BytesIO(bytes_in)) - def test_deserialize_large_blob(self): + def test_deserialize_large_blob(self) -> None: # this length prefix is 7 bytes long, the last 6 bytes specifies the # length of the blob, which is 0xffffffffffff, or (2^48 - 1) # we don't support blobs this large, and we should fail immediately when diff --git a/tests/to_sexp_test.py b/tests/to_sexp_test.py index 2e5ee74c..2d1e646d 100644 --- a/tests/to_sexp_test.py +++ b/tests/to_sexp_test.py @@ -1,11 +1,12 @@ import unittest +from dataclasses import dataclass -from typing import Optional, Tuple, Any +from typing import ClassVar, Optional, TYPE_CHECKING, Tuple, cast from clvm.SExp import SExp, looks_like_clvm_object, convert_atom_to_bytes -from clvm.CLVMObject import CLVMObject +from clvm.CLVMObject import CLVMObject, CLVMStorage -def validate_sexp(sexp): +def validate_sexp(sexp: SExp) -> None: validate_stack = [sexp] while validate_stack: v = validate_stack.pop() @@ -15,7 +16,9 @@ def validate_sexp(sexp): v1, v2 = v.pair assert looks_like_clvm_object(v1) assert looks_like_clvm_object(v2) - s1, s2 = v.as_pair() + from_as_pair = v.as_pair() + assert from_as_pair is not None + s1, s2 = from_as_pair validate_stack.append(s1) validate_stack.append(s2) else: @@ -30,7 +33,9 @@ def print_leaves(tree: SExp) -> str: return "%d " % a[0] ret = "" - for i in tree.as_pair(): + from_as_pair = tree.as_pair() + assert from_as_pair is not None + for i in from_as_pair: ret += print_leaves(i) return ret @@ -44,43 +49,65 @@ def print_tree(tree: SExp) -> str: return "%d " % a[0] ret = "(" - for i in tree.as_pair(): + from_as_pair = tree.as_pair() + assert from_as_pair is not None + for i in from_as_pair: ret += print_tree(i) ret += ")" return ret +@dataclass(frozen=True) +class PairAndAtom: + pair: None = None + atom: None = None + + +@dataclass(frozen=True) +class Pair: + pair: None = None + + +@dataclass(frozen=True) +class Atom: + atom: None = None + + class DummyByteConvertible: def __bytes__(self) -> bytes: return b"foobar" class ToSExpTest(unittest.TestCase): - def test_cast_1(self): + def test_cast_1(self) -> None: # this was a problem in `clvm_tools` and is included # to prevent regressions sexp = SExp.to(b"foo") t1 = sexp.to([1, sexp]) validate_sexp(t1) - def test_wrap_sexp(self): + def test_wrap_sexp(self) -> None: # it's a bit of a layer violation that CLVMObject unwraps SExp, but we # rely on that in a fair number of places for now. We should probably # work towards phasing that out - o = CLVMObject(SExp.to(1)) + + # making sure this works despite hinting against it at CLVMObject + o = CLVMObject(SExp.to(1)) # type: ignore[arg-type] assert o.atom == bytes([1]) - def test_arbitrary_underlying_tree(self): + def test_arbitrary_underlying_tree(self) -> None: # SExp provides a view on top of a tree of arbitrary types, as long as # those types implement the CLVMObject protocol. This is an example of # a tree that's generated class GeneratedTree: + if TYPE_CHECKING: + _type_check: ClassVar[CLVMStorage] = cast("GeneratedTree", None) depth: int = 4 val: int = 0 - def __init__(self, depth, val): + def __init__(self, depth: int, val: int) -> None: assert depth >= 0 self.depth = depth self.val = val @@ -91,13 +118,21 @@ def atom(self) -> Optional[bytes]: return None return bytes([self.val]) + @atom.setter + def atom(self, val: Optional[bytes]) -> None: + raise RuntimeError("setting not supported in this test class") + @property - def pair(self) -> Optional[Tuple[Any, Any]]: + def pair(self) -> Optional[Tuple[CLVMStorage, CLVMStorage]]: if self.depth == 0: return None new_depth: int = self.depth - 1 return (GeneratedTree(new_depth, self.val), GeneratedTree(new_depth, self.val + 2**new_depth)) + @pair.setter + def pair(self, val: Optional[Tuple[CLVMStorage, CLVMStorage]]) -> None: + raise RuntimeError("setting not supported in this test class") + tree = SExp.to(GeneratedTree(5, 0)) assert print_leaves(tree) == "0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 " + \ "16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 " @@ -108,52 +143,45 @@ def pair(self) -> Optional[Tuple[Any, Any]]: tree = SExp.to(GeneratedTree(3, 10)) assert print_leaves(tree) == "10 11 12 13 14 15 16 17 " - def test_looks_like_clvm_object(self): + def test_looks_like_clvm_object(self) -> None: # this function can't look at the values, that would cause a cascade of # eager evaluation/conversion - class dummy: - pass - - obj = dummy() - obj.pair = None - obj.atom = None - print(dir(obj)) - assert looks_like_clvm_object(obj) + pair_and_atom = PairAndAtom() + print(dir(pair_and_atom)) + assert looks_like_clvm_object(pair_and_atom) - obj = dummy() - obj.pair = None - assert not looks_like_clvm_object(obj) + pair = Pair() + assert not looks_like_clvm_object(pair) - obj = dummy() - obj.atom = None - assert not looks_like_clvm_object(obj) + atom = Atom() + assert not looks_like_clvm_object(atom) - def test_list_conversions(self): + def test_list_conversions(self) -> None: a = SExp.to([1, 2, 3]) assert print_tree(a) == "(1 (2 (3 () )))" - def test_string_conversions(self): + def test_string_conversions(self) -> None: a = SExp.to("foobar") assert a.as_atom() == "foobar".encode() - def test_int_conversions(self): + def test_int_conversions(self) -> None: a = SExp.to(1337) assert a.as_atom() == bytes([0x5, 0x39]) - def test_none_conversions(self): + def test_none_conversions(self) -> None: a = SExp.to(None) assert a.as_atom() == b"" - def test_empty_list_conversions(self): + def test_empty_list_conversions(self) -> None: a = SExp.to([]) assert a.as_atom() == b"" - def test_eager_conversion(self): + def test_eager_conversion(self) -> None: with self.assertRaises(ValueError): SExp.to(("foobar", (1, {}))) - def test_convert_atom(self): + def test_convert_atom(self) -> None: assert convert_atom_to_bytes(0x133742) == bytes([0x13, 0x37, 0x42]) assert convert_atom_to_bytes(0x833742) == bytes([0x00, 0x83, 0x37, 0x42]) assert convert_atom_to_bytes(0) == b"" @@ -168,10 +196,10 @@ def test_convert_atom(self): assert convert_atom_to_bytes(DummyByteConvertible()) == b"foobar" with self.assertRaises(ValueError): - convert_atom_to_bytes([1, 2, 3]) + convert_atom_to_bytes([1, 2, 3]) # type: ignore[arg-type] with self.assertRaises(ValueError): - convert_atom_to_bytes((1, 2)) + convert_atom_to_bytes((1, 2)) # type: ignore[arg-type] with self.assertRaises(ValueError): - convert_atom_to_bytes({}) + convert_atom_to_bytes({}) # type: ignore[arg-type]