diff --git a/Ghidra/Debug/Debugger-agent-dbgeng/src/main/py/src/ghidradbg/arch.py b/Ghidra/Debug/Debugger-agent-dbgeng/src/main/py/src/ghidradbg/arch.py index 81afd993c0..08670799f3 100644 --- a/Ghidra/Debug/Debugger-agent-dbgeng/src/main/py/src/ghidradbg/arch.py +++ b/Ghidra/Debug/Debugger-agent-dbgeng/src/main/py/src/ghidradbg/arch.py @@ -291,7 +291,7 @@ register_mappers = { } -def compute_register_mapper(lang): +def compute_register_mapper(lang: str)-> DefaultRegisterMapper: if not lang in register_mappers: if ':BE:' in lang: return DEFAULT_BE_REGISTER_MAPPER diff --git a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/arch.py b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/arch.py index bb4e9e6ee0..537750d82c 100644 --- a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/arch.py +++ b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/arch.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. ## +from typing import Dict, List, Literal, Optional, Tuple + from ghidratrace.client import Address, RegVal import drgn @@ -20,7 +22,7 @@ from . import util # NOTE: This map is derived from the ldefs using a script -language_map = { +language_map: Dict[str, List[str]] = { 'AARCH64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'], 'ARM': ['ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v8', 'ARM:LE:32:v8T'], 'PPC64': ['PowerPC:BE:64:4xx', 'PowerPC:LE:64:4xx'], @@ -31,19 +33,19 @@ language_map = { 'UNKNOWN': ['DATA:LE:64:default', 'DATA:LE:64:default'], } -data64_compiler_map = { +data64_compiler_map: Dict[Optional[str], str] = { None: 'pointer64', } -default_compiler_map = { +default_compiler_map: Dict[Optional[str], str] = { 'Language.C': 'default', } -x86_compiler_map = { +x86_compiler_map: Dict[Optional[str], str] = { 'Language.C': 'gcc', } -compiler_map = { +compiler_map: Dict[str, Dict[Optional[str], str]] = { 'DATA:BE:64:': data64_compiler_map, 'DATA:LE:64:': data64_compiler_map, 'x86:LE:32:': x86_compiler_map, @@ -56,12 +58,12 @@ compiler_map = { } -def get_arch(): +def get_arch() -> str: platform = drgn.host_platform return platform.arch.name -def get_endian(): +def get_endian() -> Literal['little', 'big']: parm = util.get_convenience_variable('endian') if parm != 'auto': return parm @@ -73,7 +75,7 @@ def get_endian(): return 'big' -def get_size(): +def get_size() -> str: parm = util.get_convenience_variable('size') if parm != 'auto': return parm @@ -85,11 +87,11 @@ def get_size(): return '32' -def get_osabi(): +def get_osabi() -> str: return "Language.C" -def compute_ghidra_language(): +def compute_ghidra_language() -> str: # First, check if the parameter is set lang = util.get_convenience_variable('ghidra-language') if lang != 'auto': @@ -103,7 +105,7 @@ def compute_ghidra_language(): sz = get_size() lebe = ':BE:' if endian == 'big' else ':LE:' if not arch in language_map: - return 'DATA' + lebe + sz +':default' + return 'DATA' + lebe + sz + ':default' langs = language_map[arch] matched_endian = sorted( (l for l in langs if lebe in l), @@ -115,7 +117,7 @@ def compute_ghidra_language(): return 'DATA' + lebe + sz + ':default' -def compute_ghidra_compiler(lang): +def compute_ghidra_compiler(lang: str) -> str: # First, check if the parameter is set comp = util.get_convenience_variable('ghidra-compiler') if comp != 'auto': @@ -124,12 +126,12 @@ def compute_ghidra_compiler(lang): # Check if the selected lang has specific compiler recommendations matched_lang = sorted( (l for l in compiler_map if l in lang), -# key=lambda l: compiler_map[l] + # key=lambda l: compiler_map[l] ) if len(matched_lang) == 0: print(f"{lang} not found in compiler map - using default compiler") return 'default' - + comp_map = compiler_map[matched_lang[0]] if comp_map == data64_compiler_map: print(f"Using the DATA64 compiler map") @@ -145,7 +147,7 @@ def compute_ghidra_compiler(lang): return 'default' -def compute_ghidra_lcsp(): +def compute_ghidra_lcsp() -> Tuple[str, str]: lang = compute_ghidra_language() comp = compute_ghidra_compiler(lang) return lang, comp @@ -153,14 +155,14 @@ def compute_ghidra_lcsp(): class DefaultMemoryMapper(object): - def __init__(self, defaultSpace): + def __init__(self, defaultSpace: str) -> None: self.defaultSpace = defaultSpace - def map(self, proc: drgn.Program, offset: int): + def map(self, proc: int, offset: int) -> Tuple[str, Address]: space = self.defaultSpace return self.defaultSpace, Address(space, offset) - def map_back(self, proc: drgn.Program, address: Address) -> int: + def map_back(self, proc: int, address: Address) -> int: if address.space == self.defaultSpace: return address.offset raise ValueError( @@ -169,10 +171,10 @@ class DefaultMemoryMapper(object): DEFAULT_MEMORY_MAPPER = DefaultMemoryMapper('ram') -memory_mappers = {} +memory_mappers: Dict[str, DefaultMemoryMapper] = {} -def compute_memory_mapper(lang): +def compute_memory_mapper(lang: str) -> DefaultMemoryMapper: if not lang in memory_mappers: return DEFAULT_MEMORY_MAPPER return memory_mappers[lang] @@ -180,29 +182,29 @@ def compute_memory_mapper(lang): class DefaultRegisterMapper(object): - def __init__(self, byte_order): + def __init__(self, byte_order: str) -> None: if not byte_order in ['big', 'little']: raise ValueError("Invalid byte_order: {}".format(byte_order)) self.byte_order = byte_order - self.union_winners = {} - def map_name(self, proc, name): + def map_name(self, proc: int, name: str) -> str: return name - def map_value(self, proc, name, value): + def map_value(self, proc: int, name: str, value: bytes): return RegVal(self.map_name(proc, name), value) - def map_name_back(self, proc, name): + def map_name_back(self, proc: int, name: str): return name - def map_value_back(self, proc, name, value): + def map_value_back(self, proc: int, name: str, value: bytes): return RegVal(self.map_name_back(proc, name), value) DEFAULT_BE_REGISTER_MAPPER = DefaultRegisterMapper('big') DEFAULT_LE_REGISTER_MAPPER = DefaultRegisterMapper('little') -def compute_register_mapper(lang): + +def compute_register_mapper(lang: str) -> DefaultRegisterMapper: if ':BE:' in lang: return DEFAULT_BE_REGISTER_MAPPER else: diff --git a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/commands.py b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/commands.py index 22191805f3..2b61ad3d80 100644 --- a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/commands.py +++ b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/commands.py @@ -14,6 +14,7 @@ # limitations under the License. ## import code +from concurrent.futures import Future from contextlib import contextmanager import inspect import os.path @@ -21,13 +22,15 @@ import re import socket import sys import time -from typing import Union +from typing import Any, Dict, Generator, Iterable, List, Optional, Sequence, Tuple, Union import drgn import drgn.cli +from drgn import Program from ghidratrace import sch -from ghidratrace.client import ( - Client, Address, AddressRange, TraceObject, Schedule) +from ghidratrace.client import (Client, Address, AddressRange, Lifespan, RegVal, + Schedule, Trace, TraceObject, TraceObjectValue, + Transaction) from ghidratrace.display import print_tabular_values, wait from . import util, arch, methods, hooks @@ -64,6 +67,7 @@ SYMBOL_KEY_PATTERN = '[{sid}]' SYMBOL_PATTERN = SYMBOLS_PATTERN + SYMBOL_KEY_PATTERN PROGRAMS = {} +prog: Program class ErrorWithCode(Exception): @@ -75,6 +79,22 @@ class ErrorWithCode(Exception): return repr(self.code) +class Extra(object): + def __init__(self) -> None: + self.memory_mapper: Optional[arch.DefaultMemoryMapper] = None + self.register_mapper: Optional[arch.DefaultRegisterMapper] = None + + def require_mm(self) -> arch.DefaultMemoryMapper: + if self.memory_mapper is None: + raise RuntimeError("No memory mapper") + return self.memory_mapper + + def require_rm(self) -> arch.DefaultRegisterMapper: + if self.register_mapper is None: + raise RuntimeError("No register mapper") + return self.register_mapper + + class State(object): def __init__(self) -> None: @@ -85,45 +105,46 @@ class State(object): raise RuntimeError("Not connected") return self.client - def require_no_client(self): + def require_no_client(self) -> None: if self.client != None: raise RuntimeError("Already connected") - def reset_client(self): - self.client = None + def reset_client(self) -> None: + self.client: Optional[Client] = None self.reset_trace() - def require_trace(self): + def require_trace(self) -> Trace[Extra]: if self.trace is None: raise RuntimeError("No trace active") return self.trace - def require_no_trace(self): + def require_no_trace(self) -> None: if self.trace != None: raise RuntimeError("Trace already started") - def reset_trace(self): - self.trace = None + def reset_trace(self) -> None: + self.trace: Optional[Trace[Extra]] = None util.set_convenience_variable('_ghidra_tracing', "false") self.reset_tx() - def require_tx(self): + def require_tx(self) -> Tuple[Trace, Transaction]: + trace = self.require_trace() if self.tx is None: raise RuntimeError("No transaction") - return self.tx + return trace, self.tx - def require_no_tx(self): + def require_no_tx(self) -> None: if self.tx != None: raise RuntimeError("Transaction already started") - def reset_tx(self): - self.tx = None + def reset_tx(self) -> None: + self.tx: Optional[Transaction] = None STATE = State() -def ghidra_trace_connect(address=None): +def ghidra_trace_connect(address: Optional[str] = None) -> None: """ Connect Python to Ghidra for tracing @@ -149,7 +170,7 @@ def ghidra_trace_connect(address=None): raise RuntimeError("port must be numeric") -def ghidra_trace_listen(address='0.0.0.0:0'): +def ghidra_trace_listen(address: str = '0.0.0.0:0') -> None: """ Listen for Ghidra to connect for tracing @@ -183,24 +204,27 @@ def ghidra_trace_listen(address='0.0.0.0:0'): raise RuntimeError("port must be numeric") -def ghidra_trace_disconnect(): +def ghidra_trace_disconnect() -> None: """Disconnect Python from Ghidra for tracing""" STATE.require_client().close() STATE.reset_client() -def start_trace(name): +def start_trace(name: str) -> None: language, compiler = arch.compute_ghidra_lcsp() if name is None: name = 'drgn/noname' - STATE.trace = STATE.client.create_trace( - name, language, compiler, extra=None) + STATE.trace = STATE.require_client().create_trace( + name, language, compiler, extra=Extra()) # TODO: Is adding an attribute like this recommended in Python? - STATE.trace.memory_mapper = arch.compute_memory_mapper(language) - STATE.trace.register_mapper = arch.compute_register_mapper(language) + STATE.trace.extra.memory_mapper = arch.compute_memory_mapper(language) + STATE.trace.extra.register_mapper = arch.compute_register_mapper(language) - parent = os.path.dirname(inspect.getfile(inspect.currentframe())) + frame = inspect.currentframe() + if frame is None: + raise AssertionError("cannot locate schema.xml") + parent = os.path.dirname(inspect.getfile(frame)) schema_fn = os.path.join(parent, 'schema.xml') with open(schema_fn, 'r') as schema_file: schema_xml = schema_file.read() @@ -210,7 +234,7 @@ def start_trace(name): util.set_convenience_variable('_ghidra_tracing', "true") -def ghidra_trace_start(name=None): +def ghidra_trace_start(name: str) -> None: """Start a Trace in Ghidra""" STATE.require_client() @@ -218,14 +242,14 @@ def ghidra_trace_start(name=None): start_trace(name) -def ghidra_trace_stop(): +def ghidra_trace_stop() -> None: """Stop the Trace in Ghidra""" STATE.require_trace().close() STATE.reset_trace() -def ghidra_trace_restart(name=None): +def ghidra_trace_restart(name: str) -> None: """Restart or start the Trace in Ghidra""" STATE.require_client() @@ -235,25 +259,27 @@ def ghidra_trace_restart(name=None): start_trace(name) -def ghidra_trace_create(start_trace=True): +def ghidra_trace_create(start_trace: bool = True) -> None: """ Create a session. """ global prog - prog = drgn.Program() + prog = Program() kind = os.getenv('OPT_TARGET_KIND') if kind == "kernel": prog.set_kernel() elif kind == "coredump": img = os.getenv('OPT_TARGET_IMG') - prog.set_core_dump(img) - if '/' in img: - img = img[img.rindex('/')+1:] + if img is not None: + prog.set_core_dump(img) + if '/' in img: + img = img[img.rindex('/')+1:] else: - pid = int(os.getenv('OPT_TARGET_PID')) - prog.set_pid(pid) - util.selected_pid = pid + pid = os.getenv('OPT_TARGET_PID') + if pid is not None: + prog.set_pid(int(pid)) + util.selected_pid = int(pid) default_symbols = {"default": True, "main": True} try: @@ -262,21 +288,21 @@ def ghidra_trace_create(start_trace=True): print(e) if kind == "kernel": - img = prog.main_module().name + img = prog.main_module().name # type: ignore util.selected_tid = next(prog.threads()).tid elif kind == "coredump": util.selected_tid = prog.crashed_thread().tid else: - img = prog.main_module().name + img = prog.main_module().name # type: ignore util.selected_tid = prog.main_thread().tid - if start_trace: + if start_trace and img is not None: ghidra_trace_start(img) PROGRAMS[util.selected_pid] = prog -def ghidra_trace_info(): +def ghidra_trace_info() -> None: """Get info about the Ghidra connection""" if STATE.client is None: @@ -290,7 +316,7 @@ def ghidra_trace_info(): print("Trace active") -def ghidra_trace_info_lcsp(): +def ghidra_trace_info_lcsp() -> None: """ Get the selected Ghidra language-compiler-spec pair. """ @@ -300,7 +326,7 @@ def ghidra_trace_info_lcsp(): print("Selected Ghidra compiler: {}".format(compiler)) -def ghidra_trace_txstart(description="tx"): +def ghidra_trace_txstart(description: str = "tx") -> None: """ Start a transaction on the trace """ @@ -309,37 +335,37 @@ def ghidra_trace_txstart(description="tx"): STATE.tx = STATE.require_trace().start_tx(description, undoable=False) -def ghidra_trace_txcommit(): +def ghidra_trace_txcommit() -> None: """ Commit the current transaction """ - STATE.require_tx().commit() + STATE.require_tx()[1].commit() STATE.reset_tx() -def ghidra_trace_txabort(): +def ghidra_trace_txabort() -> None: """ Abort the current transaction Use only in emergencies. """ - tx = STATE.require_tx() + trace, tx = STATE.require_tx() print("Aborting trace transaction!") tx.abort() STATE.reset_tx() @contextmanager -def open_tracked_tx(description): +def open_tracked_tx(description: str) -> Generator[Transaction, None, None]: with STATE.require_trace().open_tx(description) as tx: STATE.tx = tx yield tx STATE.reset_tx() -def ghidra_trace_save(): +def ghidra_trace_save() -> None: """ Save the current trace """ @@ -347,7 +373,8 @@ def ghidra_trace_save(): STATE.require_trace().save() -def ghidra_trace_new_snap(description=None, time: Union[int, Schedule, None] = None): +def ghidra_trace_new_snap(description: Optional[str] = None, + time: Optional[Schedule] = None) -> Dict[str, int]: """ Create a new snapshot @@ -357,15 +384,16 @@ def ghidra_trace_new_snap(description=None, time: Union[int, Schedule, None] = N description = str(description) if isinstance(time, int): time = Schedule(time) - STATE.require_tx() - return {'snap': STATE.require_trace().snapshot(description, time=time)} + trace, tx = STATE.require_tx() + return {'snap': trace.snapshot(description, time=time)} -def quantize_pages(start, end): +def quantize_pages(start: int, end: int) -> Tuple[int, int]: return (start // PAGE_SIZE * PAGE_SIZE, (end + PAGE_SIZE - 1) // PAGE_SIZE * PAGE_SIZE) -def put_bytes(start, end, pages, display_result): +def put_bytes(start: int, end: int, pages: bool, + display_result: bool = False) -> Dict[str, int]: trace = STATE.require_trace() if pages: start, end = quantize_pages(start, end) @@ -377,44 +405,38 @@ def put_bytes(start, end, pages, display_result): except Exception as e: return {'count': 0} - count = 0 + count: Union[int, Future[int]] = 0 if buf != None: - base, addr = trace.memory_mapper.map(nproc, start) + base, addr = trace.extra.require_mm().map(nproc, start) if base != addr.space: trace.create_overlay_space(base, addr.space) count = trace.put_bytes(addr, buf) if display_result: print("Wrote {} bytes".format(count)) - return {'count': count} + if isinstance(count, Future): + return {'count': -1} + else: + return {'count': count} + return {'count': 0} -def eval_address(address): +def eval_range(address: Union[str, int], length: Union[str, int]) -> Tuple[int, int]: + start = int(address) try: - nproc = util.selected_process() - trace = STATE.require_trace() - base, addr = trace.memory_mapper.map(nproc, address) - if base != addr.space: - trace.create_overlay_space(base, addr.space) - return addr - except Exception: - raise RuntimeError("Cannot convert '{}' to address".format(address)) - - -def eval_range(address, length): - start = address - try: - end = start + length + end = start + int(length) except Exception as e: raise RuntimeError("Cannot convert '{}' to length".format(length)) return start, end -def putmem(address, length, pages=True, display_result=True): +def putmem(address: Union[str, int], length: Union[str, int], + pages: bool = True, display_result: bool = True) -> Dict[str, int]: start, end = eval_range(address, length) return put_bytes(start, end, pages, display_result) -def ghidra_trace_putmem(address, length, pages=True): +def ghidra_trace_putmem(address: Union[str, int], length: Union[str, int], + pages: bool = True) -> Dict[str, int]: """ Record the given block of memory into the Ghidra trace. """ @@ -423,19 +445,22 @@ def ghidra_trace_putmem(address, length, pages=True): return putmem(address, length, pages, True) -def putmem_state(address, length, state, pages=True): - STATE.trace.validate_state(state) +def putmem_state(address: Union[str, int], length: Union[str, int], state: str, + pages: bool = True) -> None: + trace = STATE.require_trace() + trace.validate_state(state) start, end = eval_range(address, length) if pages: start, end = quantize_pages(start, end) nproc = util.selected_process() - base, addr = STATE.trace.memory_mapper.map(nproc, start) + base, addr = trace.extra.require_mm().map(nproc, start) if base != addr.space and state != 'unknown': - STATE.trace.create_overlay_space(base, addr.space) - STATE.trace.set_memory_state(addr.extend(end - start), state) + trace.create_overlay_space(base, addr.space) + trace.set_memory_state(addr.extend(end - start), state) -def ghidra_trace_putmem_state(address, length, state, pages=True): +def ghidra_trace_putmem_state(address: Union[str, int], length: Union[str, int], + state: str, pages: bool = True) -> None: """ Set the state of the given range of memory in the Ghidra trace. """ @@ -444,7 +469,8 @@ def ghidra_trace_putmem_state(address, length, state, pages=True): return putmem_state(address, length, state, pages) -def ghidra_trace_delmem(address, length): +def ghidra_trace_delmem(address: Union[str, int], + length: Union[str, int]) -> None: """ Delete the given range of memory from the Ghidra trace. @@ -453,39 +479,39 @@ def ghidra_trace_delmem(address, length): not quantize. You must do that yourself, if necessary. """ - STATE.require_tx() + trace, tx = STATE.require_tx() start, end = eval_range(address, length) nproc = util.selected_process() - base, addr = STATE.trace.memory_mapper.map(nproc, start) + base, addr = trace.extra.memory_mapper.map(nproc, start) # Do not create the space. We're deleting stuff. - STATE.trace.delete_bytes(addr.extend(end - start)) + trace.delete_bytes(addr.extend(end - start)) -def putreg(): +def putreg() -> Dict[str, List[str]]: + trace = STATE.require_trace() nproc = util.selected_process() if nproc < 0: - return + return {} nthrd = util.selected_thread() if nthrd < 0: - return + return {} nframe = util.selected_frame() if nframe < 0: - return + return {} space = REGS_PATTERN.format(procnum=nproc, tnum=nthrd, level=nframe) - STATE.trace.create_overlay_space('register', space) - robj = STATE.trace.create_object(space) + trace.create_overlay_space('register', space) + robj = trace.create_object(space) robj.insert() - mapper = STATE.trace.register_mapper + mapper = trace.extra.require_rm() thread = prog.thread(nthrd) try: frames = thread.stack_trace() except Exception as e: print(e) - return + return {} regs = frames[nframe].registers() - endian = arch.get_endian() sz = int(int(arch.get_size())/8) values = [] for key in regs.keys(): @@ -494,15 +520,18 @@ def putreg(): except Exception: value = 0 try: - rv = value.to_bytes(sz, endian) + rv = value.to_bytes(sz, "big") # trace takes "big" values.append(mapper.map_value(nproc, key, rv)) robj.set_value(key, hex(value)) except Exception: pass - return {'missing': STATE.trace.put_registers(space, values)} + missing = trace.put_registers(space, values) + if isinstance(missing, Future): + return {'future': []} + return {'missing': missing} -def ghidra_trace_putreg(): +def ghidra_trace_putreg() -> None: """ Record the given register group for the current frame into the Ghidra trace. @@ -513,14 +542,14 @@ def ghidra_trace_putreg(): putreg() -def ghidra_trace_delreg(): +def ghidra_trace_delreg() -> None: """ Delete the given register group for the curent frame from the Ghidra trace. Why would you do this? If no group is specified, 'all' is assumed. """ - STATE.require_tx() + trace, tx = STATE.require_tx() nproc = util.selected_process() nthrd = util.selected_thread() if nthrd < 0: @@ -541,12 +570,13 @@ def ghidra_trace_delreg(): names = [] for key in regs.keys(): names.append(key) - STATE.trace.delete_registers(space, names) + trace.delete_registers(space, names) -def put_object(lpath, key, value): +def put_object(lpath: str, key: str, value: Any): + trace = STATE.require_trace() nproc = util.selected_process() - lobj = STATE.trace.create_object(lpath+"."+key) + lobj = trace.create_object(lpath+"."+key) lobj.insert() if hasattr(value, "type_"): vtype = value.type_ @@ -567,13 +597,13 @@ def put_object(lpath, key, value): if hasattr(value, "address_"): vaddr = value.address_ if vaddr is not None: - base, addr = STATE.trace.memory_mapper.map(nproc, vaddr) + base, addr = trace.extra.require_mm().map(nproc, vaddr) lobj.set_value('Address', addr) if hasattr(value, "value_"): vvalue = value.value_() if vkind is drgn.TypeKind.POINTER: - base, addr = STATE.trace.memory_mapper.map(nproc, vvalue) + base, addr = trace.extra.require_mm().map(nproc, vvalue) lobj.set_value('Address', addr) return if vkind is drgn.TypeKind.TYPEDEF: @@ -583,7 +613,8 @@ def put_object(lpath, key, value): return if vkind is drgn.TypeKind.UNION or vkind is drgn.TypeKind.STRUCT or vkind is drgn.TypeKind.CLASS: for k in vvalue.keys(): - put_object(lobj.path+'.Members', k, vvalue[k]) + if isinstance(lobj.path, str): + put_object(lobj.path+'.Members', k, vvalue[k]) return lobj.set_value('_display', '{} [{}:{}]'.format( @@ -591,13 +622,15 @@ def put_object(lpath, key, value): lobj.set_value('Value', str(vvalue)) -def put_objects(pobj, parent): - ppath = pobj.path + '.Members' - for k in parent.keys(): - put_object(ppath, k, parent[k]) +def put_objects(pobj: TraceObject, parent: Dict[str, TraceObject]) -> None: + if isinstance(pobj.path, str): + ppath = pobj.path + '.Members' + for k in parent.keys(): + put_object(ppath, k, parent[k]) -def put_locals(): +def put_locals() -> None: + trace = STATE.require_trace() nproc = util.selected_process() if nproc < 0: return @@ -608,7 +641,7 @@ def put_locals(): if nframe < 0: return lpath = LOCALS_PATTERN.format(procnum=nproc, tnum=nthrd, level=nframe) - lobj = STATE.trace.create_object(lpath) + lobj = trace.create_object(lpath) lobj.insert() thread = prog.thread(nthrd) @@ -620,7 +653,7 @@ def put_locals(): put_object(lpath, key, value) -def ghidra_trace_put_locals(): +def ghidra_trace_put_locals() -> None: """ Record the local vars for the current frame into the Ghidra trace. """ @@ -629,7 +662,7 @@ def ghidra_trace_put_locals(): put_locals() -def ghidra_trace_create_obj(path=None): +def ghidra_trace_create_obj(path: str): """ Create an object in the Ghidra trace. @@ -638,25 +671,25 @@ def ghidra_trace_create_obj(path=None): object, after all its required attributes are set. """ - STATE.require_tx() - obj = STATE.trace.create_object(path) + trace, tx = STATE.require_tx() + obj = trace.create_object(path) obj.insert() print("Created object: id={}, path='{}'".format(obj.id, obj.path)) -def ghidra_trace_insert_obj(path): +def ghidra_trace_insert_obj(path: str) -> None: """ Insert an object into the Ghidra trace. """ # NOTE: id parameter is probably not necessary, since this command is for # humans. - STATE.require_tx() - span = STATE.trace.proxy_object_path(path).insert() + trace, tx = STATE.require_tx() + span = trace.proxy_object_path(path).insert() print("Inserted object: lifespan={}".format(span)) -def ghidra_trace_remove_obj(path): +def ghidra_trace_remove_obj(path: str) -> None: """ Remove an object from the Ghidra trace. @@ -664,44 +697,47 @@ def ghidra_trace_remove_obj(path): current snap and onwards. """ - STATE.require_tx() - STATE.trace.proxy_object_path(path).remove() + trace, tx = STATE.require_tx() + trace.proxy_object_path(path).remove() -def to_bytes(value): +def to_bytes(value: Sequence) -> bytes: return bytes(ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0, len(value))) -def to_string(value, encoding): +def to_string(value: Sequence, encoding: str) -> str: b = bytes(ord(value[i]) if type(value[i]) == str else int( value[i]) for i in range(0, len(value))) return str(b, encoding) -def to_bool_list(value): +def to_bool_list(value: Sequence) -> List[bool]: return [bool(value[i]) for i in range(0, len(value))] -def to_int_list(value): +def to_int_list(value: Sequence) -> List[int]: return [ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0, len(value))] -def to_short_list(value): +def to_short_list(value: Sequence) -> List[int]: return [ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0, len(value))] -def to_string_list(value, encoding): +def to_string_list(value: Sequence, encoding: str) -> List[str]: return [to_string(value[i], encoding) for i in range(0, len(value))] -def eval_value(value, schema=None): +def eval_value(value: Any, schema: Optional[sch.Schema] = None) -> Tuple[Union[ + bool, int, float, bytes, Tuple[str, Address], List[bool], List[int], + List[str], str], Optional[sch.Schema]]: if schema == sch.CHAR: return bytes(value, 'utf-8')[0], schema if schema == sch.BYTE or schema == sch.SHORT or schema == sch.INT or schema == sch.LONG: return int(value, 0), schema if schema == sch.ADDRESS: nproc = util.selected_process() - base, addr = STATE.trace.memory_mapper.map(nproc, value) + trace = STATE.require_trace() + base, addr = trace.extra.require_mm().map(nproc, value) return (base, addr), sch.ADDRESS if type(value) != str: value = eval("{}".format(value)) @@ -725,7 +761,8 @@ def eval_value(value, schema=None): return value, schema -def ghidra_trace_set_value(path: str, key: str, value, schema=None): +def ghidra_trace_set_value(path: str, key: str, value: Any, + schema: Optional[str] = None) -> None: """ Set a value (attribute or element) in the Ghidra trace's object tree. @@ -734,21 +771,23 @@ def ghidra_trace_set_value(path: str, key: str, value, schema=None): language. which current defaults to DEBUG_EXPR_CPLUSPLUS (vs DEBUG_EXPR_MASM). For most non-primitive cases, we are punting to the Python API. """ - schema = None if schema is None else sch.Schema(schema) - STATE.require_tx() - if schema == sch.OBJECT: - val = STATE.trace.proxy_object_path(value) + real_schema = None if schema is None else sch.Schema(schema) + trace, tx = STATE.require_tx() + if real_schema == sch.OBJECT: + val: Union[bool, int, float, bytes, Tuple[str, Address], List[bool], + List[int], List[str], str, TraceObject, + Address] = trace.proxy_object_path(value) else: - val, schema = eval_value(value, schema) - if schema == sch.ADDRESS: + val, real_schema = eval_value(value, real_schema) + if real_schema == sch.ADDRESS and isinstance(val, tuple): base, addr = val val = addr if base != addr.space: trace.create_overlay_space(base, addr.space) - STATE.trace.proxy_object_path(path).set_value(key, val, schema) + trace.proxy_object_path(path).set_value(key, val, real_schema) -def ghidra_trace_retain_values(path: str, keys: str): +def ghidra_trace_retain_values(path: str, keys: str) -> None: """ Retain only those keys listed, settings all others to null. @@ -764,25 +803,25 @@ def ghidra_trace_retain_values(path: str, keys: str): switch. All others are taken as keys. """ - keys = keys.split(" ") + key_list = keys.split(" ") - STATE.require_tx() + trace, tx = STATE.require_tx() kinds = 'elements' - if keys[0] == '--elements': + if key_list[0] == '--elements': kinds = 'elements' - keys = keys[1:] + key_list = key_list[1:] elif keys[0] == '--attributes': kinds = 'attributes' - keys = keys[1:] + key_list = key_list[1:] elif keys[0] == '--both': kinds = 'both' - keys = keys[1:] - elif keys[0].startswith('--'): - raise RuntimeError("Invalid argument: " + keys[0]) - STATE.trace.proxy_object_path(path).retain_values(keys, kinds=kinds) + key_list = key_list[1:] + elif key_list[0].startswith('--'): + raise RuntimeError("Invalid argument: " + key_list[0]) + trace.proxy_object_path(path).retain_values(key_list, kinds=kinds) -def ghidra_trace_get_obj(path): +def ghidra_trace_get_obj(path: str) -> None: """ Get an object descriptor by its canonical path. @@ -795,7 +834,7 @@ def ghidra_trace_get_obj(path): print("{}\t{}".format(object.id, object.path)) -def ghidra_trace_get_values(pattern): +def ghidra_trace_get_values(pattern: str) -> None: """ List all values matching a given path pattern. """ @@ -805,7 +844,8 @@ def ghidra_trace_get_values(pattern): print_tabular_values(values, print) -def ghidra_trace_get_values_rng(address, length): +def ghidra_trace_get_values_rng(address: Union[str, int], + length: Union[str, int]) -> None: """ List all values intersecting a given address range. """ @@ -813,13 +853,13 @@ def ghidra_trace_get_values_rng(address, length): trace = STATE.require_trace() start, end = eval_range(address, length) nproc = util.selected_process() - base, addr = trace.memory_mapper.map(nproc, start) + base, addr = trace.extra.require_mm().map(nproc, start) # Do not create the space. We're querying. No tx. values = wait(trace.get_values_intersecting(addr.extend(end - start))) print_tabular_values(values, print) -def activate(path=None): +def activate(path: Optional[str] = None) -> None: trace = STATE.require_trace() if path is None: nproc = util.selected_process() @@ -839,7 +879,7 @@ def activate(path=None): trace.proxy_object_path(path).activate() -def ghidra_trace_activate(path=None): +def ghidra_trace_activate(path: Optional[str] = None) -> None: """ Activate an object in Ghidra's GUI. @@ -850,7 +890,7 @@ def ghidra_trace_activate(path=None): activate(path) -def ghidra_trace_disassemble(address): +def ghidra_trace_disassemble(address: Union[str, int]) -> None: """ Disassemble starting at the given seed. @@ -858,46 +898,48 @@ def ghidra_trace_disassemble(address): memory encountered. """ - STATE.require_tx() + trace, tx = STATE.require_tx() nproc = util.selected_process() - base, addr = STATE.trace.memory_mapper.map(nproc, address) + base, addr = trace.extra.memory_mapper.map(nproc, address) if base != addr.space: trace.create_overlay_space(base, addr.space) - length = STATE.trace.disassemble(addr) + length = trace.disassemble(addr) print("Disassembled {} bytes".format(length)) -def put_processes(): +def put_processes() -> None: + trace = STATE.require_trace() keys = [] # Set running=True to avoid process changes, even while stopped for key in PROGRAMS.keys(): ppath = PROCESS_PATTERN.format(procnum=key) keys.append(PROCESS_KEY_PATTERN.format(procnum=key)) - procobj = STATE.trace.create_object(ppath) + procobj = trace.create_object(ppath) p = PROGRAMS[key] procobj.set_value('State', str(p.flags)) procobj.set_value('PID', key) procobj.set_value('_display', '[{:x}]'.format(key)) procobj.insert() - STATE.trace.proxy_object_path(PROCESSES_PATH).retain_values(keys) + trace.proxy_object_path(PROCESSES_PATH).retain_values(keys) -def ghidra_trace_put_processes(): +def ghidra_trace_put_processes() -> None: """ Put the list of processes into the trace's Processes list. """ - STATE.require_tx() - with STATE.client.batch() as b: + trace, tx = STATE.require_tx() + with trace.client.batch() as b: put_processes() -def put_environment(): +def put_environment() -> None: + trace = STATE.require_trace() nproc = util.selected_process() epath = ENV_PATTERN.format(procnum=nproc) - envobj = STATE.trace.create_object(epath) + envobj = trace.create_object(epath) envobj.set_value('Debugger', 'drgn') envobj.set_value('Arch', arch.get_arch()) envobj.set_value('OS', arch.get_osabi()) @@ -905,31 +947,32 @@ def put_environment(): envobj.insert() -def ghidra_trace_put_environment(): +def ghidra_trace_put_environment() -> None: """ Put some environment indicators into the Ghidra trace """ - STATE.require_tx() - with STATE.client.batch() as b: + trace, tx = STATE.require_tx() + with trace.client.batch() as b: put_environment() # Detect whether this is supported before defining the command if hasattr(drgn, 'RelocatableModule'): - def put_regions(): + def put_regions() -> None: + trace = STATE.require_trace() nproc = util.selected_process() if nproc is None: return try: - regions = prog.loaded_modules() + regions = prog.loaded_modules() # type: ignore except Exception as e: regions = [] # if len(regions) == 0: # regions = util.full_mem() - mapper = STATE.trace.memory_mapper + mapper = trace.extra.require_mm() keys = [] # r : MEMORY_BASIC_INFORMATION64 for r in regions: @@ -938,8 +981,8 @@ if hasattr(drgn, 'RelocatableModule'): size = end - start + 1 rpath = REGION_PATTERN.format(procnum=nproc, start=start) keys.append(REGION_KEY_PATTERN.format(start=start)) - regobj = STATE.trace.create_object(rpath) - (start_base, start_addr) = map_address(start) + regobj = trace.create_object(rpath) + (start_base, start_addr) = mapper.map(nproc, start) regobj.set_value('Range', start_addr.extend(size)) regobj.set_value('Name', r[0].name) regobj.set_value('Object File', r[0].loaded_file_path) @@ -948,32 +991,33 @@ if hasattr(drgn, 'RelocatableModule'): regobj.set_value('_executable', True) regobj.set_value('_display', '{:x} {}'.format(start, r[0].name)) regobj.insert() - STATE.trace.proxy_object_path( + trace.proxy_object_path( MEMORY_PATTERN.format(procnum=nproc)).retain_values(keys) - def ghidra_trace_put_regions(): + def ghidra_trace_put_regions() -> None: """ Read the memory map, if applicable, and write to the trace's Regions """ - STATE.require_tx() - with STATE.client.batch() as b: + trace, tx = STATE.require_tx() + with trace.client.batch() as b: put_regions() # Detect whether this is supported before defining the command if hasattr(drgn, 'RelocatableModule'): - def put_modules(): + def put_modules() -> None: + trace = STATE.require_trace() nproc = util.selected_process() if nproc is None: return try: - modules = prog.modules() + modules = prog.modules() # type: ignore except Exception as e: return - mapper = STATE.trace.memory_mapper + mapper = trace.extra.require_mm() mod_keys = [] for m in modules: name = m.name @@ -982,16 +1026,16 @@ if hasattr(drgn, 'RelocatableModule'): hbase = hex(base) size = m.address_range[1] - base mpath = MODULE_PATTERN.format(procnum=nproc, modpath=hbase) - modobj = STATE.trace.create_object(mpath) + modobj = trace.create_object(mpath) mod_keys.append(MODULE_KEY_PATTERN.format(modpath=hbase)) base_base, base_addr = mapper.map(nproc, base) if base_base != base_addr.space: - STATE.trace.create_overlay_space(base_base, base_addr.space) + trace.create_overlay_space(base_base, base_addr.space) modobj.set_value('Range', base_addr.extend(size)) modobj.set_value('Name', name) modobj.set_value('_display', '{:x} {}'.format(base, name)) modobj.insert() - attrobj = STATE.trace.create_object(mpath+".Attributes") + attrobj = trace.create_object(mpath+".Attributes") attrobj.set_value('BuildId', m.build_id) attrobj.set_value('DebugBias', m.debug_file_bias) attrobj.set_value('DebugPath', m.debug_file_path) @@ -1000,30 +1044,31 @@ if hasattr(drgn, 'RelocatableModule'): attrobj.set_value('LoadPath', m.loaded_file_path) attrobj.set_value('LoadStatus', str(m.loaded_file_status)) attrobj.insert() - if type(m) == drgn.RelocatableModule: - secobj = STATE.trace.create_object(mpath+".Sections") + if type(m) == drgn.RelocatableModule: # type: ignore + secobj = trace.create_object(mpath+".Sections") secobj.insert() - STATE.trace.proxy_object_path(MODULES_PATTERN.format( + trace.proxy_object_path(MODULES_PATTERN.format( procnum=nproc)).retain_values(mod_keys) - def ghidra_trace_put_modules(): + def ghidra_trace_put_modules() -> None: """ Gather object files, if applicable, and write to the trace's Modules """ - STATE.require_tx() - with STATE.client.batch() as b: + trace, tx = STATE.require_tx() + with trace.client.batch() as b: put_modules() # Detect whether this is supported before defining the command if hasattr(drgn, 'RelocatableModule'): - def put_sections(m: drgn.RelocatableModule): + def put_sections(m: drgn.RelocatableModule) -> None: # type: ignore nproc = util.selected_process() if nproc is None: return - mapper = STATE.trace.memory_mapper + trace = STATE.require_trace() + mapper = trace.extra.require_mm() section_keys = [] sections = m.section_addresses maddr = hex(m.address_range[0]) @@ -1031,21 +1076,21 @@ if hasattr(drgn, 'RelocatableModule'): value = sections[key] spath = SECTION_PATTERN.format( procnum=nproc, modpath=maddr, secname=key) - sobj = STATE.trace.create_object(spath) + sobj = trace.create_object(spath) section_keys.append(SECTION_KEY_PATTERN.format( modpath=maddr, secname=key)) base_base, base_addr = mapper.map(nproc, value) if base_base != base_addr.space: - STATE.trace.create_overlay_space(base_base, base_addr.space) + trace.create_overlay_space(base_base, base_addr.space) sobj.set_value('Address', base_addr) sobj.set_value('Range', base_addr.extend(1)) sobj.set_value('Name', key) sobj.insert() - STATE.trace.proxy_object_path(SECTIONS_PATTERN.format( + trace.proxy_object_path(SECTIONS_PATTERN.format( procnum=nproc, modpath=maddr)).retain_values(section_keys) -def convert_state(t): +def convert_state(t) -> str: if t.IsSuspended(): return 'SUSPENDED' if t.IsStopped(): @@ -1053,18 +1098,19 @@ def convert_state(t): return 'RUNNING' -def put_threads(running=False): +def put_threads(running: bool = False) -> None: nproc = util.selected_process() if nproc is None: return + trace = STATE.require_trace() keys = [] # Set running=True to avoid thread changes, even while stopped threads = prog.threads() for i, t in enumerate(threads): nthrd = t.tid tpath = THREAD_PATTERN.format(procnum=nproc, tnum=nthrd) - tobj = STATE.trace.create_object(tpath) + tobj = trace.create_object(tpath) keys.append(THREAD_KEY_PATTERN.format(tnum=nthrd)) tobj.set_value('TID', nthrd) @@ -1078,23 +1124,23 @@ def put_threads(running=False): tobj.set_value('_display', short) # tobj.set_value('Object', t.object) tobj.insert() - stackobj = STATE.trace.create_object(tpath+".Stack") + stackobj = trace.create_object(tpath+".Stack") stackobj.insert() - STATE.trace.proxy_object_path( + trace.proxy_object_path( THREADS_PATTERN.format(procnum=nproc)).retain_values(keys) -def ghidra_trace_put_threads(): +def ghidra_trace_put_threads() -> None: """ Put the current process's threads into the Ghidra trace """ - STATE.require_tx() - with STATE.client.batch() as b: + trace, tx = STATE.require_tx() + with trace.client.batch() as b: put_threads() -def put_frames(): +def put_frames() -> None: nproc = util.selected_process() if nproc < 0: return @@ -1110,67 +1156,69 @@ def put_frames(): except Exception as e: return - mapper = STATE.trace.memory_mapper + trace = STATE.require_trace() + mapper = trace.extra.require_mm() keys = [] for i, f in enumerate(stack): fpath = FRAME_PATTERN.format( procnum=nproc, tnum=nthrd, level=i) - fobj = STATE.trace.create_object(fpath) + fobj = trace.create_object(fpath) keys.append(FRAME_KEY_PATTERN.format(level=i)) base, offset_inst = mapper.map(nproc, f.pc) if base != offset_inst.space: - STATE.trace.create_overlay_space(base, offset_inst.space) + trace.create_overlay_space(base, offset_inst.space) base, offset_stack = mapper.map(nproc, f.sp) if base != offset_stack.space: - STATE.trace.create_overlay_space(base, offset_stack.space) + trace.create_overlay_space(base, offset_stack.space) fobj.set_value('PC', offset_inst) fobj.set_value('SP', offset_stack) fobj.set_value('Name', f.name) fobj.set_value('_display', "#{} {} {}".format( i, hex(offset_inst.offset), f.name)) fobj.insert() - aobj = STATE.trace.create_object(fpath+".Attributes") + aobj = trace.create_object(fpath+".Attributes") aobj.insert() aobj.set_value('Inline', f.is_inline) aobj.set_value('Interrupted', f.interrupted) aobj.insert() - lobj = STATE.trace.create_object(fpath+".Locals") + lobj = trace.create_object(fpath+".Locals") lobj.insert() - robj = STATE.trace.create_object(fpath+".Registers") + robj = trace.create_object(fpath+".Registers") robj.insert() try: src = f.source() - srcobj = STATE.trace.create_object(fpath+".Source") + srcobj = trace.create_object(fpath+".Source") srcobj.set_value('Filename', src[0]) srcobj.set_value('Line', src[1]) srcobj.set_value('Column', src[2]) srcobj.insert() except Exception as e: pass - STATE.trace.proxy_object_path(STACK_PATTERN.format( + trace.proxy_object_path(STACK_PATTERN.format( procnum=nproc, tnum=nthrd)).retain_values(keys) -def ghidra_trace_put_frames(): +def ghidra_trace_put_frames() -> None: """ Put the current thread's frames into the Ghidra trace """ - STATE.require_tx() - with STATE.client.batch() as b: + trace, tx = STATE.require_tx() + with trace.client.batch() as b: put_frames() -def put_symbols(pattern=None): +def put_symbols(pattern: Optional[str] = None) -> None: nproc = util.selected_process() if nproc is None: return + trace = STATE.require_trace() # keys = [] symbols = prog.symbols(pattern) for s in symbols: spath = SYMBOL_PATTERN.format(procnum=nproc, sid=hash(str(s))) - sobj = STATE.trace.create_object(spath) + sobj = trace.create_object(spath) # keys.append(SYMBOL_KEY_PATTERN.format(sid=i)) short = '{:x}'.format(s.address) @@ -1181,73 +1229,42 @@ def put_symbols(pattern=None): sobj.set_value('Name', s.name) else: sobj.set_value('_display', short) - mapper = STATE.trace.memory_mapper + mapper = trace.extra.require_mm() base, offset = mapper.map(nproc, s.address) if base != offset.space: - STATE.trace.create_overlay_space(base, offset.space) + trace.create_overlay_space(base, offset.space) sobj.set_value('Address', offset) sobj.set_value('Size', s.size) sobj.set_value('Binding', str(s.binding)) sobj.set_value('Kind', str(s.kind)) sobj.insert() - # STATE.trace.proxy_object_path( + # trace.proxy_object_path( # SYMBOLS_PATTERN.format(procnum=nproc)).retain_values(keys) -def ghidra_trace_put_symbols(): +def ghidra_trace_put_symbols() -> None: """ Put the current process's threads into the Ghidra trace """ - STATE.require_tx() - with STATE.client.batch() as b: + trace, tx = STATE.require_tx() + with trace.client.batch() as b: put_symbols() -def set_display(key, value, obj): - kind = util.get_kind(value) - vstr = util.get_value(value) - # istr = util.get_intrinsic_value(value) - if kind == ModelObjectKind.TARGET_OBJECT.value: - hloc = util.get_location(value) - ti = util.get_type_info(value) - if ti is not None: - name = util.get_name(ti) - if name is not None: - key += " : " + name - obj.set_value('_display', key) - if hloc is not None: - key += " @ " + str(hloc) - obj.set_value('_display', key) - (hloc_base, hloc_addr) = map_address(int(hloc, 0)) - obj.set_value('_address', hloc_addr, schema=Address) - if vstr is not None: - key += " : " + str(vstr) - obj.set_value('_display', key) - - -def map_address(address): - nproc = util.selected_process() - mapper = STATE.trace.memory_mapper - base, addr = mapper.map(nproc, address) - if base != addr.space: - STATE.trace.create_overlay_space(base, addr.space) - return (base, addr) - - -def ghidra_trace_put_all(): +def ghidra_trace_put_all() -> None: """ Put everything currently selected into the Ghidra trace """ - STATE.require_tx() - with STATE.client.batch() as b: + trace, tx = STATE.require_tx() + with trace.client.batch() as b: put_environment() if hasattr(drgn, 'RelocatableModule'): put_regions() put_modules() syms = SYMBOLS_PATTERN.format(procnum=util.selected_process()) - sobj = STATE.trace.create_object(syms) + sobj = trace.create_object(syms) sobj.insert() # put_symbols() put_threads() @@ -1257,7 +1274,7 @@ def ghidra_trace_put_all(): ghidra_trace_putmem(get_sp(), 1) -def ghidra_trace_install_hooks(): +def ghidra_trace_install_hooks() -> None: """ Install hooks to trace in Ghidra """ @@ -1265,7 +1282,7 @@ def ghidra_trace_install_hooks(): hooks.install_hooks() -def ghidra_trace_remove_hooks(): +def ghidra_trace_remove_hooks() -> None: """ Remove hooks to trace in Ghidra @@ -1277,7 +1294,7 @@ def ghidra_trace_remove_hooks(): hooks.remove_hooks() -def ghidra_trace_sync_enable(): +def ghidra_trace_sync_enable() -> None: """ Synchronize the current process with the Ghidra trace @@ -1296,7 +1313,7 @@ def ghidra_trace_sync_enable(): hooks.enable_current_process() -def ghidra_trace_sync_disable(): +def ghidra_trace_sync_disable() -> None: """ Cease synchronizing the current process with the Ghidra trace @@ -1307,23 +1324,7 @@ def ghidra_trace_sync_disable(): hooks.disable_current_process() -def ghidra_util_wait_stopped(timeout=1): - """ - Spin wait until the selected thread is stopped. - """ - - start = time.time() - t = util.selected_thread() - if t is None: - return - while not t.IsStopped() and not t.IsSuspended(): - t = util.selected_thread() # I suppose it could change - time.sleep(0.1) - if time.time() - start > timeout: - raise RuntimeError('Timed out waiting for thread to stop') - - -def get_pc(): +def get_pc() -> int: try: thread = prog.thread(util.selected_thread()) stack = thread.stack_trace() @@ -1334,7 +1335,7 @@ def get_pc(): return frame.pc -def get_sp(): +def get_sp() -> int: try: thread = prog.thread(util.selected_thread()) stack = thread.stack_trace() diff --git a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/hooks.py b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/hooks.py index 01bf9ffc67..daa0d49440 100644 --- a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/hooks.py +++ b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/hooks.py @@ -13,45 +13,43 @@ # See the License for the specific language governing permissions and # limitations under the License. ## +from dataclasses import dataclass, field import threading import time import drgn +from typing import Any, Callable, Collection, Dict, Optional, TypeVar, cast + from . import commands, util ALL_EVENTS = 0xFFFF +@dataclass(frozen=False) class HookState(object): - __slots__ = ('installed', 'mem_catchpoint') - - def __init__(self): - self.installed = False - self.mem_catchpoint = None + installed = False +@dataclass(frozen=False) class ProcessState(object): - __slots__ = ('first', 'regions', 'modules', 'threads', - 'breaks', 'watches', 'visited') + first = True + # For things we can detect changes to between stops + regions = False + modules = False + threads = False + breaks = False + watches = False + # For frames and threads that have already been synced since last stop + visited: set[Any] = field(default_factory=set) - def __init__(self): - self.first = True - # For things we can detect changes to between stops - self.regions = False - self.modules = False - self.threads = False - self.breaks = False - self.watches = False - # For frames and threads that have already been synced since last stop - self.visited = set() - - def record(self, description=None): + def record(self, description: Optional[str] = None) -> None: first = self.first self.first = False + trace = commands.STATE.require_trace() if description is not None: - commands.STATE.trace.snapshot(description) + trace.snapshot(description) if first: commands.put_processes() commands.put_environment() @@ -84,51 +82,53 @@ class ProcessState(object): commands.put_modules() self.modules = False - def record_continued(self): + def record_continued(self) -> None: commands.put_processes() commands.put_threads() - def record_exited(self, exit_code): + def record_exited(self, exit_code: int) -> None: + trace = commands.STATE.require_trace() nproc = util.selected_process() ipath = commands.PROCESS_PATTERN.format(procnum=nproc) - procobj = commands.STATE.trace.proxy_object_path(ipath) + procobj = trace.proxy_object_path(ipath) procobj.set_value('Exit Code', exit_code) procobj.set_value('State', 'TERMINATED') HOOK_STATE = HookState() -PROC_STATE = {} +PROC_STATE: Dict[int, ProcessState] = {} -def on_new_process(event): + +def on_new_process(id: int) -> None: trace = commands.STATE.trace if trace is None: return - with commands.STATE.client.batch(): - with trace.open_tx("New Process {}".format(event.process.num)): + with trace.client.batch(): + with trace.open_tx("New Process {}".format(id)): commands.put_processes() # TODO: Could put just the one.... -def on_process_selected(): +def on_process_selected() -> None: nproc = util.selected_process() if nproc not in PROC_STATE: return trace = commands.STATE.trace if trace is None: return - with commands.STATE.client.batch(): + with trace.client.batch(): with trace.open_tx("Process {} selected".format(nproc)): PROC_STATE[nproc].record() commands.activate() -def on_new_thread(event): +def on_new_thread() -> None: nproc = util.selected_process() if nproc not in PROC_STATE: return PROC_STATE[nproc].threads = True -def on_thread_selected(): +def on_thread_selected() -> None: nproc = util.selected_process() if nproc not in PROC_STATE: return @@ -136,14 +136,14 @@ def on_thread_selected(): if trace is None: return nthrd = util.selected_thread() - with commands.STATE.client.batch(): + with trace.client.batch(): with trace.open_tx("Thread {}.{} selected".format(nproc, nthrd)): PROC_STATE[nproc].record() commands.put_threads() commands.activate() -def on_frame_selected(): +def on_frame_selected() -> None: nproc = util.selected_process() if nproc not in PROC_STATE: return @@ -152,7 +152,7 @@ def on_frame_selected(): return nthrd = util.selected_thread() level = util.selected_frame() - with commands.STATE.client.batch(): + with trace.client.batch(): with trace.open_tx("Frame {}.{}.{} selected".format(nproc, nthrd, level)): PROC_STATE[nproc].record() commands.put_threads() @@ -160,32 +160,7 @@ def on_frame_selected(): commands.activate() -def on_memory_changed(event): - nproc = util.get_process() - if nproc not in PROC_STATE: - return - trace = commands.STATE.trace - if trace is None: - return - with commands.STATE.client.batch(): - with trace.open_tx("Memory *0x{:08x} changed".format(event.address)): - commands.put_bytes(event.address, event.address + event.length, - pages=False, is_mi=False, result=None) - - -def on_register_changed(event): - nproc = util.get_process() - if nproc not in PROC_STATE: - return - trace = commands.STATE.trace - if trace is None: - return - with commands.STATE.client.batch(): - with trace.open_tx("Register {} changed".format(event.regnum)): - commands.putreg() - - -def on_cont(event): +def on_cont() -> None: nproc = util.selected_process() if nproc not in PROC_STATE: return @@ -193,12 +168,12 @@ def on_cont(event): if trace is None: return state = PROC_STATE[nproc] - with commands.STATE.client.batch(): + with trace.client.batch(): with trace.open_tx("Continued"): state.record_continued() -def on_stop(event): +def on_stop() -> None: nproc = util.selected_process() if nproc not in PROC_STATE: PROC_STATE[nproc] = ProcessState() @@ -208,7 +183,7 @@ def on_stop(event): return state = PROC_STATE[nproc] state.visited.clear() - with commands.STATE.client.batch(): + with trace.client.batch(): with trace.open_tx("Stopped"): state.record("Stopped") commands.put_threads() @@ -216,34 +191,31 @@ def on_stop(event): commands.activate() -def modules_changed(): +def modules_changed() -> None: nproc = util.selected_process() if nproc not in PROC_STATE: return PROC_STATE[nproc].modules = True -def install_hooks(): +def install_hooks() -> None: if HOOK_STATE.installed: return HOOK_STATE.installed = True - event_thread = EventThread() - event_thread.start() - -def remove_hooks(): +def remove_hooks() -> None: if not HOOK_STATE.installed: return HOOK_STATE.installed = False -def enable_current_process(): +def enable_current_process() -> None: nproc = util.selected_process() PROC_STATE[nproc] = ProcessState() -def disable_current_process(): +def disable_current_process() -> None: nproc = util.selected_process() if nproc in PROC_STATE: del PROC_STATE[nproc] diff --git a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/methods.py b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/methods.py index beabf9e7f7..d228e8c673 100644 --- a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/methods.py +++ b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/methods.py @@ -19,10 +19,11 @@ from io import StringIO import re import sys import time -from typing import Annotated, Any, Dict, Optional +from typing import Annotated, Any, Dict, Optional, Tuple import drgn import drgn.cli +from drgn import Module, StackFrame # type: ignore from ghidratrace import sch from ghidratrace.client import ( @@ -35,159 +36,22 @@ REGISTRY = MethodRegistry(ThreadPoolExecutor( max_workers=1, thread_name_prefix='MethodRegistry')) -def extre(base, ext): +def extre(base, ext) -> re.Pattern: return re.compile(base.pattern + ext) PROCESSES_PATTERN = re.compile('Processes') -PROCESS_PATTERN = extre(PROCESSES_PATTERN, '\[(?P\\d*)\]') -ENV_PATTERN = extre(PROCESS_PATTERN, '\.Environment') -THREADS_PATTERN = extre(PROCESS_PATTERN, '\.Threads') -THREAD_PATTERN = extre(THREADS_PATTERN, '\[(?P\\d*)\]') -STACK_PATTERN = extre(THREAD_PATTERN, '\.Stack') -FRAME_PATTERN = extre(STACK_PATTERN, '\[(?P\\d*)\]') +PROCESS_PATTERN = extre(PROCESSES_PATTERN, '\\[(?P\\d*)\\]') +ENV_PATTERN = extre(PROCESS_PATTERN, '\\.Environment') +THREADS_PATTERN = extre(PROCESS_PATTERN, '\\.Threads') +THREAD_PATTERN = extre(THREADS_PATTERN, '\\[(?P\\d*)\\]') +STACK_PATTERN = extre(THREAD_PATTERN, '\\.Stack') +FRAME_PATTERN = extre(STACK_PATTERN, '\\[(?P\\d*)\\]') REGS_PATTERN = extre(FRAME_PATTERN, '.Registers') LOCALS_PATTERN = extre(FRAME_PATTERN, '.Locals') -MEMORY_PATTERN = extre(PROCESS_PATTERN, '\.Memory') -MODULES_PATTERN = extre(PROCESS_PATTERN, '\.Modules') -MODULE_PATTERN = extre(MODULES_PATTERN, '\[(?P.*)\]') - - -def find_availpid_by_pattern(pattern, object, err_msg): - mat = pattern.fullmatch(object.path) - if mat is None: - raise TypeError(f"{object} is not {err_msg}") - pid = int(mat['pid']) - return pid - - -def find_availpid_by_obj(object): - return find_availpid_by_pattern(AVAILABLE_PATTERN, object, "an Available") - - -def find_proc_by_num(id): - if id != util.selected_process(): - util.select_process(id) - return util.selected_process() - - -def find_proc_by_pattern(object, pattern, err_msg): - mat = pattern.fullmatch(object.path) - if mat is None: - raise TypeError(f"{object} is not {err_msg}") - procnum = int(mat['procnum']) - return find_proc_by_num(procnum) - - -def find_proc_by_obj(object): - return find_proc_by_pattern(object, PROCESS_PATTERN, "an Process") - - -def find_proc_by_env_obj(object): - return find_proc_by_pattern(object, ENV_PATTERN, "an Environment") - - -def find_proc_by_threads_obj(object): - return find_proc_by_pattern(object, THREADS_PATTERN, "a ThreadContainer") - - -def find_proc_by_mem_obj(object): - return find_proc_by_pattern(object, MEMORY_PATTERN, "a Memory") - - -def find_proc_by_modules_obj(object): - return find_proc_by_pattern(object, MODULES_PATTERN, "a ModuleContainer") - - -def find_thread_by_num(id): - if id != util.selected_thread(): - util.select_thread(id) - return util.selected_thread() - - -def find_thread_by_pattern(pattern, object, err_msg): - mat = pattern.fullmatch(object.path) - if mat is None: - raise TypeError(f"{object} is not {err_msg}") - pnum = int(mat['procnum']) - tnum = int(mat['tnum']) - find_proc_by_num(pnum) - return find_thread_by_num(tnum) - - -def find_thread_by_obj(object): - return find_thread_by_pattern(THREAD_PATTERN, object, "a Thread") - - -def find_thread_by_stack_obj(object): - return find_thread_by_pattern(STACK_PATTERN, object, "a Stack") - - -def find_thread_by_regs_obj(object): - return find_thread_by_pattern(REGS_PATTERN, object, "a RegisterValueContainer") - - -def find_frame_by_level(level): - tnum = util.selected_thread() - thread = commands.prog.thread(tnum) - try: - frames = thread.stack_trace() - except Exception as e: - print(e) - return - - for i, f in enumerate(frames): - if i == level: - if i != util.selected_frame(): - util.select_frame(i) - return i, f - - -def find_frame_by_pattern(pattern, object, err_msg): - mat = pattern.fullmatch(object.path) - if mat is None: - raise TypeError(f"{object} is not {err_msg}") - pnum = int(mat['procnum']) - tnum = int(mat['tnum']) - level = int(mat['level']) - find_proc_by_num(pnum) - find_thread_by_num(tnum) - return find_frame_by_level(level) - - -def find_frame_by_obj(object): - return find_frame_by_pattern(FRAME_PATTERN, object, "a StackFrame") - - -def find_frame_by_regs_obj(object): - return find_frame_by_pattern(REGS_PATTERN, object, "a RegisterValueContainer") - - -def find_frame_by_locals_obj(object): - return find_frame_by_pattern(LOCALS_PATTERN, object, "a LocalsContainer") - - -def find_module_by_base(modbase): - for m in commands.prog.modules(): - if modbase == str(hex(m.address_range[0])): - return m - - -def find_module_by_pattern(pattern, object, err_msg): - mat = pattern.fullmatch(object.path) - if mat is None: - raise TypeError(f"{object} is not {err_msg}") - pnum = int(mat['procnum']) - modbase = mat['modbase'] - find_proc_by_num(pnum) - return find_module_by_base(modbase) - - -def find_module_by_obj(object): - return find_module_by_pattern(MODULE_PATTERN, object, "a Module") - - -shared_globals: Dict[str, Any] = dict() +MEMORY_PATTERN = extre(PROCESS_PATTERN, '\\.Memory') +MODULES_PATTERN = extre(PROCESS_PATTERN, '\\.Modules') +MODULE_PATTERN = extre(MODULES_PATTERN, '\\[(?P.*)\\]') class Environment(TraceObject): @@ -222,10 +86,6 @@ class RegisterValueContainer(TraceObject): pass -class StackFrame(TraceObject): - pass - - class SymbolContainer(TraceObject): pass @@ -238,6 +98,136 @@ class ThreadContainer(TraceObject): pass +def find_proc_by_num(id: int) -> int: + if id != util.selected_process(): + util.select_process(id) + return util.selected_process() + + +def find_proc_by_pattern(object: TraceObject, pattern: re.Pattern, + err_msg: str) -> int: + mat = pattern.fullmatch(object.path) + if mat is None: + raise TypeError(f"{object} is not {err_msg}") + procnum = int(mat['procnum']) + return find_proc_by_num(procnum) + + +def find_proc_by_obj(object: TraceObject) -> int: + return find_proc_by_pattern(object, PROCESS_PATTERN, "an Process") + + +def find_proc_by_env_obj(object: TraceObject) -> int: + return find_proc_by_pattern(object, ENV_PATTERN, "an Environment") + + +def find_proc_by_threads_obj(object: TraceObject) -> int: + return find_proc_by_pattern(object, THREADS_PATTERN, "a ThreadContainer") + + +def find_proc_by_mem_obj(object: TraceObject) -> int: + return find_proc_by_pattern(object, MEMORY_PATTERN, "a Memory") + + +def find_proc_by_modules_obj(object: TraceObject) -> int: + return find_proc_by_pattern(object, MODULES_PATTERN, "a ModuleContainer") + + +def find_thread_by_num(id: int) -> Optional[int]: + if id != util.selected_thread(): + util.select_thread(id) + return util.selected_thread() + + +def find_thread_by_pattern(pattern: re.Pattern, object: TraceObject, + err_msg: str) -> Optional[int]: + mat = pattern.fullmatch(object.path) + if mat is None: + raise TypeError(f"{object} is not {err_msg}") + pnum = int(mat['procnum']) + tnum = int(mat['tnum']) + find_proc_by_num(pnum) + return find_thread_by_num(tnum) + + +def find_thread_by_obj(object: TraceObject) -> Optional[int]: + return find_thread_by_pattern(THREAD_PATTERN, object, "a Thread") + + +def find_thread_by_stack_obj(object: TraceObject) -> Optional[int]: + return find_thread_by_pattern(STACK_PATTERN, object, "a Stack") + + +def find_thread_by_regs_obj(object: TraceObject) -> Optional[int]: + return find_thread_by_pattern(REGS_PATTERN, object, "a RegisterValueContainer") + + +def find_frame_by_level(level: int) -> Optional[Tuple[int, StackFrame]]: + tnum = util.selected_thread() + thread = commands.prog.thread(tnum) + try: + frames = thread.stack_trace() + except Exception as e: + print(e) + return None + + for i, f in enumerate(frames): + if i == level: + if i != util.selected_frame(): + util.select_frame(i) + return i, f + return None + + +def find_frame_by_pattern(pattern: re.Pattern, object: TraceObject, + err_msg: str) -> Optional[Tuple[int, StackFrame]]: + mat = pattern.fullmatch(object.path) + if mat is None: + raise TypeError(f"{object} is not {err_msg}") + pnum = int(mat['procnum']) + tnum = int(mat['tnum']) + level = int(mat['level']) + find_proc_by_num(pnum) + find_thread_by_num(tnum) + return find_frame_by_level(level) + + +def find_frame_by_obj(object: TraceObject) -> Optional[Tuple[int, StackFrame]]: + return find_frame_by_pattern(FRAME_PATTERN, object, "a StackFrame") + + +def find_frame_by_regs_obj(object: TraceObject) -> Optional[Tuple[int, StackFrame]]: + return find_frame_by_pattern(REGS_PATTERN, object, "a RegisterValueContainer") + + +def find_frame_by_locals_obj(object: TraceObject) -> Optional[Tuple[int, StackFrame]]: + return find_frame_by_pattern(LOCALS_PATTERN, object, "a LocalsContainer") + + +def find_module_by_base(modbase: TraceObject) -> Module: + for m in commands.prog.modules(): # type: ignore + if modbase == str(hex(m.address_range[0])): + return m + + +def find_module_by_pattern(pattern: re.Pattern, object: TraceObject, + err_msg: str) -> int: + mat = pattern.fullmatch(object.path) + if mat is None: + raise TypeError(f"{object} is not {err_msg}") + pnum = int(mat['procnum']) + modbase = mat['modbase'] + find_proc_by_num(pnum) + return find_module_by_base(modbase) + + +def find_module_by_obj(object: TraceObject) -> int: + return find_module_by_pattern(MODULE_PATTERN, object, "a Module") + + +shared_globals: Dict[str, Any] = dict() + + @REGISTRY.method() def execute(cmd: str, to_string: bool = False) -> Optional[str]: """Execute a Python3 command or script.""" @@ -341,9 +331,12 @@ def activate_thread(thread: Thread) -> None: @REGISTRY.method(action='activate') -def activate_frame(frame: StackFrame) -> None: +def activate_frame(frame: TraceObject) -> None: """Select the frame.""" - i, f = find_frame_by_obj(frame) + res = find_frame_by_obj(frame) + if res is None: + return + i, f = res util.select_frame(i) with commands.open_tracked_tx('Refresh Stack'): commands.ghidra_trace_put_frames() @@ -411,7 +404,7 @@ def step_into(thread: Thread, """Step one instruction exactly.""" find_thread_by_obj(thread) time.sleep(1) - hooks.on_stop(None) + hooks.on_stop() # @REGISTRY.method diff --git a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/util.py b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/util.py index 2dcdce204e..902e3eed70 100644 --- a/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/util.py +++ b/Ghidra/Debug/Debugger-agent-drgn/src/main/py/src/ghidradrgn/util.py @@ -14,9 +14,11 @@ # limitations under the License. ## from collections import namedtuple +from dataclasses import dataclass import os import re import sys +from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union import drgn import drgn.cli @@ -29,7 +31,7 @@ selected_tid = 0 selected_level = 0 -def _compute_drgn_ver(): +def _compute_drgn_ver() -> DrgnVersion: blurb = drgn.cli.version_header() top = blurb.split('\n')[0] full = top.split()[1] # "drgn x.y.z" @@ -38,7 +40,17 @@ def _compute_drgn_ver(): DRGN_VERSION = _compute_drgn_ver() -def full_mem(self): + +@dataclass(frozen=True) +class Region: + start: int + end: int + offset: int + perms: Optional[str] + objfile: str + + +def full_mem(self) -> Region: return Region(0, 1 << 64, 0, None, 'full memory') @@ -46,49 +58,41 @@ def get_debugger(): return drgn -def get_target(): - return commands.prog - - -def get_process(name): - return get_target()[name] - - -def selected_process(): +def selected_process() -> int: return selected_pid -def selected_thread(): +def selected_thread() -> int: return selected_tid -def selected_frame(): +def selected_frame() -> int: return selected_level -def select_process(id: int): +def select_process(id: int) -> int: global selected_pid selected_pid = id return selected_pid -def select_thread(id: int): +def select_thread(id: int) -> int: global selected_tid selected_tid = id return selected_tid -def select_frame(id: int): +def select_frame(id: int) -> int: global selected_level selected_level = id return selected_level -conv_map = {} +conv_map: Dict[str, Any] = {} -def get_convenience_variable(id): - #val = get_target().GetEnvironment().Get(id) +def get_convenience_variable(id: str): + # val = get_target().GetEnvironment().Get(id) if id not in conv_map: return "auto" val = conv_map[id] @@ -97,18 +101,18 @@ def get_convenience_variable(id): return val -def set_convenience_variable(id, value): - #env = get_target().GetEnvironment() +def set_convenience_variable(id: str, value: Any) -> None: + # env = get_target().GetEnvironment() # return env.Set(id, value, True) conv_map[id] = value -def escape_ansi(line): +def escape_ansi(line: str) -> str: ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]') return ansi_escape.sub('', line) -def debracket(init): +def debracket(init: str) -> str: val = init val = val.replace("[", "(") val = val.replace("]", ")")