diff --git a/clvm/CLVMObject.py b/clvm/CLVMObject.py index 04cffdf0..f23b80a1 100644 --- a/clvm/CLVMObject.py +++ b/clvm/CLVMObject.py @@ -43,7 +43,10 @@ def __new__( self = super(CLVMObject, class_).__new__(class_) if isinstance(narrowed_v, tuple): if len(narrowed_v) != 2: - raise ValueError("tuples must be of size 2, cannot create CLVMObject from: %s" % str(narrowed_v)) + raise ValueError( + "tuples must be of size 2, cannot create CLVMObject from: %s" + % str(narrowed_v) + ) self.pair = narrowed_v self.atom = None else: diff --git a/clvm/SExp.py b/clvm/SExp.py index 2f197c7a..8efa2ae8 100644 --- a/clvm/SExp.py +++ b/clvm/SExp.py @@ -40,9 +40,15 @@ def looks_like_clvm_object(o: typing.Any) -> typing_extensions.TypeGuard[CLVMSto # this function recognizes some common types and turns them into plain bytes, def convert_atom_to_bytes( - v: typing.Union[bytes, str, int, None, typing.List[typing_extensions.Never], typing.SupportsBytes], + v: typing.Union[ + bytes, + str, + int, + None, + typing.List[typing_extensions.Never], + typing.SupportsBytes, + ], ) -> bytes: - if isinstance(v, bytes): return v if isinstance(v, str): @@ -141,6 +147,7 @@ class SExp: elements implementing the CLVM object protocol. Exactly one of "atom" and "pair" must be None. """ + true: typing.ClassVar[SExp] false: typing.ClassVar[SExp] __null__: typing.ClassVar[SExp] diff --git a/clvm/as_python.py b/clvm/as_python.py index 41ecf9a9..2e0ff2d2 100644 --- a/clvm/as_python.py +++ b/clvm/as_python.py @@ -7,7 +7,9 @@ OpCallable = Callable[["OpStackType", "ValStackType"], None] -PythonReturnType = Union[bytes, Tuple["PythonReturnType", "PythonReturnType"], List["PythonReturnType"]] +PythonReturnType = Union[ + bytes, Tuple["PythonReturnType", "PythonReturnType"], List["PythonReturnType"] +] ValType = Union["SExp", PythonReturnType] ValStackType = List[ValType] @@ -22,7 +24,9 @@ def _roll(op_stack: OpStackType, val_stack: ValStackType) -> None: val_stack.append(v2) -MakeTupleValStackType = List[Union[bytes, Tuple[object, object], "MakeTupleValStackType"]] +MakeTupleValStackType = List[ + Union[bytes, Tuple[object, object], "MakeTupleValStackType"] +] def _make_tuple(op_stack: OpStackType, val_stack: ValStackType) -> None: diff --git a/clvm/more_ops.py b/clvm/more_ops.py index ecabb05b..fd68d07b 100644 --- a/clvm/more_ops.py +++ b/clvm/more_ops.py @@ -87,15 +87,21 @@ def args_as_int32(op_name: str, args: SExp) -> typing.Iterator[int]: if arg.atom is None: raise EvalError("%s requires int32 args" % op_name, arg) if len(arg.atom) > 4: - raise EvalError("%s requires int32 args (with no leading zeros)" % op_name, arg) + raise EvalError( + "%s requires int32 args (with no leading zeros)" % op_name, arg + ) yield arg.as_int() -def args_as_int_list(op_name: str, args: SExp, count: int) -> typing.List[typing.Tuple[int, int]]: +def args_as_int_list( + op_name: str, args: SExp, count: int +) -> typing.List[typing.Tuple[int, int]]: int_list = list(args_as_ints(op_name, args)) if len(int_list) != count: plural = "s" if count != 1 else "" - raise EvalError("%s takes exactly %d argument%s" % (op_name, count, plural), args) + raise EvalError( + "%s takes exactly %d argument%s" % (op_name, count, plural), args + ) return int_list @@ -112,7 +118,9 @@ def args_as_bool_list(op_name: str, args: SExp, count: int) -> typing.List[SExp] bool_list = list(args_as_bools(op_name, args)) if len(bool_list) != count: plural = "s" if count != 1 else "" - raise EvalError("%s takes exactly %d argument%s" % (op_name, count, plural), args) + raise EvalError( + "%s takes exactly %d argument%s" % (op_name, count, plural), args + ) return bool_list @@ -120,9 +128,9 @@ def op_add(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: total = 0 cost = ARITH_BASE_COST arg_size = 0 - for r, l in args_as_ints("+", args): + for r, arg_len in args_as_ints("+", args): total += r - arg_size += l + arg_size += arg_len cost += ARITH_COST_PER_ARG cost += arg_size * ARITH_COST_PER_BYTE return malloc_cost(cost, args.to(total)) @@ -135,10 +143,10 @@ def op_subtract(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: sign = 1 total = 0 arg_size = 0 - for r, l in args_as_ints("-", args): + for r, arg_len in args_as_ints("-", args): total += sign * r sign = -1 - arg_size += l + arg_size += arg_len cost += ARITH_COST_PER_ARG cost += arg_size * ARITH_COST_PER_BYTE return malloc_cost(cost, args.to(total)) @@ -265,7 +273,7 @@ def op_substr(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: assert s0 is not None if arg_count == 2: - i1, = list(args_as_int32("substr", args.rest())) + (i1,) = list(args_as_int32("substr", args.rest())) i2 = len(s0) else: i1, i2 = list(args_as_int32("substr", args.rest())) @@ -294,7 +302,9 @@ def op_concat(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: def op_ash(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: (i0, l0), (i1, l1) = args_as_int_list("ash", args, 2) if l1 > 4: - raise EvalError("ash requires int32 args (with no leading zeros)", args.rest().first()) + raise EvalError( + "ash requires int32 args (with no leading zeros)", args.rest().first() + ) if abs(i1) > 65535: raise EvalError("shift too large", args.to(i1)) if i1 >= 0: @@ -309,7 +319,9 @@ def op_ash(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: def op_lsh(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: (i0, l0), (i1, l1) = args_as_int_list("lsh", args, 2) if l1 > 4: - raise EvalError("lsh requires int32 args (with no leading zeros)", args.rest().first()) + raise EvalError( + "lsh requires int32 args (with no leading zeros)", args.rest().first() + ) if abs(i1) > 65535: raise EvalError("shift too large", args.to(i1)) # we actually want i0 to be an *unsigned* int @@ -334,9 +346,9 @@ def binop_reduction( total = initial_value arg_size = 0 cost = LOG_BASE_COST - for r, l in args_as_ints(op_name, args): + for r, arg_len in args_as_ints(op_name, args): total = op_f(total, r) - arg_size += l + arg_size += arg_len cost += LOG_COST_PER_ARG cost += arg_size * LOG_COST_PER_BYTE return malloc_cost(cost, args.to(total)) @@ -367,7 +379,7 @@ def binop(a: int, b: int) -> int: def op_lognot(args: _T_SExp) -> typing.Tuple[int, _T_SExp]: - (i0, l0), = args_as_int_list("lognot", args, 1) + ((i0, l0),) = args_as_int_list("lognot", args, 1) cost = LOGNOT_BASE_COST + l0 * LOGNOT_COST_PER_BYTE return malloc_cost(cost, args.to(~i0)) diff --git a/clvm/operators.py b/clvm/operators.py index 4b2d834b..d0952854 100644 --- a/clvm/operators.py +++ b/clvm/operators.py @@ -26,22 +26,22 @@ KEYWORDS = ( # core opcodes 0x01-x08 ". q a i c f r l x " - + # # opcodes on atoms as strings 0x09-0x0f "= >s sha256 substr strlen concat . " - + # # opcodes on atoms as ints 0x10-0x17 "+ - * / divmod > ash lsh " - + # # opcodes on atoms as vectors of bools 0x18-0x1c "logand logior logxor lognot . " - + # # opcodes for bls 1381 0x1d-0x1f "point_add pubkey_for_exp . " - + # # bool opcodes 0x20-0x23 "not any all . " - + # # misc 0x24 "softfork " ).split() @@ -101,6 +101,7 @@ def args_len(op_name: str, args: SExp) -> Iterator[int]: # this means that unknown ops where cost_function is 1, 2, or 3, may still be # fatal errors if the arguments passed are not atoms. + def default_unknown_op(op: bytes, args: SExp) -> Tuple[int, SExp]: # any opcode starting with ffff is reserved (i.e. fatal error) # opcodes are not allowed to be empty @@ -169,13 +170,11 @@ def default_unknown_op(op: bytes, args: SExp) -> Tuple[int, SExp]: class OperatorProtocol(Protocol): - def __call__(self, args: SExp) -> Tuple[int, SExp]: - ... + def __call__(self, args: SExp) -> Tuple[int, SExp]: ... class UnknownOperatorProtocol(Protocol): - def __call__(self, op: bytes, args: SExp) -> Tuple[int, SExp]: - ... + def __call__(self, op: bytes, args: SExp) -> Tuple[int, SExp]: ... _T_OperatorDict = TypeVar("_T_OperatorDict", bound="OperatorDict") @@ -198,8 +197,7 @@ def __new__( quote: bytes, apply: bytes, unknown_op_handler: UnknownOperatorProtocol = default_unknown_op, - ) -> _T_OperatorDict: - ... + ) -> _T_OperatorDict: ... @overload def __new__( @@ -208,8 +206,7 @@ def __new__( quote: Optional[bytes] = None, apply: Optional[bytes] = None, unknown_op_handler: UnknownOperatorProtocol = default_unknown_op, - ) -> _T_OperatorDict: - ... + ) -> _T_OperatorDict: ... def __new__( cls: Type[_T_OperatorDict], diff --git a/clvm/run_program.py b/clvm/run_program.py index 85529add..a4164151 100644 --- a/clvm/run_program.py +++ b/clvm/run_program.py @@ -10,7 +10,7 @@ QUOTE_COST, PATH_LOOKUP_BASE_COST, PATH_LOOKUP_COST_PER_LEG, - PATH_LOOKUP_COST_PER_ZERO_BYTE + PATH_LOOKUP_COST_PER_ZERO_BYTE, ) OpCallable = Callable[["OpStackType", "ValStackType"], int] @@ -55,7 +55,6 @@ def run_program( max_cost: Optional[int] = None, pre_eval_f: Optional[PreEvalFunction] = None, ) -> Tuple[int, SExp]: - _program = SExp.to(program) if pre_eval_f is not None: pre_eval_op = to_pre_eval_op(pre_eval_f, _program.to) diff --git a/tests/cmds_test.py b/tests/cmds_test.py index b60d4462..3ea16726 100644 --- a/tests/cmds_test.py +++ b/tests/cmds_test.py @@ -47,7 +47,6 @@ def get_test_cases(path: str) -> List[Tuple[str, List[str], str, List[str], str] class TestCmds(unittest.TestCase): def invoke_tool(self, cmd_line: str) -> Tuple[Optional[int], str, str]: - # capture io stdout_buffer = io.StringIO() stderr_buffer = io.StringIO() @@ -59,7 +58,9 @@ def invoke_tool(self, cmd_line: str) -> Tuple[Optional[int], str, str]: sys.stderr = stderr_buffer args = shlex.split(cmd_line) - [entry_point] = importlib_metadata.entry_points(group="console_scripts", name=args[0]) + [entry_point] = importlib_metadata.entry_points( + group="console_scripts", name=args[0] + ) v: Optional[int] = entry_point.load()(args) sys.stdout = old_stdout diff --git a/tests/operatordict_test.py b/tests/operatordict_test.py index 13594fd4..f797c99a 100644 --- a/tests/operatordict_test.py +++ b/tests/operatordict_test.py @@ -8,8 +8,8 @@ class OperatorDictTest(unittest.TestCase): def test_operatordict_constructor(self) -> None: """Constructing should fail if quote or apply are not specified, - either by object property or by keyword argument. - Note that they cannot be specified in the operator dictionary itself. + either by object property or by keyword argument. + Note that they cannot be specified in the operator dictionary itself. """ # ignoring because apparently it doesn't matter for this test that the types are all wrong d: Dict[bytes, OperatorProtocol] = {b"\01": "hello", b"\02": "goodbye"} # type: ignore [dict-item] diff --git a/tests/operators_test.py b/tests/operators_test.py index 5ac7c830..fd3b8f7d 100644 --- a/tests/operators_test.py +++ b/tests/operators_test.py @@ -1,37 +1,47 @@ import unittest from typing import Tuple -from clvm.operators import (OPERATOR_LOOKUP, KEYWORD_TO_ATOM, default_unknown_op, OperatorDict) +from clvm.operators import ( + OPERATOR_LOOKUP, + KEYWORD_TO_ATOM, + default_unknown_op, + OperatorDict, +) from clvm.EvalError import EvalError from clvm.SExp import SExp from clvm.costs import CONCAT_BASE_COST class OperatorsTest(unittest.TestCase): - def setUp(self) -> None: self.handler_called = False def unknown_handler(self, name: bytes, args: SExp) -> Tuple[int, SExp]: self.handler_called = True - self.assertEqual(name, b'\xff\xff1337') + self.assertEqual(name, b"\xff\xff1337") self.assertEqual(args, SExp.to(1337)) - return 42, SExp.to(b'foobar') + return 42, SExp.to(b"foobar") def test_unknown_op(self) -> None: - self.assertRaises(EvalError, lambda: OPERATOR_LOOKUP(b'\xff\xff1337', SExp.to(1337))) - od = OperatorDict(OPERATOR_LOOKUP, unknown_op_handler=lambda name, args: self.unknown_handler(name, args)) - cost, ret = od(b'\xff\xff1337', SExp.to(1337)) + self.assertRaises( + EvalError, lambda: OPERATOR_LOOKUP(b"\xff\xff1337", SExp.to(1337)) + ) + od = OperatorDict( + OPERATOR_LOOKUP, + unknown_op_handler=lambda name, args: self.unknown_handler(name, args), + ) + cost, ret = od(b"\xff\xff1337", SExp.to(1337)) self.assertTrue(self.handler_called) self.assertEqual(cost, 42) - self.assertEqual(ret, SExp.to(b'foobar')) + self.assertEqual(ret, SExp.to(b"foobar")) def test_plus(self) -> None: print(OPERATOR_LOOKUP) - self.assertEqual(OPERATOR_LOOKUP(KEYWORD_TO_ATOM['+'], SExp.to([3, 4, 5]))[1], SExp.to(12)) + self.assertEqual( + OPERATOR_LOOKUP(KEYWORD_TO_ATOM["+"], SExp.to([3, 4, 5]))[1], SExp.to(12) + ) def test_unknown_op_reserved(self) -> None: - # any op that starts with ffff is reserved, and results in a hard # failure with self.assertRaises(EvalError): @@ -46,15 +56,21 @@ def test_unknown_op_reserved(self) -> None: default_unknown_op(b"", SExp.null()) # a single ff is not sufficient to be treated as a reserved opcode - self.assertEqual(default_unknown_op(b"\xff", SExp.null()), (CONCAT_BASE_COST, SExp.null())) + self.assertEqual( + default_unknown_op(b"\xff", SExp.null()), (CONCAT_BASE_COST, SExp.null()) + ) # leading zeroes count, and this does not count as a ffff-prefix # the cost is 0xffff00 = 16776960 - self.assertEqual(default_unknown_op(b"\x00\xff\xff\x00\x00", SExp.null()), (16776961, SExp.null())) + self.assertEqual( + default_unknown_op(b"\x00\xff\xff\x00\x00", SExp.null()), + (16776961, SExp.null()), + ) def test_unknown_ops_last_bits(self) -> None: - # The last byte is ignored for no-op unknown ops for suffix in [b"\x3f", b"\x0f", b"\x00", b"\x2c"]: # the cost is unchanged by the last byte - self.assertEqual(default_unknown_op(b"\x3c" + suffix, SExp.null()), (61, SExp.null())) + self.assertEqual( + default_unknown_op(b"\x3c" + suffix, SExp.null()), (61, SExp.null()) + ) diff --git a/tests/run_program_test.py b/tests/run_program_test.py index 78ffb998..baaefe0e 100644 --- a/tests/run_program_test.py +++ b/tests/run_program_test.py @@ -4,7 +4,6 @@ class BitTest(unittest.TestCase): - def test_msb_mask(self) -> None: self.assertEqual(msb_mask(0x0), 0x0) self.assertEqual(msb_mask(0x01), 0x01) @@ -17,6 +16,6 @@ def test_msb_mask(self) -> None: self.assertEqual(msb_mask(0x80), 0x80) self.assertEqual(msb_mask(0x44), 0x40) - self.assertEqual(msb_mask(0x2a), 0x20) - self.assertEqual(msb_mask(0xff), 0x80) - self.assertEqual(msb_mask(0x0f), 0x08) + self.assertEqual(msb_mask(0x2A), 0x20) + self.assertEqual(msb_mask(0xFF), 0x80) + self.assertEqual(msb_mask(0x0F), 0x08) diff --git a/tests/serialize_test.py b/tests/serialize_test.py index d40b77e8..0f2e8521 100644 --- a/tests/serialize_test.py +++ b/tests/serialize_test.py @@ -22,7 +22,7 @@ def read(self, n: Optional[int] = -1) -> bytes: if n is not None and n > 0: fill_needed = n - len(result) - result += b' ' * fill_needed + result += b" " * fill_needed return result @@ -40,9 +40,9 @@ def has_backrefs(blob: bytes) -> bool: obj_count = 1 while obj_count > 0: b = f.read(1)[0] - if b == 0xfe: + if b == 0xFE: return True - if b == 0xff: + if b == 0xFF: obj_count += 1 else: _atom_from_stream(f, b) @@ -74,7 +74,10 @@ def check_serde(self, s: CastableType) -> bytes: if has_backrefs(b2) or len(b2) < len(b): # if we have any backrefs, ensure they actually save space self.assertTrue(len(b2) < len(b)) - print("%d bytes before %d after %d saved" % (len(b), len(b2), len(b) - len(b2))) + print( + "%d bytes before %d after %d saved" + % (len(b), len(b2), len(b) - len(b2)) + ) io_b2 = io.BytesIO(b2) self.assertRaises(ValueError, lambda: sexp_from_stream(io_b2, to_sexp_f)) io_b2 = io.BytesIO(b2) @@ -145,7 +148,7 @@ def test_very_deep_tree(self) -> None: self.check_serde(s) def test_deserialize_empty(self) -> None: - bytes_in = b'' + bytes_in = b"" with self.assertRaises(ValueError): sexp_from_stream(io.BytesIO(bytes_in), to_sexp_f) @@ -155,7 +158,7 @@ def test_deserialize_empty(self) -> None: def test_deserialize_truncated_size(self) -> None: # fe means the total number of bytes in the length-prefix is 7 # one for each bit set. 5 bytes is too few - bytes_in = b'\xfe ' + bytes_in = b"\xfe " with self.assertRaises(ValueError): sexp_from_stream(io.BytesIO(bytes_in), to_sexp_f) @@ -165,7 +168,7 @@ def test_deserialize_truncated_size(self) -> None: def test_deserialize_truncated_blob(self) -> None: # this is a complete length prefix. The blob is supposed to be 63 bytes # the blob itself is truncated though, it's less than 63 bytes - bytes_in = b'\xbf ' + bytes_in = b"\xbf " with self.assertRaises(ValueError): sexp_from_stream(io.BytesIO(bytes_in), to_sexp_f) @@ -179,7 +182,7 @@ def test_deserialize_large_blob(self) -> None: # we don't support blobs this large, and we should fail immediately when # exceeding the max blob size, rather than trying to read this many # bytes from the stream - bytes_in = b'\xfe' + b'\xff' * 6 + bytes_in = b"\xfe" + b"\xff" * 6 with self.assertRaises(ValueError): sexp_from_stream(InfiniteStream(bytes_in), to_sexp_f) diff --git a/tests/to_sexp_test.py b/tests/to_sexp_test.py index 2d1e646d..04e675d8 100644 --- a/tests/to_sexp_test.py +++ b/tests/to_sexp_test.py @@ -96,7 +96,6 @@ def test_wrap_sexp(self) -> None: assert o.atom == bytes([1]) def test_arbitrary_underlying_tree(self) -> None: - # SExp provides a view on top of a tree of arbitrary types, as long as # those types implement the CLVMObject protocol. This is an example of # a tree that's generated @@ -127,15 +126,21 @@ def pair(self) -> Optional[Tuple[CLVMStorage, CLVMStorage]]: if self.depth == 0: return None new_depth: int = self.depth - 1 - return (GeneratedTree(new_depth, self.val), GeneratedTree(new_depth, self.val + 2**new_depth)) + return ( + GeneratedTree(new_depth, self.val), + GeneratedTree(new_depth, self.val + 2**new_depth), + ) @pair.setter def pair(self, val: Optional[Tuple[CLVMStorage, CLVMStorage]]) -> None: raise RuntimeError("setting not supported in this test class") tree = SExp.to(GeneratedTree(5, 0)) - assert print_leaves(tree) == "0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 " + \ - "16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 " + assert ( + print_leaves(tree) + == "0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 " + + "16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 " + ) tree = SExp.to(GeneratedTree(3, 0)) assert print_leaves(tree) == "0 1 2 3 4 5 6 7 " @@ -144,7 +149,6 @@ def pair(self, val: Optional[Tuple[CLVMStorage, CLVMStorage]]) -> None: assert print_leaves(tree) == "10 11 12 13 14 15 16 17 " def test_looks_like_clvm_object(self) -> None: - # this function can't look at the values, that would cause a cascade of # eager evaluation/conversion pair_and_atom = PairAndAtom()