From 11d36ffd6517da0d97badad330cc4e31acc7b6a9 Mon Sep 17 00:00:00 2001 From: koki Date: Thu, 26 Dec 2024 04:54:03 +0800 Subject: [PATCH] rearrange --- .gitignore | 1 + examples/dis_demo.py | 32 ++++---- ohre/abcre/core/Annotation.py | 4 +- ohre/abcre/core/ClassIndex.py | 4 +- ohre/abcre/dis/AsmArg.py | 12 ++- ohre/abcre/dis/AsmLiteral.py | 75 ++++++++++++++++++ ohre/abcre/dis/AsmMethod.py | 41 +++++----- ohre/abcre/dis/AsmRecord.py | 33 ++++++-- ohre/abcre/dis/AsmString.py | 11 +-- ohre/abcre/dis/CodeBlock.py | 14 ++-- ohre/abcre/dis/CodeBlocks.py | 16 ++-- ohre/abcre/dis/ControlFlow.py | 9 +-- ohre/abcre/dis/DebugBase.py | 19 +++++ ohre/abcre/dis/DisFile.py | 34 +++++---- ohre/abcre/dis/ISA.py | 5 +- ohre/abcre/dis/NAC.py | 10 +-- ohre/abcre/dis/NACTYPE.py | 1 - .../abcre/dis/{NativeToTAC.py => NACtoTAC.py} | 76 ++++++++++++------- ohre/abcre/dis/PandaReverser.py | 41 ++++++++++ ohre/abcre/dis/TAC.py | 10 +-- ohre/misc/Log.py | 2 +- ohre/misc/utils.py | 60 ++++++++++++--- 22 files changed, 356 insertions(+), 154 deletions(-) create mode 100644 ohre/abcre/dis/AsmLiteral.py create mode 100644 ohre/abcre/dis/DebugBase.py rename ohre/abcre/dis/{NativeToTAC.py => NACtoTAC.py} (68%) create mode 100644 ohre/abcre/dis/PandaReverser.py diff --git a/.gitignore b/.gitignore index f8dda68..8b7e20e 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ rules_local/ *.log isa.yaml tmp/ +temp/ tmp_extract/ tmp_hap_extract/ tmp_app_extract/ diff --git a/examples/dis_demo.py b/examples/dis_demo.py index 5540479..23405ae 100644 --- a/examples/dis_demo.py +++ b/examples/dis_demo.py @@ -2,6 +2,7 @@ import ohre from ohre.abcre.dis.ControlFlow import ControlFlow +from ohre.abcre.dis.PandaReverser import PandaReverser from ohre.abcre.dis.DisFile import DisFile from ohre.misc import Log @@ -13,23 +14,22 @@ parser.add_argument("dis_path", type=str, help="path to the dis file (ark_disasm-ed abc)") arg = parser.parse_args() dis_path = arg.dis_path - dis_file = DisFile(dis_path) + 
dis_file: DisFile = DisFile(dis_path) + panda_re = PandaReverser(dis_file) + print(f"> panda_re: {panda_re}") - print(f"> {dis_file}") - - # print(f"\n> {dis_file.debug_deep()}") - # for method in dis_file.methods: - # print(f">> {method.debug_deep()}") - - # for asmstr in dis_file.asmstrs: - # print(f">> {asmstr}") + for lit in dis_file.literals: + print(f">> {lit}") + for method in dis_file.methods: + print(f">> {method}") + for record in dis_file.records: + print(f">> {record}") + for asmstr in dis_file.asmstrs: + print(f">> {asmstr}") # === reverse truly START FUNC_IDX = 7 - # print(f">> before ControlFlow build {dis_file.methods[FUNC_IDX].debug_deep()}") - dis_file.methods[FUNC_IDX].split_native_code_block() - # print(f">> after ControlFlow build {dis_file.methods[FUNC_IDX].debug_deep()}") - dis_file.methods[FUNC_IDX].native_code_to_TAC() - # for asm_method in dis_file.methods: - # asm_method.split_native_code_block() - # print(f">> CFed: {asm_method.debug_deep()}") + # print(f">> before ControlFlow build {dis_file.methods[FUNC_IDX]._debug_vstr()}") + panda_re.split_native_code_block(FUNC_IDX) + print(f">> after ControlFlow build {panda_re.dis_file.methods[FUNC_IDX]._debug_vstr()}") + panda_re.trans_NAC_to_TAC(method_id=FUNC_IDX) diff --git a/ohre/abcre/core/Annotation.py b/ohre/abcre/core/Annotation.py index f07b1e6..e4198a7 100644 --- a/ohre/abcre/core/Annotation.py +++ b/ohre/abcre/core/Annotation.py @@ -22,7 +22,7 @@ def __init__(self, buf, pos: int): def __str__(self): out_elements = [] for elem in self.elements: - out_elements.append(f"{elem.debug_short()}") + out_elements.append(f"{elem._debug_str()}") out_elements = ";".join(out_elements) out_element_types = "" @@ -56,7 +56,7 @@ def __str__(self): name_off {hex(self.name_off)} value {hex(self.get_value())}" return out - def debug_short(self) -> str: + def _debug_str(self) -> str: return f"{self.name.get_str()} {hex(self.value)}" def get_value(self, type_num): diff --git a/ohre/abcre/core/ClassIndex.py 
b/ohre/abcre/core/ClassIndex.py index 0754327..7feb00c 100644 --- a/ohre/abcre/core/ClassIndex.py +++ b/ohre/abcre/core/ClassIndex.py @@ -20,6 +20,6 @@ def __str__(self): out += f" {hex(v)}" return out - def debug_deep(self): - # TODO: implement debug_deep in Class + def _debug_vstr(self): + # TODO: implement _debug_vstr in Class pass diff --git a/ohre/abcre/dis/AsmArg.py b/ohre/abcre/dis/AsmArg.py index 3e60130..a464438 100644 --- a/ohre/abcre/dis/AsmArg.py +++ b/ohre/abcre/dis/AsmArg.py @@ -1,8 +1,9 @@ from ohre.abcre.dis.AsmTypes import AsmTypes +from ohre.abcre.dis.DebugBase import DebugBase from ohre.misc import Log, utils -class AsmArg: +class AsmArg(DebugBase): def __init__(self, arg_type: AsmTypes = AsmTypes.UNKNOWN, name="", value=None, obj_ref=None): self.type = arg_type # name: e.g. for v0, type is VAR, name is v0(stored without truncating the prefix v) @@ -11,9 +12,6 @@ def __init__(self, arg_type: AsmTypes = AsmTypes.UNKNOWN, name="", value=None, o self.value = value self.obj_ref = obj_ref - def __str__(self): - return self.debug_short() - @classmethod def build_arg(cls, s: str): assert isinstance(s, str) and len(s) > 0 @@ -26,7 +24,7 @@ def build_arg(cls, s: str): def is_value_valid(self) -> bool: # TODO: for some types, value is not valid, judge it pass - def debug_short(self): + def _debug_str(self): out = f"{AsmTypes.get_code_name(self.type)}-{self.name}" if (self.value is not None): out += f"({self.value})" @@ -34,6 +32,6 @@ def debug_short(self): out += f"//{self.obj_ref}" return out - def debug_deep(self): - out = f"{self.debug_short()}" + def _debug_vstr(self): + out = f"{self._debug_str()}" return out diff --git a/ohre/abcre/dis/AsmLiteral.py b/ohre/abcre/dis/AsmLiteral.py new file mode 100644 index 0000000..fb9489f --- /dev/null +++ b/ohre/abcre/dis/AsmLiteral.py @@ -0,0 +1,75 @@ +from typing import Any, Dict, Iterable, List, Tuple, Union + +from ohre.abcre.dis.CODE_LV import CODE_LV +from ohre.abcre.dis.DebugBase import DebugBase +from 
ohre.misc import Log, utils + + +class AsmLiteral(DebugBase): + # fields in Class + def __init__(self, lines: List[str]): + first_line_parts = lines[0].strip().split(" ") + assert first_line_parts[0].isdigit() + self.id = int(first_line_parts[0]) + self.address = int(first_line_parts[1], 16) + self.module_request_array: Dict = None + self.module_tags: List[Dict] = None + if (len(lines) == 1): + print(f"AsmLiteral todo: single line, processer is todo") # TODO: normal situation + else: + self._process_module_request_array(lines) + + def _process_module_request_array(self, lines: List[str]): + s_idx = lines[0].find("{") + e_idx = lines[0].find("[") + module_tag_cnt = lines[0][s_idx + 1:e_idx].strip() + assert module_tag_cnt.isdigit() + module_tag_cnt = int(module_tag_cnt) + # module_request_array + line_all = "" + for s in lines: + line_all += s + module_request_array_start = line_all.find("MODULE_REQUEST_ARRAY: {") + len("MODULE_REQUEST_ARRAY: {") + module_request_array_end = line_all.find("};", module_request_array_start) + assert module_request_array_start > 0 and module_request_array_end > 0 + module_request_array = line_all[module_request_array_start:module_request_array_end].strip() + module_request_dict = {} + if len(module_request_array): + module_requests = module_request_array.split(",") + for module_request in module_requests: + key, value = utils.find_single_kv(module_request, ":") + if (key is not None and value is not None and key.isdigit()): + key = int(key) + module_request_dict[key] = value + self.module_request_array = module_request_dict + # module_tags + module_tags_str_all = line_all[module_request_array_end:].strip() + module_tags_l = list() + if len(module_tags_str_all): + module_tags_str_all = module_tags_str_all.split(";") + for module_tag_line in module_tags_str_all: + kv_s = module_tag_line.split(",") + d = dict() + for kv in kv_s: + key, value = utils.find_single_kv(kv.strip(), ":") + if (key is not None and value is not None): + d[key] = 
value + if (len(d)): + module_tags_l.append(d) + self.module_tags = module_tags_l + + def _debug_str(self) -> str: + out = f"AsmLiteral: {self.id} {hex(self.address)}" + if (self.module_request_array is not None): + out += f" module_request_array({len(self.module_request_array)})" + if (self.module_tags is not None): + out += f" module_tags({len(self.module_tags)})" + return out + + def _debug_vstr(self) -> str: + out = f"AsmLiteral: {self.id} {hex(self.address)}" + if (self.module_request_array is not None): + out += f" module_request_array({len(self.module_request_array)}) {self.module_request_array}" + if (self.module_tags is not None): + out += f" module_tags({len(self.module_tags)}) {self.module_tags}" + return out diff --git a/ohre/abcre/dis/AsmMethod.py b/ohre/abcre/dis/AsmMethod.py index 4c2c3a7..a7fc8fc 100644 --- a/ohre/abcre/dis/AsmMethod.py +++ b/ohre/abcre/dis/AsmMethod.py @@ -3,12 +3,12 @@ from ohre.abcre.dis.AsmTypes import AsmTypes from ohre.abcre.dis.CODE_LV import CODE_LV from ohre.abcre.dis.CodeBlocks import CodeBlocks -from ohre.abcre.dis.NativeToTAC import NativeToTAC from ohre.abcre.dis.ControlFlow import ControlFlow from ohre.misc import Log, utils +from ohre.abcre.dis.DebugBase import DebugBase -class AsmMethod: +class AsmMethod(DebugBase): # fields in Class def __init__(self, slotNumberIdx, lines: List[str]): assert len(lines) >= 2 @@ -16,9 +16,11 @@ def __init__(self, slotNumberIdx, lines: List[str]): self.return_type = "None" self.file_name: str = "" self.class_func_name: str = "" + self.class_name: str = "" + self.func_name: str = "" self.func_type: str = "" self.args: List = list() - self.code_blocks: CodeBlocks | None = None + self.code_blocks: Union[CodeBlocks, None] = None insts = self._process_method(lines) self.code_blocks = CodeBlocks(insts) @@ -27,25 +29,23 @@ def split_native_code_block(self): self.code_blocks = ControlFlow.split_native_code_block(self.code_blocks) self.code_blocks.set_level(CODE_LV.NATIVE_BLOCK_SPLITED) - def 
native_code_to_TAC(self): - assert self.code_blocks.level == CODE_LV.NATIVE_BLOCK_SPLITED - self.code_blocks = NativeToTAC.native_code_to_TAC(self.code_blocks) - self.code_blocks.set_level(CODE_LV.TAC) - def _process_1st_line(self, line: str): parts = line.split(" ") assert parts[0] == ".function" self.return_type = parts[1].strip() file_func_name = parts[2].split("(")[0] - num = file_func_name.find(".ets") - if (not num > 0): - num = file_func_name.find(".src") - if (num > 0 and num < len(file_func_name) - 5): - self.file_name = file_func_name[:num + 4] - self.class_func_name = file_func_name[num + 4 + 1:] + file_postfix_idx = file_func_name.find(".ets") + if (not file_postfix_idx > 0): + file_postfix_idx = file_func_name.find(".src") + if (file_postfix_idx > 0 and file_postfix_idx < len(file_func_name) - 5): + self.file_name = file_func_name[:file_postfix_idx + 4] + self.class_func_name = file_func_name[file_postfix_idx + 4 + 1:] else: self.file_name = file_func_name self.class_func_name = file_func_name + if (self.file_name.startswith("&")): + self.file_name = self.file_name[1:] + # reverse find: something like i = len(parts) - 1 while (i >= 0): if (parts[i].startswith("<") and parts[i].endswith(">") and len(parts[i]) >= 3): @@ -91,20 +91,17 @@ def _process_common_inst(self, line: str) -> List[str]: idx += 1 while (idx < len(line)): start_idx = idx - idx = utils.find_next_delimiter(line, start_idx) + idx = utils.find_next_delimiter_single_line(line, start_idx) ret.append(line[start_idx: idx].strip()) idx = idx + 1 return ret - def __str__(self): - return self.debug_short() - - def debug_short(self) -> str: + def _debug_str(self) -> str: out = f"AsmMethod: {self.slotNumberIdx} {self.func_type} {self.class_func_name} \ ret {self.return_type} file: {self.file_name}\n\ -args({len(self.args)}) {self.args} code_blocks({len(self.code_blocks)})" +\targs({len(self.args)}) {self.args} code_blocks({len(self.code_blocks)})" return out - def debug_deep(self) -> str: - out = 
f"{self.debug_short()}\n{self.code_blocks.debug_deep()}" + def _debug_vstr(self) -> str: + out = f"{self._debug_str()}\n{self.code_blocks._debug_vstr()}" return out diff --git a/ohre/abcre/dis/AsmRecord.py b/ohre/abcre/dis/AsmRecord.py index 6340e9e..134a929 100644 --- a/ohre/abcre/dis/AsmRecord.py +++ b/ohre/abcre/dis/AsmRecord.py @@ -1,21 +1,24 @@ from typing import Any, Dict, Iterable, List, Tuple, Union from ohre.abcre.dis.AsmTypes import AsmTypes +from ohre.abcre.dis.DebugBase import DebugBase from ohre.misc import Log -class AsmRecord: +class AsmRecord(DebugBase): # fields in Class def __init__(self, lines: List[str]): + self.file_class_name: str = "" + self.file_name: str = "" self.class_name: str = "" self.fields: Dict[Tuple[str, Any]] = dict() # k: field name; v: (type, value) for line in lines: line = line.strip() if ("}" in line): - return + break elif ("{" in line and ".record" in line): parts = line.split(" ") - self.class_name = parts[1].split("@")[0] + self.file_class_name = parts[1].split("@")[0].strip() elif ("=" in line): parts = line.split("=") ty, name = parts[0].split(" ")[0].strip(), parts[0].split(" ")[1].strip() @@ -27,9 +30,27 @@ def __init__(self, lines: List[str]): self.fields[name] = (ty, value) else: Log.warn(f"invalid line in AsmRecord: {line},\nlines: {lines}") + # file+class name like: &entry.src.main.ets.entryability.EntryAbility& + if (self.file_class_name.startswith("&")): + self.file_class_name = self.file_class_name[1:] + if (self.file_class_name.endswith("&")): + self.file_class_name = self.file_class_name[:-1] + file_postfix_idx = self.file_class_name.find(".ets") + if (not file_postfix_idx > 0): + file_postfix_idx = self.file_class_name.find(".src") + if (file_postfix_idx > 0): + self.file_name = self.file_class_name[:file_postfix_idx + len(".ets")].strip() + self.class_name = self.file_class_name[file_postfix_idx + len(".ets") + 1:].strip() - def debug_deep(self): - out = f"AsmRecord {self.class_name}: " + def 
_debug_str(self): + out = f"AsmRecord: {self.file_class_name} {self.file_name} \ +class_name({len(self.class_name)}) {self.class_name}: " for field_name, (ty, value) in self.fields.items(): - out += f"{field_name}({ty}) {value};" + if (isinstance(value, int)): + out += f"{field_name}({ty}) {hex(value)}; " + else: + out += f"{field_name}({ty}) {value}; " return out + + def _debug_vstr(self): + return self._debug_str() diff --git a/ohre/abcre/dis/AsmString.py b/ohre/abcre/dis/AsmString.py index b32b9fb..9e7f449 100644 --- a/ohre/abcre/dis/AsmString.py +++ b/ohre/abcre/dis/AsmString.py @@ -1,10 +1,11 @@ from typing import Any, Dict, Iterable, List, Tuple, Union from ohre.abcre.dis.AsmTypes import AsmTypes +from ohre.abcre.dis.DebugBase import DebugBase from ohre.misc import Log -class AsmString: +class AsmString(DebugBase): def __init__(self, line: str): idx = line.find(", ") assert idx > 2 and idx < len(line) - 2 @@ -13,9 +14,9 @@ def __init__(self, line: str): idx2 = remain_line.find(":") self.name_value = remain_line[idx2 + 1:] - def __str__(self): - return self.debug_deep() - - def debug_deep(self): + def _debug_str(self): out = f"AsmString({hex(self.offset)}) {len(self.name_value)} {self.name_value}" return out + + def _debug_vstr(self): + return self._debug_str() diff --git a/ohre/abcre/dis/CodeBlock.py b/ohre/abcre/dis/CodeBlock.py index f122c71..d8f6b08 100644 --- a/ohre/abcre/dis/CodeBlock.py +++ b/ohre/abcre/dis/CodeBlock.py @@ -3,10 +3,11 @@ from ohre.abcre.dis.NAC import NAC from ohre.abcre.dis.NACTYPE import NACTYPE +from ohre.abcre.dis.DebugBase import DebugBase from ohre.abcre.dis.TAC import TAC -class CodeBlock(): # asm instruction(NAC) cantained +class CodeBlock(DebugBase): # asm instruction(NAC) cantained def __init__(self, in_l: Union[List[List[str]], List[NAC], List[TAC]]): assert len(in_l) >= 0 self.insts: Union[List[NAC], List[TAC]] = list() @@ -22,21 +23,18 @@ def __init__(self, in_l: Union[List[List[str]], List[NAC], List[TAC]]): def 
get_slice_block(self, idx_start: int, idx_end: int): return CodeBlock(copy.deepcopy(self.insts[idx_start: idx_end])) - def __str__(self): - return self.debug_short() - def __len__(self) -> int: return len(self.insts) - def debug_short(self) -> str: + def _debug_str(self) -> str: out = f"CodeBlock: insts {len(self.insts)}" return out - def debug_deep(self) -> str: + def _debug_vstr(self) -> str: out = f"CodeBlock: insts {len(self.insts)}\n" for i in range(len(self.insts)): if (self.insts[i].type == NACTYPE.LABEL): - out += f"{i} {self.insts[i].debug_deep()}\n" + out += f"{i} {self.insts[i]._debug_vstr()}\n" else: - out += f"{i}\t{self.insts[i].debug_deep()}\n" + out += f"{i}\t{self.insts[i]._debug_vstr()}\n" return out.strip() diff --git a/ohre/abcre/dis/CodeBlocks.py b/ohre/abcre/dis/CodeBlocks.py index 4f39347..e0ea384 100644 --- a/ohre/abcre/dis/CodeBlocks.py +++ b/ohre/abcre/dis/CodeBlocks.py @@ -3,12 +3,11 @@ from ohre.abcre.dis.CODE_LV import CODE_LV from ohre.abcre.dis.CodeBlock import CodeBlock -from ohre.abcre.dis.NAC import NAC -from ohre.abcre.dis.NACTYPE import NACTYPE +from ohre.abcre.dis.DebugBase import DebugBase from ohre.misc import Log, utils -class CodeBlocks(): # NAC block contained, build control flow graph inside a single CodeBlocks for one method +class CodeBlocks(DebugBase): # NAC block contained, build control flow graph inside a single CodeBlocks for one method def __init__(self, in_l: Union[List[List[str]], List[CodeBlock]], ir_lv=CODE_LV.NATIVE): assert len(in_l) >= 0 self.blocks: List[CodeBlock] = list() @@ -19,9 +18,6 @@ def __init__(self, in_l: Union[List[List[str]], List[CodeBlock]], ir_lv=CODE_LV. 
else: # maybe list(str) in list # anyway, try init CodeBlock using element(asm codea str list) in list self.blocks: List[CodeBlock] = [CodeBlock(in_l)] - def __str__(self): - return self.debug_short() - @property def len(self): return len(self.blocks) @@ -45,12 +41,12 @@ def set_level(self, level): def __len__(self) -> int: return len(self.blocks) - def debug_short(self) -> str: + def _debug_str(self) -> str: out = f"CodeBlocks: blocks({len(self.blocks)}) {self.level_str}" return out - def debug_deep(self) -> str: - out = f"{self.debug_short()}\n" + def _debug_vstr(self) -> str: + out = f"{self._debug_str()}\n" for i in range(len(self.blocks)): - out += f"[{i}/{len(self.blocks)}]-block: {self.blocks[i].debug_deep()}\n" + out += f"[{i}/{len(self.blocks)}]-block: {self.blocks[i]._debug_vstr()}\n" return out diff --git a/ohre/abcre/dis/ControlFlow.py b/ohre/abcre/dis/ControlFlow.py index 0d4336d..d6fca55 100644 --- a/ohre/abcre/dis/ControlFlow.py +++ b/ohre/abcre/dis/ControlFlow.py @@ -7,7 +7,7 @@ class ControlFlow(): def split_native_code_block(blocks: CodeBlocks) -> CodeBlocks: assert len(blocks) == 1 - nac_block = blocks.blocks[0] # should only have one NAC block, not TAC + nac_block = blocks.blocks[0] # should only have one NAC block, not TAC delimited_id: list = list() for i in range(len(nac_block)): nac = nac_block.insts[i] @@ -19,13 +19,6 @@ def split_native_code_block(blocks: CodeBlocks) -> CodeBlocks: delimited_id = sorted(list(set(delimited_id))) if (len(nac_block) not in delimited_id): delimited_id.append(len(nac_block)) - debug_out = "" - for idx in delimited_id: - if (idx < len(nac_block)): - debug_out += f"{idx}-{nac_block.insts[idx]}; " - else: - debug_out += f"{idx} nac_block len {len(nac_block)}" - Log.info(f"[ControlFlow] delimited id-nac {debug_out}", False) final_nac_blocks: list = list() idx_start = 0 diff --git a/ohre/abcre/dis/DebugBase.py b/ohre/abcre/dis/DebugBase.py new file mode 100644 index 0000000..deb5f57 --- /dev/null +++ 
b/ohre/abcre/dis/DebugBase.py @@ -0,0 +1,19 @@ +from abc import ABCMeta, abstractmethod + + +class DebugBase: + __metaclass__ = ABCMeta + + def __init__(self): + pass + + def __str__(self): + return self._debug_str() + + @abstractmethod + def _debug_str(self): + pass + + @abstractmethod + def _debug_vstr(self) -> str: + pass diff --git a/ohre/abcre/dis/DisFile.py b/ohre/abcre/dis/DisFile.py index f41fd63..cfcc18e 100644 --- a/ohre/abcre/dis/DisFile.py +++ b/ohre/abcre/dis/DisFile.py @@ -1,9 +1,11 @@ from typing import Any, Dict, Iterable, List, Tuple, Union +from ohre.abcre.dis.AsmLiteral import AsmLiteral from ohre.abcre.dis.AsmMethod import AsmMethod from ohre.abcre.dis.AsmRecord import AsmRecord from ohre.abcre.dis.AsmString import AsmString -from ohre.misc import Log +from ohre.abcre.dis.DebugBase import DebugBase +from ohre.misc import Log, utils class STATE: @@ -22,11 +24,12 @@ def _is_delimiter(s: str) -> bool: return False -class DisFile(): +class DisFile(DebugBase): def __init__(self, value): self.source_binary_name: str = "" self.language: str = "" self.lines: List[str] = list() + self.literals: List[AsmLiteral] = list() self.records: List[AsmRecord] = list() self.methods: List[AsmMethod] = list() self.asmstrs: List[AsmString] = list() @@ -91,12 +94,20 @@ def _read_disheader(self, l_n) -> Tuple[int, int]: Log.error(f"ERROR in _read_disheader, else hit. 
line {line}") l_n += 1 - def _read_literals(self, l_n) -> Tuple[int, int]: + def _read_literals(self, l_n: int) -> Tuple[int, int]: while (l_n < len(self.lines)): line: str = self.lines[l_n].strip() if (_is_delimiter(line)): return STATE.NEW_SEC, l_n + 1 - l_n += 1 + parts = line.split(" ") + if (parts[0].isdigit()): + l_idx, n_idx = utils.find_matching_symbols_multi_line(self.lines[l_n:], "{") + if (l_idx is not None): + asm_lit = AsmLiteral(self.lines[l_n:l_n + l_idx + 1]) + self.literals.append(asm_lit) + l_n += l_idx + 1 + else: + l_n += 1 return None, l_n + 1 def _read_records(self, l_n) -> Tuple[int, int]: @@ -157,20 +168,17 @@ def _read_strings(self, l_n) -> Tuple[int, int]: l_n += 1 return None, l_n + 1 - def __str__(self): - return self.debug_short() - - def debug_short(self) -> str: + def _debug_str(self) -> str: out = f"DisFile: {self.source_binary_name} language {self.language} lines({len(self.lines)}) \ -records({len(self.records)}) methods({len(self.methods)}) asmstrs({len(self.asmstrs)})" +literals({len(self.literals)}) records({len(self.records)}) methods({len(self.methods)}) asmstrs({len(self.asmstrs)})" return out - def debug_deep(self) -> str: - out = self.debug_short() + "\n" + def _debug_vstr(self) -> str: + out = self._debug_str() + "\n" for rec in self.records: - out += f">> {rec.debug_deep()}\n" + out += f">> {rec._debug_vstr()}\n" for method in self.methods: - out += f">> {method.debug_deep()}\n" + out += f">> {method._debug_vstr()}\n" for asmstr in self.asmstrs: out += f">> {asmstr}\n" return out diff --git a/ohre/abcre/dis/ISA.py b/ohre/abcre/dis/ISA.py index d267d1d..523a2e8 100644 --- a/ohre/abcre/dis/ISA.py +++ b/ohre/abcre/dis/ISA.py @@ -17,12 +17,12 @@ def __init__(self, isa_file_path: str): self.prefixes: Dict = None self.prefixes = self._get_prefixes_dict() assert self.prefixes is not None - Log.info(f"[ISA] self.prefixes {len(self.prefixes)} {self.prefixes}") + Log.info(f"[ISA] prefixes {len(self.prefixes)} {self.prefixes}") 
self.opstr2infod: Dict[str, Dict] | None = None self.opstr2infod = self._get_opstr_dict() assert self.opstr2infod is not None - Log.info(f"[ISA] self.opstr2infod {len(self.opstr2infod)} keys: {self.opstr2infod.keys()}") + Log.info(f"[ISA] opstr2infod len {len(self.opstr2infod)}") def _get_prefixes_dict(self) -> Dict: if (self.prefixes is not None): @@ -108,7 +108,6 @@ def get_opstr_info_dict(self, opstr: str) -> Union[Dict, None]: if __name__ == "__main__": ohre.set_log_print(True) isa = ISA(os.path.join(os.path.dirname(os.path.abspath(__file__)), "isa.yaml")) - # print(json.dumps(isa.ori_d["groups"], indent=4)) assert isa.get_opcodes("deprecated.getiteratornext") == [0xfc02] assert isa.get_opcodes("callruntime.notifyconcurrentresult") == [0xfb00] for ins_str in ["mov", "callruntime.definefieldbyindex", "isin", "jequndefined"]: diff --git a/ohre/abcre/dis/NAC.py b/ohre/abcre/dis/NAC.py index 02b73e4..9d9486b 100644 --- a/ohre/abcre/dis/NAC.py +++ b/ohre/abcre/dis/NAC.py @@ -1,9 +1,10 @@ from typing import Any, Dict, Iterable, List, Tuple, Union +from ohre.abcre.dis.DebugBase import DebugBase from ohre.abcre.dis.NACTYPE import NACTYPE -class NAC(): # N Address Code +class NAC(DebugBase): # N Address Code # Native representation of ark_disasm-ed ArkTS bytecode # corresponding to a single line in a panda function @@ -17,10 +18,7 @@ def __init__(self, op_args: List[str]): for i in range(1, len(op_args)): self.args.append(op_args[i]) - def __str__(self): - return self.debug_short() - - def debug_short(self): + def _debug_str(self): out = f"{self.op} " for i in range(len(self.args)): if (i == len(self.args) - 1): @@ -29,7 +27,7 @@ def debug_short(self): out += f"{self.args[i]}, " return out - def debug_deep(self): + def _debug_vstr(self): out = f"({NACTYPE.get_code_name(self.type)}) {self.op} " for i in range(len(self.args)): if (i == len(self.args) - 1): diff --git a/ohre/abcre/dis/NACTYPE.py b/ohre/abcre/dis/NACTYPE.py index bd1a164..27b0d11 100644 --- 
a/ohre/abcre/dis/NACTYPE.py +++ b/ohre/abcre/dis/NACTYPE.py @@ -51,7 +51,6 @@ def get_NAC_type(cls, op: str) -> int: return NACTYPE.LABEL info_d = cls.isa.get_opstr_info_dict(op) - # print(f"op {op} info_d {info_d}") assert info_d is not None and "title" in info_d.keys() if (_value_in_key_of_dict(info_d, "properties", "return")): return NACTYPE.RETURN diff --git a/ohre/abcre/dis/NativeToTAC.py b/ohre/abcre/dis/NACtoTAC.py similarity index 68% rename from ohre/abcre/dis/NativeToTAC.py rename to ohre/abcre/dis/NACtoTAC.py index 14c1888..aea8081 100644 --- a/ohre/abcre/dis/NativeToTAC.py +++ b/ohre/abcre/dis/NACtoTAC.py @@ -1,17 +1,23 @@ +import copy +from typing import Any, Dict, Iterable, List, Tuple, Union + from ohre.abcre.dis.AsmArg import AsmArg +from ohre.abcre.dis.AsmMethod import AsmMethod from ohre.abcre.dis.AsmTypes import AsmTypes from ohre.abcre.dis.CODE_LV import CODE_LV -from ohre.abcre.dis.CodeBlocks import CodeBlocks from ohre.abcre.dis.CodeBlock import CodeBlock +from ohre.abcre.dis.CodeBlocks import CodeBlocks +from ohre.abcre.dis.DisFile import DisFile from ohre.abcre.dis.NAC import NAC from ohre.abcre.dis.TAC import TAC from ohre.misc import Log, utils -class NativeToTAC: +class NACtoTAC: @classmethod - def toTAC(cls, nac: NAC) -> TAC: - print(f"toTAC: nac: {nac.debug_deep()}") # TODO: more tac builder plz + def toTAC(self, nac: NAC, ams_method: AsmMethod, dis_file: DisFile) -> Union[TAC, List[TAC]]: + print(f"nac_: {nac._debug_vstr()}") # TODO: more tac builder plz + if (nac.op == "mov"): return TAC.tac_assign(AsmArg.build_arg(nac.args[0]), AsmArg.build_arg(nac.args[1])) if (nac.op == "lda"): @@ -26,28 +32,25 @@ def toTAC(cls, nac: NAC) -> TAC: return TAC.tac_assign(AsmArg(AsmTypes.ACC), AsmArg(AsmTypes.UNDEFINED)) if (nac.op == "sta"): return TAC.tac_assign(AsmArg.build_arg(nac.args[0]), AsmArg(AsmTypes.ACC)) - if (nac.op == "ldobjbyname"): - return TAC.tac_assign( - AsmArg(AsmTypes.ACC), - AsmArg(AsmTypes.STR, value=nac.args[1]), - log=f"arg0: 
{nac.args[0]} todo: check ldobjbyname") - if (nac.op == "isfalse"): - return TAC.tac_assign(AsmArg(AsmTypes.ACC), AsmArg(AsmTypes.ACC), AsmArg(AsmTypes.FALSE), rop="==") if (nac.op == "callruntime.isfalse"): pass - if (nac.op == "copyrestargs"): - return TAC.tac_unknown([AsmArg(AsmTypes.IMM, value=nac.args[0])], log="todo: copyrestargs imm:u8") if (nac.op == "lda.str"): pass - if (nac.op == "tryldglobalbyname"): + if (nac.op == "ldundefined"): pass - + # === inst: comparation instructions # START if (nac.op == "stricteq"): pass - if (nac.op == "ldundefined"): - pass + # === inst: comparation instructions # END + + # === inst: unary operations # START + if (nac.op == "isfalse"): + return TAC.tac_assign(AsmArg(AsmTypes.ACC), AsmArg(AsmTypes.ACC), AsmArg(AsmTypes.FALSE), rop="==") + if (nac.op == "istrue"): + return TAC.tac_assign(AsmArg(AsmTypes.ACC), AsmArg(AsmTypes.ACC), AsmArg(AsmTypes.TRUE), rop="==") + # === inst: unary operations # END - # === inst about jump START + # === inst: jump operations # START if (nac.op == "jnez"): # TODO: jnez imm:i32 # a label str in *.dis file # support imm in future return TAC.tac_cond_jmp( AsmArg(AsmTypes.LABEL, nac.args[0]), @@ -62,36 +65,51 @@ def toTAC(cls, nac: NAC) -> TAC: "==") if (nac.op == "jmp"): return TAC.tac_uncn_jmp(AsmArg(AsmTypes.LABEL, nac.args[0]), log="todo: check label's existence") - # === inst about jump END + # === inst: jump operations # END - # === inst about call START + # === inst: call instructions # START if (nac.op == "callthis1"): pass if (nac.op == "callthisrange"): pass - # === inst about call END + # === inst: call instructions # END - # === inst about return START + # === inst: dynamic return # START if (nac.op == "returnundefined"): pass if (nac.op == "return"): pass - # === inst about return END + # === inst: dynamic return # END + + # === inst: object visitors # START + if (nac.op == "ldobjbyname"): + return TAC.tac_assign( + AsmArg(AsmTypes.ACC), + AsmArg(AsmTypes.STR, value=nac.args[1]), + 
log=f"arg0: {nac.args[0]} todo: check ldobjbyname") + if (nac.op == "ldexternalmodulevar"): + pass + if (nac.op == "tryldglobalbyname"): + pass + if (nac.op == "copyrestargs"): + return TAC.tac_unknown([AsmArg(AsmTypes.IMM, value=nac.args[0])], log="todo: copyrestargs imm:u8") + # === inst: object visitors # END - Log.warn(f"toTAC failed, not support nac inst: {nac.debug_deep()}", False) # to error when done + Log.warn(f"toTAC failed, not support nac inst: {nac._debug_vstr()}", False) # to error when done return TAC.tac_unknown( [AsmArg(AsmTypes.UNKNOWN, nac.args[i]) for i in range(len(nac.args))], log=f"todo: {nac.op}") @classmethod - def native_code_to_TAC(cls, blocks: CodeBlocks) -> CodeBlocks: - assert blocks.level == CODE_LV.NATIVE_BLOCK_SPLITED + def trans_NAC_to_TAC(cls, ams_method: AsmMethod, dis_file: DisFile) -> CodeBlocks: + cbs = ams_method.code_blocks + assert cbs.level == CODE_LV.NATIVE_BLOCK_SPLITED cbs_l = list() - for block in blocks.blocks: + for block in cbs.blocks: tac_inst_l = list() for nac_inst in block.insts: - tac_inst = NativeToTAC.toTAC(nac_inst) # TODO: may return a list of tac - print(f"toTAC: tac: {tac_inst.debug_deep()}") + tac_inst = NACtoTAC.toTAC(nac_inst, ams_method, dis_file) # TODO: may return a list of tac + print(f"tac^: {tac_inst._debug_vstr()}") tac_inst_l.append(tac_inst) cb = CodeBlock(tac_inst_l) cbs_l.append(cb) diff --git a/ohre/abcre/dis/PandaReverser.py b/ohre/abcre/dis/PandaReverser.py new file mode 100644 index 0000000..6425585 --- /dev/null +++ b/ohre/abcre/dis/PandaReverser.py @@ -0,0 +1,41 @@ +from typing import Any, Dict, Iterable, List, Tuple, Union + +from ohre.abcre.dis.AsmMethod import AsmMethod +from ohre.abcre.dis.AsmRecord import AsmRecord +from ohre.abcre.dis.AsmString import AsmString +from ohre.abcre.dis.AsmTypes import AsmTypes +from ohre.abcre.dis.CodeBlocks import CodeBlocks +from ohre.abcre.dis.DebugBase import DebugBase +from ohre.abcre.dis.DisFile import DisFile +from ohre.abcre.dis.NACtoTAC 
class PandaReverser(DebugBase):
    """User-facing interface class that drives reversing steps on a DisFile.

    The reverser only keeps a reference to the DisFile it was built from;
    every operation is delegated to the selected method of that file.
    """

    def __init__(self, dis_file: DisFile):
        self.dis_file: DisFile = dis_file

    def _is_valid_method_id(self, method_id) -> bool:
        # True when method_id is an int that indexes into dis_file.methods.
        return isinstance(method_id, int) and 0 <= method_id < len(self.dis_file.methods)

    def split_native_code_block(self, method_id: int = -1, method_name: str = None):
        """Split the native code of one method into code blocks.

        Args:
            method_id: index into ``dis_file.methods``; out-of-range values are ignored.
            method_name: reserved for lookup by name (not implemented yet).
        """
        if self._is_valid_method_id(method_id):
            self.dis_file.methods[method_id].split_native_code_block()
        # TODO: selecting the method by `method_name` is not implemented yet;
        # as before, an invalid id / name-only request is silently a no-op.

    def trans_NAC_to_TAC(self, method_id: int = -1, method_name: str = None) -> Union[CodeBlocks, None]:
        """Translate one method's native code (NAC) into TAC.

        Args:
            method_id: index into ``dis_file.methods``; out-of-range values are ignored.
            method_name: reserved for lookup by name (not implemented yet).

        Returns:
            Whatever ``NACtoTAC.trans_NAC_to_TAC`` produced for the selected
            method, or None when no method was selected. The result used to be
            computed and dropped; returning it makes it reachable for callers.
        """
        if self._is_valid_method_id(method_id):
            # NOTE(review): the result is not written back to the method here;
            # confirm whether the caller or this class should store it.
            return NACtoTAC.trans_NAC_to_TAC(self.dis_file.methods[method_id], self.dis_file)
        # TODO: selecting the method by `method_name` is not implemented yet.
        return None

    def _debug_str(self) -> str:
        # Short form: class tag followed by the DisFile's own string form.
        out = f"PandaReverser: {self.dis_file}"
        return out

    def _debug_vstr(self) -> str:
        # Verbose form: currently the short form plus a trailing newline.
        out = f"{self._debug_str()}\n"
        return out
875f06c..653ce7c 100644 --- a/ohre/misc/Log.py +++ b/ohre/misc/Log.py @@ -6,7 +6,7 @@ g_log = None DEBUG_LOCAL = True -DEBUG_LEN = 500 +DEBUG_LEN = 300 def debug_print(logstr: str, level: str = "debug"): diff --git a/ohre/misc/utils.py b/ohre/misc/utils.py index 389cb61..e3574c0 100644 --- a/ohre/misc/utils.py +++ b/ohre/misc/utils.py @@ -1,6 +1,6 @@ -from typing import Any, Dict, Iterable, List, Tuple +from typing import Any, Dict, Iterable, List, Tuple, Union import yaml - +import copy def is_uppercase_or_underscore(s: str): return all(c.isupper() or c.isdigit() or c == "_" for c in s) @@ -30,9 +30,10 @@ def is_left(pair_left_char_l, c): return False -def find_next_delimiter(line: str, start_idx: int = 0, delimiter: str = ",", - pair_left_char_l: List = ["\"", "(", "[", "{"], - pair_right_char_l: List = ["\"", ")", "]", "}"]): +def find_next_delimiter_single_line(line: str, start_idx: int = 0, delimiter: str = ",", + pair_left_char_l: List = ["\"", "(", "[", "{"], + pair_right_char_l: List = ["\"", ")", "]", "}"]): + # e.g. 
def find_matching_symbols_multi_line(lines: List[str], start_char: str,
                                     pair_left_char_l: List = ["\"", "(", "[", "{"],
                                     pair_right_char_l: List = ["\"", ")", "]", "}"]
                                     ) -> Tuple[Union[int, None], Union[int, None]]:
    """Find the closing symbol that matches the first `start_char` in `lines`.

    Args:
        lines: text lines scanned in order, character by character.
        start_char: a single opening character; it should be one of `pair_left_char_l`.
        pair_left_char_l: opening symbols, positionally paired with `pair_right_char_l`.
        pair_right_char_l: closing symbols.

    Returns:
        (line index, index inside that line) of the closing symbol that empties
        the bracket stack after `start_char` was seen, or (None, None) when no
        such symbol exists.
    """
    assert isinstance(start_char, str) and len(start_char) == 1
    stack_l = list()
    start_char_hit = False
    for l_idx, line in enumerate(lines):
        for n_idx, c in enumerate(line):
            # The closing check must come first: a quote is both a left and a right symbol.
            # NOTE(review): presumably the helper reports whether `c` closes the
            # symbol on top of the stack - confirm against its definition.
            if (is_right_and_match_stack_top(stack_l, pair_left_char_l, pair_right_char_l, c)):
                stack_l.pop()
                if (start_char_hit and len(stack_l) == 0):
                    return l_idx, n_idx
            elif (c == start_char):
                stack_l.append(c)
                start_char_hit = True
            elif (is_left(pair_left_char_l, c)):
                stack_l.append(c)
    return None, None


def find_single_kv(s: str, delimiter: str = ":") -> Tuple[Union[str, None], Union[str, None]]:
    """Split `s` into a stripped (key, value) pair at the FIRST `delimiter`.

    e.g. "1 : @ohos:hilog" -> ("1", "@ohos:hilog")

    Returns:
        (key, value), or (None, None) when the delimiter is missing or is the
        very first character of the stripped input (i.e. there is no key).
    """
    s = s.strip()
    idx = s.find(delimiter)
    if (idx > 0):
        key = s[:idx].strip()
        value = s[idx + len(delimiter):].strip()
        return key, value
    return None, None


def hexstr(value) -> str:
    """Render `value` as hex text.

    * int            -> ``hex(value)``
    * str            -> returned unchanged
    * other iterable -> comma-joined ``hexstr`` of every element (recursive)
    * anything else  -> ``f"{value}"``
    """
    # A str must be handled before the Iterable branch: a 1-char string is an
    # Iterable whose only element is itself, which made the recursion endless.
    if isinstance(value, str):
        return value
    if isinstance(value, Iterable):
        # Iterate directly instead of len()/indexing so sets, generators and
        # other non-sequence iterables work too.
        return ",".join(hexstr(item) for item in value)
    if isinstance(value, int):
        return hex(value)
    return f"{value}"