GP-3836: Add Trace RMI 'Connections' pane.

This commit is contained in:
Dan 2023-12-01 09:10:12 -05:00
parent 5fd01c739d
commit bf8f7c8f78
82 changed files with 3836 additions and 270 deletions

View file

@ -62,12 +62,15 @@ SECTION_ADD_PATTERN = SECTIONS_ADD_PATTERN + SECTION_KEY_PATTERN
# TODO: Symbols
class ErrorWithCode(Exception):
def __init__(self,code):
def __init__(self, code):
self.code = code
def __str__(self)->str:
return repr(self.code)
class State(object):
def __init__(self):
@ -114,7 +117,8 @@ class State(object):
STATE = State()
def ghidra_trace_connect(address=None):
"""
Connect Python to Ghidra for tracing
@ -124,8 +128,9 @@ def ghidra_trace_connect(address=None):
STATE.require_no_client()
if address is None:
raise RuntimeError("'ghidra_trace_connect': missing required argument 'address'")
raise RuntimeError(
"'ghidra_trace_connect': missing required argument 'address'")
parts = address.split(':')
if len(parts) != 2:
raise RuntimeError("address must be in the form 'host:port'")
@ -133,11 +138,13 @@ def ghidra_trace_connect(address=None):
try:
c = socket.socket()
c.connect((host, int(port)))
STATE.client = Client(c, methods.REGISTRY)
except ValueError:
# TODO: Can we get version info from the DLL?
STATE.client = Client(c, "dbgeng.dll", methods.REGISTRY)
print(f"Connected to {STATE.client.description} at {address}")
except ValueError:
raise RuntimeError("port must be numeric")
def ghidra_trace_listen(address='0.0.0.0:0'):
"""
Listen for Ghidra to connect for tracing
@ -243,7 +250,8 @@ def ghidra_trace_create(command=None, initial_break=True, timeout=None, start_tr
if timeout != None:
util.base._client.CreateProcess(command, DbgEng.DEBUG_PROCESS)
if initial_break:
util.base._control.AddEngineOptions(DbgEng.DEBUG_ENGINITIAL_BREAK)
util.base._control.AddEngineOptions(
DbgEng.DEBUG_ENGINITIAL_BREAK)
util.base.wait(timeout)
else:
util.base.create(command, initial_break)
@ -267,7 +275,7 @@ def ghidra_trace_info():
print("Not connected to Ghidra\n")
return
host, port = STATE.client.s.getpeername()
print("Connected to Ghidra at {}:{}\n".format(host, port))
print(f"Connected to {STATE.client.description} at {host}:{port}\n")
if STATE.trace is None:
print("No trace\n")
return
@ -363,7 +371,7 @@ def put_bytes(start, end, pages, display_result):
if end - start <= 0:
return {'count': 0}
buf = dbg().read(start, end - start)
count = 0
if buf != None:
base, addr = trace.memory_mapper.map(nproc, start)
@ -405,7 +413,7 @@ def ghidra_trace_putmem(items):
address = items[0]
length = items[1]
pages = items[2] if len(items) > 2 else True
STATE.require_tx()
return putmem(address, length, pages, True)
@ -418,7 +426,7 @@ def ghidra_trace_putval(items):
items = items.split(" ")
value = items[0]
pages = items[1] if len(items) > 1 else True
STATE.require_tx()
try:
start = util.parse_and_eval(value)
@ -565,28 +573,29 @@ def ghidra_trace_remove_obj(path):
def to_bytes(value):
return bytes(ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0,len(value)))
return bytes(ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0, len(value)))
def to_string(value, encoding):
b = bytes(ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0,len(value)))
b = bytes(ord(value[i]) if type(value[i]) == str else int(
value[i]) for i in range(0, len(value)))
return str(b, encoding)
def to_bool_list(value):
return [bool(value[i]) for i in range(0,len(value))]
return [bool(value[i]) for i in range(0, len(value))]
def to_int_list(value):
return [ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0,len(value))]
return [ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0, len(value))]
def to_short_list(value):
return [ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0,len(value))]
return [ord(value[i]) if type(value[i]) == str else int(value[i]) for i in range(0, len(value))]
def to_string_list(value, encoding):
return [to_string(value[i], encoding) for i in range(0,len(value))]
return [to_string(value[i], encoding) for i in range(0, len(value))]
def eval_value(value, schema=None):
@ -614,9 +623,9 @@ def eval_value(value, schema=None):
return to_string_list(value, 'utf-8'), schema
if schema == sch.CHAR_ARR:
return to_string(value, 'utf-8'), sch.CHAR_ARR
if schema == sch.STRING:
if schema == sch.STRING:
return to_string(value, 'utf-8'), sch.STRING
return value, schema
@ -796,7 +805,7 @@ def ghidra_trace_activate(path=None):
This has no effect if the current trace is not current in Ghidra. If path is
omitted, this will activate the current frame.
"""
activate(path)
@ -822,7 +831,7 @@ def ghidra_trace_disassemble(address):
def compute_proc_state(nproc=None):
status = dbg()._control.GetExecutionStatus()
if status == DbgEng.DEBUG_STATUS_BREAK:
if status == DbgEng.DEBUG_STATUS_BREAK:
return 'STOPPED'
return 'RUNNING'
@ -841,13 +850,15 @@ def put_processes(running=False):
procobj.set_value('_state', istate)
if running == False:
procobj.set_value('_pid', p[0])
pidstr = ('0x{:x}' if radix == 16 else '0{:o}' if radix == 8 else '{}').format(p[0])
pidstr = ('0x{:x}' if radix ==
16 else '0{:o}' if radix == 8 else '{}').format(p[0])
procobj.set_value('_display', pidstr)
procobj.set_value('Name', str(p[1]))
procobj.set_value('PEB', hex(p[2]))
procobj.insert()
STATE.trace.proxy_object_path(PROCESSES_PATH).retain_values(keys)
def put_state(event_process):
STATE.require_no_tx()
STATE.tx = STATE.require_trace().start_tx("state", undoable=False)
@ -873,10 +884,10 @@ def ghidra_trace_put_processes():
def put_available():
radix = util.get_convenience_variable('output-radix')
keys = []
result = dbg().cmd(".tlist")
result = dbg().cmd(".tlist")
lines = result.split("\n")
for i in lines:
i = i.strip();
i = i.strip()
if i == "":
continue
if i.startswith("0n") is False:
@ -930,10 +941,10 @@ def put_single_breakpoint(bp, ibobj, nproc, ikeys):
if bp.GetType()[0] == DbgEng.DEBUG_BREAKPOINT_DATA:
width, prot = bp.GetDataParameters()
width = str(width)
prot = {4: 'HW_EXECUTE', 2: 'READ', 1: 'WRITE'}[prot]
prot = {4: 'HW_EXECUTE', 2: 'READ', 1: 'WRITE'}[prot]
else:
width = ' '
prot = 'SW_EXECUTE'
prot = 'SW_EXECUTE'
if address is not None: # Implies execution break
base, addr = mapper.map(nproc, address)
@ -968,7 +979,6 @@ def put_single_breakpoint(bp, ibobj, nproc, ikeys):
ikeys.append(k)
def put_breakpoints():
target = util.get_target()
nproc = util.selected_process()
@ -998,7 +1008,7 @@ def ghidra_trace_put_breakpoints():
STATE.require_tx()
with STATE.client.batch() as b:
put_breakpoints()
def put_environment():
epath = ENV_PATTERN.format(procnum=util.selected_process())
@ -1039,9 +1049,12 @@ def put_regions():
if start_base != start_addr.space:
STATE.trace.create_overlay_space(start_base, start_addr.space)
regobj.set_value('_range', start_addr.extend(r.RegionSize))
regobj.set_value('_readable', r.Protect == None or r.Protect&0x66 != 0)
regobj.set_value('_writable', r.Protect == None or r.Protect&0xCC != 0)
regobj.set_value('_executable', r.Protect == None or r.Protect&0xF0 != 0)
regobj.set_value('_readable', r.Protect ==
None or r.Protect & 0x66 != 0)
regobj.set_value('_writable', r.Protect ==
None or r.Protect & 0xCC != 0)
regobj.set_value('_executable', r.Protect ==
None or r.Protect & 0xF0 != 0)
regobj.set_value('_offset', hex(r.BaseAddress))
regobj.set_value('Base', hex(r.BaseAddress))
regobj.set_value('Size', hex(r.RegionSize))
@ -1089,13 +1102,13 @@ def put_modules():
modobj.set_value('Size', hex(size))
modobj.set_value('Flags', hex(size))
modobj.insert()
# TODO: would be nice to list sections, but currently we have no API for
# TODO: would be nice to list sections, but currently we have no API for
# it as far as I am aware
# sec_keys = []
# STATE.trace.proxy_object_path(
# mpath + SECTIONS_ADD_PATTERN).retain_values(sec_keys)
STATE.trace.proxy_object_path(MODULES_PATTERN.format(
procnum=nproc)).retain_values(mod_keys)
@ -1146,7 +1159,7 @@ def put_threads(running=False):
tid = t[0]
tobj.set_value('_tid', tid)
tidstr = ('0x{:x}' if radix ==
16 else '0{:o}' if radix == 8 else '{}').format(tid)
16 else '0{:o}' if radix == 8 else '{}').format(tid)
tobj.set_value('_short_display', '[{}.{}:{}]'.format(
nproc, i, tidstr))
tobj.set_value('_display', compute_thread_display(tidstr, t))
@ -1201,7 +1214,8 @@ def put_frames():
fobj.set_value('StackOffset', hex(f.StackOffset))
fobj.set_value('ReturnOffset', hex(f.ReturnOffset))
fobj.set_value('FrameOffset', hex(f.FrameOffset))
fobj.set_value('_display', "#{} {}".format(f.FrameNumber, hex(f.InstructionOffset)))
fobj.set_value('_display', "#{} {}".format(
f.FrameNumber, hex(f.InstructionOffset)))
fobj.insert()
STATE.trace.proxy_object_path(STACK_PATTERN.format(
procnum=nproc, tnum=nthrd)).retain_values(keys)
@ -1326,7 +1340,7 @@ def repl():
dbg().wait()
else:
pass
#dbg().dispatch_events()
# dbg().dispatch_events()
except KeyboardInterrupt as e:
print("")
print("You have left the dbgeng REPL and are now at the Python3 interpreter.")