Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion clvm/CLVMObject.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ def __new__(
self = super(CLVMObject, class_).__new__(class_)
if isinstance(narrowed_v, tuple):
if len(narrowed_v) != 2:
raise ValueError("tuples must be of size 2, cannot create CLVMObject from: %s" % str(narrowed_v))
raise ValueError(
"tuples must be of size 2, cannot create CLVMObject from: %s"
% str(narrowed_v)
)
self.pair = narrowed_v
self.atom = None
else:
Expand Down
11 changes: 9 additions & 2 deletions clvm/SExp.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,15 @@ def looks_like_clvm_object(o: typing.Any) -> typing_extensions.TypeGuard[CLVMSto

# this function recognizes some common types and turns them into plain bytes,
def convert_atom_to_bytes(
v: typing.Union[bytes, str, int, None, typing.List[typing_extensions.Never], typing.SupportsBytes],
v: typing.Union[
bytes,
str,
int,
None,
typing.List[typing_extensions.Never],
typing.SupportsBytes,
],
) -> bytes:

if isinstance(v, bytes):
return v
if isinstance(v, str):
Expand Down Expand Up @@ -141,6 +147,7 @@ class SExp:
elements implementing the CLVM object protocol.
Exactly one of "atom" and "pair" must be None.
"""

true: typing.ClassVar[SExp]
false: typing.ClassVar[SExp]
__null__: typing.ClassVar[SExp]
Expand Down
8 changes: 6 additions & 2 deletions clvm/as_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

OpCallable = Callable[["OpStackType", "ValStackType"], None]

PythonReturnType = Union[bytes, Tuple["PythonReturnType", "PythonReturnType"], List["PythonReturnType"]]
PythonReturnType = Union[
bytes, Tuple["PythonReturnType", "PythonReturnType"], List["PythonReturnType"]
]

ValType = Union["SExp", PythonReturnType]
ValStackType = List[ValType]
Expand All @@ -22,7 +24,9 @@ def _roll(op_stack: OpStackType, val_stack: ValStackType) -> None:
val_stack.append(v2)


MakeTupleValStackType = List[Union[bytes, Tuple[object, object], "MakeTupleValStackType"]]
MakeTupleValStackType = List[
Union[bytes, Tuple[object, object], "MakeTupleValStackType"]
]


def _make_tuple(op_stack: OpStackType, val_stack: ValStackType) -> None:
Expand Down
40 changes: 26 additions & 14 deletions clvm/more_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,15 +87,21 @@ def args_as_int32(op_name: str, args: SExp) -> typing.Iterator[int]:
if arg.atom is None:
raise EvalError("%s requires int32 args" % op_name, arg)
if len(arg.atom) > 4:
raise EvalError("%s requires int32 args (with no leading zeros)" % op_name, arg)
raise EvalError(
"%s requires int32 args (with no leading zeros)" % op_name, arg
)
yield arg.as_int()


def args_as_int_list(op_name: str, args: SExp, count: int) -> typing.List[typing.Tuple[int, int]]:
def args_as_int_list(
op_name: str, args: SExp, count: int
) -> typing.List[typing.Tuple[int, int]]:
int_list = list(args_as_ints(op_name, args))
if len(int_list) != count:
plural = "s" if count != 1 else ""
raise EvalError("%s takes exactly %d argument%s" % (op_name, count, plural), args)
raise EvalError(
"%s takes exactly %d argument%s" % (op_name, count, plural), args
)
return int_list


Expand All @@ -112,17 +118,19 @@ def args_as_bool_list(op_name: str, args: SExp, count: int) -> typing.List[SExp]
bool_list = list(args_as_bools(op_name, args))
if len(bool_list) != count:
plural = "s" if count != 1 else ""
raise EvalError("%s takes exactly %d argument%s" % (op_name, count, plural), args)
raise EvalError(
"%s takes exactly %d argument%s" % (op_name, count, plural), args
)
return bool_list


def op_add(args: _T_SExp) -> typing.Tuple[int, _T_SExp]:
total = 0
cost = ARITH_BASE_COST
arg_size = 0
for r, l in args_as_ints("+", args):
for r, arg_len in args_as_ints("+", args):
total += r
arg_size += l
arg_size += arg_len
cost += ARITH_COST_PER_ARG
cost += arg_size * ARITH_COST_PER_BYTE
return malloc_cost(cost, args.to(total))
Expand All @@ -135,10 +143,10 @@ def op_subtract(args: _T_SExp) -> typing.Tuple[int, _T_SExp]:
sign = 1
total = 0
arg_size = 0
for r, l in args_as_ints("-", args):
for r, arg_len in args_as_ints("-", args):
total += sign * r
sign = -1
arg_size += l
arg_size += arg_len
cost += ARITH_COST_PER_ARG
cost += arg_size * ARITH_COST_PER_BYTE
return malloc_cost(cost, args.to(total))
Expand Down Expand Up @@ -265,7 +273,7 @@ def op_substr(args: _T_SExp) -> typing.Tuple[int, _T_SExp]:
assert s0 is not None

if arg_count == 2:
i1, = list(args_as_int32("substr", args.rest()))
(i1,) = list(args_as_int32("substr", args.rest()))
i2 = len(s0)
else:
i1, i2 = list(args_as_int32("substr", args.rest()))
Expand Down Expand Up @@ -294,7 +302,9 @@ def op_concat(args: _T_SExp) -> typing.Tuple[int, _T_SExp]:
def op_ash(args: _T_SExp) -> typing.Tuple[int, _T_SExp]:
(i0, l0), (i1, l1) = args_as_int_list("ash", args, 2)
if l1 > 4:
raise EvalError("ash requires int32 args (with no leading zeros)", args.rest().first())
raise EvalError(
"ash requires int32 args (with no leading zeros)", args.rest().first()
)
if abs(i1) > 65535:
raise EvalError("shift too large", args.to(i1))
if i1 >= 0:
Expand All @@ -309,7 +319,9 @@ def op_ash(args: _T_SExp) -> typing.Tuple[int, _T_SExp]:
def op_lsh(args: _T_SExp) -> typing.Tuple[int, _T_SExp]:
(i0, l0), (i1, l1) = args_as_int_list("lsh", args, 2)
if l1 > 4:
raise EvalError("lsh requires int32 args (with no leading zeros)", args.rest().first())
raise EvalError(
"lsh requires int32 args (with no leading zeros)", args.rest().first()
)
if abs(i1) > 65535:
raise EvalError("shift too large", args.to(i1))
# we actually want i0 to be an *unsigned* int
Expand All @@ -334,9 +346,9 @@ def binop_reduction(
total = initial_value
arg_size = 0
cost = LOG_BASE_COST
for r, l in args_as_ints(op_name, args):
for r, arg_len in args_as_ints(op_name, args):
total = op_f(total, r)
arg_size += l
arg_size += arg_len
cost += LOG_COST_PER_ARG
cost += arg_size * LOG_COST_PER_BYTE
return malloc_cost(cost, args.to(total))
Expand Down Expand Up @@ -367,7 +379,7 @@ def binop(a: int, b: int) -> int:


def op_lognot(args: _T_SExp) -> typing.Tuple[int, _T_SExp]:
(i0, l0), = args_as_int_list("lognot", args, 1)
((i0, l0),) = args_as_int_list("lognot", args, 1)
cost = LOGNOT_BASE_COST + l0 * LOGNOT_COST_PER_BYTE
return malloc_cost(cost, args.to(~i0))

Expand Down
25 changes: 11 additions & 14 deletions clvm/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,22 @@
KEYWORDS = (
# core opcodes 0x01-x08
". q a i c f r l x "

#
# opcodes on atoms as strings 0x09-0x0f
"= >s sha256 substr strlen concat . "

#
# opcodes on atoms as ints 0x10-0x17
"+ - * / divmod > ash lsh "

#
# opcodes on atoms as vectors of bools 0x18-0x1c
"logand logior logxor lognot . "

#
# opcodes for bls 1381 0x1d-0x1f
"point_add pubkey_for_exp . "

#
# bool opcodes 0x20-0x23
"not any all . "

#
# misc 0x24
"softfork "
).split()
Expand Down Expand Up @@ -101,6 +101,7 @@ def args_len(op_name: str, args: SExp) -> Iterator[int]:
# this means that unknown ops where cost_function is 1, 2, or 3, may still be
# fatal errors if the arguments passed are not atoms.


def default_unknown_op(op: bytes, args: SExp) -> Tuple[int, SExp]:
# any opcode starting with ffff is reserved (i.e. fatal error)
# opcodes are not allowed to be empty
Expand Down Expand Up @@ -169,13 +170,11 @@ def default_unknown_op(op: bytes, args: SExp) -> Tuple[int, SExp]:


class OperatorProtocol(Protocol):
def __call__(self, args: SExp) -> Tuple[int, SExp]:
...
def __call__(self, args: SExp) -> Tuple[int, SExp]: ...


class UnknownOperatorProtocol(Protocol):
def __call__(self, op: bytes, args: SExp) -> Tuple[int, SExp]:
...
def __call__(self, op: bytes, args: SExp) -> Tuple[int, SExp]: ...


_T_OperatorDict = TypeVar("_T_OperatorDict", bound="OperatorDict")
Expand All @@ -198,8 +197,7 @@ def __new__(
quote: bytes,
apply: bytes,
unknown_op_handler: UnknownOperatorProtocol = default_unknown_op,
) -> _T_OperatorDict:
...
) -> _T_OperatorDict: ...

@overload
def __new__(
Expand All @@ -208,8 +206,7 @@ def __new__(
quote: Optional[bytes] = None,
apply: Optional[bytes] = None,
unknown_op_handler: UnknownOperatorProtocol = default_unknown_op,
) -> _T_OperatorDict:
...
) -> _T_OperatorDict: ...

def __new__(
cls: Type[_T_OperatorDict],
Expand Down
3 changes: 1 addition & 2 deletions clvm/run_program.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
QUOTE_COST,
PATH_LOOKUP_BASE_COST,
PATH_LOOKUP_COST_PER_LEG,
PATH_LOOKUP_COST_PER_ZERO_BYTE
PATH_LOOKUP_COST_PER_ZERO_BYTE,
)

OpCallable = Callable[["OpStackType", "ValStackType"], int]
Expand Down Expand Up @@ -55,7 +55,6 @@ def run_program(
max_cost: Optional[int] = None,
pre_eval_f: Optional[PreEvalFunction] = None,
) -> Tuple[int, SExp]:

_program = SExp.to(program)
if pre_eval_f is not None:
pre_eval_op = to_pre_eval_op(pre_eval_f, _program.to)
Expand Down
5 changes: 3 additions & 2 deletions tests/cmds_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ def get_test_cases(path: str) -> List[Tuple[str, List[str], str, List[str], str]

class TestCmds(unittest.TestCase):
def invoke_tool(self, cmd_line: str) -> Tuple[Optional[int], str, str]:

# capture io
stdout_buffer = io.StringIO()
stderr_buffer = io.StringIO()
Expand All @@ -59,7 +58,9 @@ def invoke_tool(self, cmd_line: str) -> Tuple[Optional[int], str, str]:
sys.stderr = stderr_buffer

args = shlex.split(cmd_line)
[entry_point] = importlib_metadata.entry_points(group="console_scripts", name=args[0])
[entry_point] = importlib_metadata.entry_points(
group="console_scripts", name=args[0]
)
v: Optional[int] = entry_point.load()(args)

sys.stdout = old_stdout
Expand Down
4 changes: 2 additions & 2 deletions tests/operatordict_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
class OperatorDictTest(unittest.TestCase):
def test_operatordict_constructor(self) -> None:
"""Constructing should fail if quote or apply are not specified,
either by object property or by keyword argument.
Note that they cannot be specified in the operator dictionary itself.
either by object property or by keyword argument.
Note that they cannot be specified in the operator dictionary itself.
"""
# ignoring because apparently it doesn't matter for this test that the types are all wrong
d: Dict[bytes, OperatorProtocol] = {b"\01": "hello", b"\02": "goodbye"} # type: ignore [dict-item]
Expand Down
44 changes: 30 additions & 14 deletions tests/operators_test.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,47 @@
import unittest
from typing import Tuple

from clvm.operators import (OPERATOR_LOOKUP, KEYWORD_TO_ATOM, default_unknown_op, OperatorDict)
from clvm.operators import (
OPERATOR_LOOKUP,
KEYWORD_TO_ATOM,
default_unknown_op,
OperatorDict,
)
from clvm.EvalError import EvalError
from clvm.SExp import SExp
from clvm.costs import CONCAT_BASE_COST


class OperatorsTest(unittest.TestCase):

def setUp(self) -> None:
self.handler_called = False

def unknown_handler(self, name: bytes, args: SExp) -> Tuple[int, SExp]:
self.handler_called = True
self.assertEqual(name, b'\xff\xff1337')
self.assertEqual(name, b"\xff\xff1337")
self.assertEqual(args, SExp.to(1337))
return 42, SExp.to(b'foobar')
return 42, SExp.to(b"foobar")

def test_unknown_op(self) -> None:
self.assertRaises(EvalError, lambda: OPERATOR_LOOKUP(b'\xff\xff1337', SExp.to(1337)))
od = OperatorDict(OPERATOR_LOOKUP, unknown_op_handler=lambda name, args: self.unknown_handler(name, args))
cost, ret = od(b'\xff\xff1337', SExp.to(1337))
self.assertRaises(
EvalError, lambda: OPERATOR_LOOKUP(b"\xff\xff1337", SExp.to(1337))
)
od = OperatorDict(
OPERATOR_LOOKUP,
unknown_op_handler=lambda name, args: self.unknown_handler(name, args),
)
cost, ret = od(b"\xff\xff1337", SExp.to(1337))
self.assertTrue(self.handler_called)
self.assertEqual(cost, 42)
self.assertEqual(ret, SExp.to(b'foobar'))
self.assertEqual(ret, SExp.to(b"foobar"))

def test_plus(self) -> None:
print(OPERATOR_LOOKUP)
self.assertEqual(OPERATOR_LOOKUP(KEYWORD_TO_ATOM['+'], SExp.to([3, 4, 5]))[1], SExp.to(12))
self.assertEqual(
OPERATOR_LOOKUP(KEYWORD_TO_ATOM["+"], SExp.to([3, 4, 5]))[1], SExp.to(12)
)

def test_unknown_op_reserved(self) -> None:

# any op that starts with ffff is reserved, and results in a hard
# failure
with self.assertRaises(EvalError):
Expand All @@ -46,15 +56,21 @@ def test_unknown_op_reserved(self) -> None:
default_unknown_op(b"", SExp.null())

# a single ff is not sufficient to be treated as a reserved opcode
self.assertEqual(default_unknown_op(b"\xff", SExp.null()), (CONCAT_BASE_COST, SExp.null()))
self.assertEqual(
default_unknown_op(b"\xff", SExp.null()), (CONCAT_BASE_COST, SExp.null())
)

# leading zeroes count, and this does not count as a ffff-prefix
# the cost is 0xffff00 = 16776960
self.assertEqual(default_unknown_op(b"\x00\xff\xff\x00\x00", SExp.null()), (16776961, SExp.null()))
self.assertEqual(
default_unknown_op(b"\x00\xff\xff\x00\x00", SExp.null()),
(16776961, SExp.null()),
)

def test_unknown_ops_last_bits(self) -> None:

# The last byte is ignored for no-op unknown ops
for suffix in [b"\x3f", b"\x0f", b"\x00", b"\x2c"]:
# the cost is unchanged by the last byte
self.assertEqual(default_unknown_op(b"\x3c" + suffix, SExp.null()), (61, SExp.null()))
self.assertEqual(
default_unknown_op(b"\x3c" + suffix, SExp.null()), (61, SExp.null())
)
7 changes: 3 additions & 4 deletions tests/run_program_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@


class BitTest(unittest.TestCase):

def test_msb_mask(self) -> None:
self.assertEqual(msb_mask(0x0), 0x0)
self.assertEqual(msb_mask(0x01), 0x01)
Expand All @@ -17,6 +16,6 @@ def test_msb_mask(self) -> None:
self.assertEqual(msb_mask(0x80), 0x80)

self.assertEqual(msb_mask(0x44), 0x40)
self.assertEqual(msb_mask(0x2a), 0x20)
self.assertEqual(msb_mask(0xff), 0x80)
self.assertEqual(msb_mask(0x0f), 0x08)
self.assertEqual(msb_mask(0x2A), 0x20)
self.assertEqual(msb_mask(0xFF), 0x80)
self.assertEqual(msb_mask(0x0F), 0x08)
Loading