GP-4144: Many fixes, esp., for dbgeng Trace RMI.

This commit is contained in:
Dan 2024-01-29 14:56:28 -05:00
parent 1281fb979b
commit a6549947ab
30 changed files with 1526 additions and 725 deletions

View file

@ -13,7 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
import ctypes
ctypes.windll.kernel32.SetErrorMode(0x0001|0x0002|0x8000)
from . import util, commands, methods, hooks
import ctypes
ctypes.windll.kernel32.SetErrorMode(0x0001 | 0x0002 | 0x8000)

View file

@ -55,8 +55,9 @@ compiler_map = {
def get_arch():
try:
type = util.get_debugger()._control.GetActualProcessorType()
type = util.dbg.get_actual_processor_type()
except Exception:
print("Error getting actual processor type.")
return "Unknown"
if type is None:
return "x86_64"
@ -85,10 +86,11 @@ def get_osabi():
if not parm in ['auto', 'default']:
return parm
try:
os = util.get_debugger().cmd("vertarget")
if "Windows" not in os:
os = util.dbg.cmd("vertarget")
if "Windows" not in os:
return "default"
except Exception:
print("Error getting target OS/ABI")
pass
return "windows"
@ -154,7 +156,8 @@ class DefaultMemoryMapper(object):
def map_back(self, proc: int, address: Address) -> int:
if address.space == self.defaultSpace:
return address.offset
raise ValueError(f"Address {address} is not in process {proc.GetProcessID()}")
raise ValueError(
f"Address {address} is not in process {proc.GetProcessID()}")
DEFAULT_MEMORY_MAPPER = DefaultMemoryMapper('ram')
@ -179,14 +182,13 @@ class DefaultRegisterMapper(object):
def map_name(self, proc, name):
return name
def map_value(self, proc, name, value):
try:
### TODO: this seems half-baked
# TODO: this seems half-baked
av = value.to_bytes(8, "big")
except Exception:
raise ValueError("Cannot convert {}'s value: '{}', type: '{}'"
.format(name, value, type(value)))
.format(name, value, type(value)))
return RegVal(self.map_name(proc, name), av)
def map_name_back(self, proc, name):
@ -237,4 +239,3 @@ def compute_register_mapper(lang):
if ':LE:' in lang:
return DEFAULT_LE_REGISTER_MAPPER
return register_mappers[lang]

View file

@ -13,23 +13,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
import code
from contextlib import contextmanager
import inspect
import os.path
import socket
import time
import sys
import re
from ghidratrace import sch
from ghidratrace.client import Client, Address, AddressRange, TraceObject
import socket
import sys
import time
from pybag import pydbg, userdbg, kerneldbg
from pybag.dbgeng import core as DbgEng
from pybag.dbgeng import exception
from ghidratrace import sch
from ghidratrace.client import Client, Address, AddressRange, TraceObject
from . import util, arch, methods, hooks
import code
PAGE_SIZE = 4096
@ -170,10 +171,10 @@ def ghidra_trace_listen(address='0.0.0.0:0'):
s.bind((host, int(port)))
host, port = s.getsockname()
s.listen(1)
print("Listening at {}:{}...\n".format(host, port))
print("Listening at {}:{}...".format(host, port))
c, (chost, cport) = s.accept()
s.close()
print("Connection from {}:{}\n".format(chost, cport))
print("Connection from {}:{}".format(chost, cport))
STATE.client = Client(c, "dbgeng.dll", methods.REGISTRY)
except ValueError:
raise RuntimeError("port must be numeric")
@ -209,7 +210,7 @@ def start_trace(name):
schema_xml = schema_file.read()
with STATE.trace.open_tx("Create Root Object"):
root = STATE.trace.create_root_object(schema_xml, 'Session')
root.set_value('_display', 'pydbg(dbgeng) ' + util.DBG_VERSION.full)
root.set_value('_display', util.DBG_VERSION.full + ' via pybag')
util.set_convenience_variable('_ghidra_tracing', "true")
@ -240,47 +241,49 @@ def ghidra_trace_restart(name=None):
start_trace(name)
def ghidra_trace_create(command=None, initial_break=True, timeout=None, start_trace=True):
@util.dbg.eng_thread
def ghidra_trace_create(command=None, initial_break=True, timeout=DbgEng.WAIT_INFINITE, start_trace=True):
"""
Create a session.
"""
util.base = userdbg.UserDbg()
dbg = util.dbg._base
if command != None:
if timeout != None:
util.base._client.CreateProcess(command, DbgEng.DEBUG_PROCESS)
if initial_break:
util.base._control.AddEngineOptions(
DbgEng.DEBUG_ENGINITIAL_BREAK)
util.base.wait(timeout)
else:
util.base.create(command, initial_break)
dbg._client.CreateProcess(command, DbgEng.DEBUG_PROCESS)
if initial_break:
dbg._control.AddEngineOptions(DbgEng.DEBUG_ENGINITIAL_BREAK)
dbg.wait(timeout)
if start_trace:
ghidra_trace_start(command)
@util.dbg.eng_thread
def ghidra_trace_kill():
"""
Kill a session.
"""
dbg()._client.TerminateCurrentProcess()
dbg = util.dbg._base
dbg._client.TerminateCurrentProcess()
try:
dbg.wait()
except exception.E_UNEXPECTED_Error:
# Expect the unexpected, I guess.
pass
def ghidra_trace_info():
"""Get info about the Ghidra connection"""
result = {}
if STATE.client is None:
print("Not connected to Ghidra\n")
print("Not connected to Ghidra")
return
host, port = STATE.client.s.getpeername()
print(f"Connected to {STATE.client.description} at {host}:{port}\n")
print(f"Connected to {STATE.client.description} at {host}:{port}")
if STATE.trace is None:
print("No trace\n")
print("No trace")
return
print("Trace active\n")
return result
print("Trace active")
def ghidra_trace_info_lcsp():
@ -289,8 +292,8 @@ def ghidra_trace_info_lcsp():
"""
language, compiler = arch.compute_ghidra_lcsp()
print("Selected Ghidra language: {}\n".format(language))
print("Selected Ghidra compiler: {}\n".format(compiler))
print("Selected Ghidra language: {}".format(language))
print("Selected Ghidra compiler: {}".format(compiler))
def ghidra_trace_txstart(description="tx"):
@ -319,7 +322,7 @@ def ghidra_trace_txabort():
"""
tx = STATE.require_tx()
print("Aborting trace transaction!\n")
print("Aborting trace transaction!")
tx.abort()
STATE.reset_tx()
@ -362,15 +365,22 @@ def ghidra_trace_set_snap(snap=None):
STATE.require_trace().set_snap(int(snap))
def quantize_pages(start, end):
return (start // PAGE_SIZE * PAGE_SIZE, (end+PAGE_SIZE-1) // PAGE_SIZE*PAGE_SIZE)
@util.dbg.eng_thread
def put_bytes(start, end, pages, display_result):
trace = STATE.require_trace()
if pages:
start = start // PAGE_SIZE * PAGE_SIZE
end = (end + PAGE_SIZE - 1) // PAGE_SIZE * PAGE_SIZE
start, end = quantize_pages(start, end)
nproc = util.selected_process()
if end - start <= 0:
return {'count': 0}
buf = dbg().read(start, end - start)
try:
buf = util.dbg._base.read(start, end - start)
except OSError:
return {'count': 0}
count = 0
if buf != None:
@ -379,7 +389,7 @@ def put_bytes(start, end, pages, display_result):
trace.create_overlay_space(base, addr.space)
count = trace.put_bytes(addr, buf)
if display_result:
print("Wrote {} bytes\n".format(count))
print("Wrote {} bytes".format(count))
return {'count': count}
@ -404,16 +414,11 @@ def putmem(address, length, pages=True, display_result=True):
return put_bytes(start, end, pages, display_result)
def ghidra_trace_putmem(items):
def ghidra_trace_putmem(address, length, pages=True):
"""
Record the given block of memory into the Ghidra trace.
"""
items = items.split(" ")
address = items[0]
length = items[1]
pages = items[2] if len(items) > 2 else True
STATE.require_tx()
return putmem(address, length, pages, True)
@ -436,27 +441,28 @@ def ghidra_trace_putval(items):
return put_bytes(start, end, pages, True)
def ghidra_trace_putmem_state(items):
"""
Set the state of the given range of memory in the Ghidra trace.
"""
items = items.split(" ")
address = items[0]
length = items[1]
state = items[2]
STATE.require_tx()
def putmem_state(address, length, state, pages=True):
STATE.trace.validate_state(state)
start, end = eval_range(address, length)
if pages:
start, end = quantize_pages(start, end)
nproc = util.selected_process()
base, addr = STATE.trace.memory_mapper.map(nproc, start)
if base != addr.space:
if base != addr.space and state != 'unknown':
trace.create_overlay_space(base, addr.space)
STATE.trace.set_memory_state(addr.extend(end - start), state)
def ghidra_trace_delmem(items):
def ghidra_trace_putmem_state(address, length, state, pages=True):
"""
Set the state of the given range of memory in the Ghidra trace.
"""
STATE.require_tx()
return putmem_state(address, length, state, pages)
def ghidra_trace_delmem(address, length):
"""
Delete the given range of memory from the Ghidra trace.
@ -465,10 +471,6 @@ def ghidra_trace_delmem(items):
not quantize. You must do that yourself, if necessary.
"""
items = items.split(" ")
address = items[0]
length = items[1]
STATE.require_tx()
start, end = eval_range(address, length)
nproc = util.selected_process()
@ -477,6 +479,7 @@ def ghidra_trace_delmem(items):
STATE.trace.delete_bytes(addr.extend(end - start))
@util.dbg.eng_thread
def putreg():
nproc = util.selected_process()
if nproc < 0:
@ -488,7 +491,7 @@ def putreg():
robj.insert()
mapper = STATE.trace.register_mapper
values = []
regs = dbg().reg
regs = util.dbg._base.reg
for i in range(0, len(regs)):
name = regs._reg.GetDescription(i)[0]
value = regs._get_register_by_index(i)
@ -511,6 +514,7 @@ def ghidra_trace_putreg():
putreg()
@util.dbg.eng_thread
def ghidra_trace_delreg(group='all'):
"""
Delete the given register group for the curent frame from the Ghidra trace.
@ -524,11 +528,11 @@ def ghidra_trace_delreg(group='all'):
space = REGS_PATTERN.format(procnum=nproc, tnum=nthrd)
mapper = STATE.trace.register_mapper
names = []
regs = dbg().reg
regs = util.dbg._base.reg
for i in range(0, len(regs)):
name = regs._reg.GetDescription(i)[0]
names.append(mapper.map_name(nproc, name))
return STATE.trace.delete_registers(space, names)
STATE.trace.delete_registers(space, names)
def ghidra_trace_create_obj(path=None):
@ -543,8 +547,7 @@ def ghidra_trace_create_obj(path=None):
STATE.require_tx()
obj = STATE.trace.create_object(path)
obj.insert()
print("Created object: id={}, path='{}'\n".format(obj.id, obj.path))
return {'id': obj.id, 'path': obj.path}
print("Created object: id={}, path='{}'".format(obj.id, obj.path))
def ghidra_trace_insert_obj(path):
@ -556,8 +559,7 @@ def ghidra_trace_insert_obj(path):
# humans.
STATE.require_tx()
span = STATE.trace.proxy_object_path(path).insert()
print("Inserted object: lifespan={}\n".format(span))
return {'lifespan': span}
print("Inserted object: lifespan={}".format(span))
def ghidra_trace_remove_obj(path):
@ -600,10 +602,10 @@ def to_string_list(value, encoding):
def eval_value(value, schema=None):
if schema == sch.CHAR or schema == sch.BYTE or schema == sch.SHORT or schema == sch.INT or schema == sch.LONG or schema == None:
value = util.get_eval(value)
value = util.parse_and_eval(value)
return value, schema
if schema == sch.ADDRESS:
value = util.get_eval(value)
value = util.parse_and_eval(value)
nproc = util.selected_process()
base, addr = STATE.trace.memory_mapper.map(nproc, value)
return (base, addr), sch.ADDRESS
@ -696,8 +698,7 @@ def ghidra_trace_get_obj(path):
trace = STATE.require_trace()
object = trace.get_object(path)
print("{}\t{}\n".format(object.id, object.path))
return object
print("{}\t{}".format(object.id, object.path))
class TableColumn(object):
@ -734,7 +735,7 @@ class Tabular(object):
for rn in range(self.num_rows):
for c in self.columns:
c.print_cell(rn)
print('\n')
print('')
def val_repr(value):
@ -761,18 +762,13 @@ def ghidra_trace_get_values(pattern):
trace = STATE.require_trace()
values = trace.get_values(pattern)
print_values(values)
return values
def ghidra_trace_get_values_rng(items):
def ghidra_trace_get_values_rng(address, length):
"""
List all values intersecting a given address range.
"""
items = items.split(" ")
address = items[0]
length = items[1]
trace = STATE.require_trace()
start, end = eval_range(address, length)
nproc = util.selected_process()
@ -780,7 +776,6 @@ def ghidra_trace_get_values_rng(items):
# Do not create the space. We're querying. No tx.
values = trace.get_values_intersecting(addr.extend(end - start))
print_values(values)
return values
def activate(path=None):
@ -825,34 +820,33 @@ def ghidra_trace_disassemble(address):
trace.create_overlay_space(base, addr.space)
length = STATE.trace.disassemble(addr)
print("Disassembled {} bytes\n".format(length))
return {'length': length}
print("Disassembled {} bytes".format(length))
@util.dbg.eng_thread
def compute_proc_state(nproc=None):
status = dbg()._control.GetExecutionStatus()
status = util.dbg._base._control.GetExecutionStatus()
if status == DbgEng.DEBUG_STATUS_BREAK:
return 'STOPPED'
return 'RUNNING'
def put_processes(running=False):
radix = util.get_convenience_variable('output-radix')
if radix == 'auto':
radix = 16
# | always displays PID in hex
# TODO: I'm not sure about the engine id
keys = []
for i, p in enumerate(util.process_list(running)):
# Set running=True to avoid process changes, even while stopped
for i, p in enumerate(util.process_list(running=True)):
ipath = PROCESS_PATTERN.format(procnum=i)
keys.append(PROCESS_KEY_PATTERN.format(procnum=i))
procobj = STATE.trace.create_object(ipath)
istate = compute_proc_state(i)
procobj.set_value('_state', istate)
if running == False:
procobj.set_value('_pid', p[0])
pidstr = ('0x{:x}' if radix ==
16 else '0{:o}' if radix == 8 else '{}').format(p[0])
procobj.set_value('_display', pidstr)
pid = p[0]
procobj.set_value('_pid', pid)
procobj.set_value('_display', '{:x} {:x}'.format(i, pid))
if len(p) > 1:
procobj.set_value('Name', str(p[1]))
procobj.set_value('PEB', hex(p[2]))
procobj.insert()
@ -860,8 +854,6 @@ def put_processes(running=False):
def put_state(event_process):
STATE.require_no_tx()
STATE.tx = STATE.require_trace().start_tx("state", undoable=False)
ipath = PROCESS_PATTERN.format(procnum=event_process)
procobj = STATE.trace.create_object(ipath)
state = compute_proc_state(event_process)
@ -873,8 +865,6 @@ def put_state(event_process):
threadobj = STATE.trace.create_object(ipath)
threadobj.set_value('_state', state)
threadobj.insert()
STATE.require_tx().commit()
STATE.reset_tx()
def ghidra_trace_put_processes():
@ -887,10 +877,11 @@ def ghidra_trace_put_processes():
put_processes()
@util.dbg.eng_thread
def put_available():
radix = util.get_convenience_variable('output-radix')
keys = []
result = dbg().cmd(".tlist")
result = util.dbg._base.cmd(".tlist")
lines = result.split("\n")
for i in lines:
i = i.strip()
@ -923,6 +914,7 @@ def ghidra_trace_put_available():
put_available()
@util.dbg.eng_thread
def put_single_breakpoint(bp, ibobj, nproc, ikeys):
mapper = STATE.trace.memory_mapper
bpath = PROC_BREAK_PATTERN.format(procnum=nproc, breaknum=bp.GetId())
@ -937,7 +929,7 @@ def put_single_breakpoint(bp, ibobj, nproc, ikeys):
else:
address = bp.GetOffset()
offset = "%016x" % address
expr = dbg().get_name_by_offset(address)
expr = util.dbg._base.get_name_by_offset(address)
try:
tid = bp.GetMatchThreadId()
tid = "%04x" % tid
@ -965,7 +957,7 @@ def put_single_breakpoint(bp, ibobj, nproc, ikeys):
STATE.trace.create_overlay_space(base, addr.space)
brkobj.set_value('_range', addr.extend(width))
except Exception as e:
print("Error: Could not get range for breakpoint: {}\n".format(e))
print("Error: Could not get range for breakpoint: {}".format(e))
else: # I guess it's a catchpoint
pass
@ -985,6 +977,7 @@ def put_single_breakpoint(bp, ibobj, nproc, ikeys):
ikeys.append(k)
@util.dbg.eng_thread
def put_breakpoints():
target = util.get_target()
nproc = util.selected_process()
@ -992,12 +985,12 @@ def put_breakpoints():
ibobj = STATE.trace.create_object(ibpath)
keys = []
ikeys = []
ids = [bpid for bpid in dbg().breakpoints]
ids = [bpid for bpid in util.dbg._base.breakpoints]
for bpid in ids:
try:
bp = dbg()._control.GetBreakpointById(bpid)
bp = util.dbg._base._control.GetBreakpointById(bpid)
except exception.E_NOINTERFACE_Error:
dbg().breakpoints._remove_stale(bpid)
util.dbg._base.breakpoints._remove_stale(bpid)
continue
keys.append(PROC_BREAK_KEY_PATTERN.format(breaknum=bpid))
put_single_breakpoint(bp, ibobj, nproc, ikeys)
@ -1036,10 +1029,11 @@ def ghidra_trace_put_environment():
put_environment()
@util.dbg.eng_thread
def put_regions():
nproc = util.selected_process()
try:
regions = dbg().memory_list()
regions = util.dbg._base.memory_list()
except Exception:
regions = []
if len(regions) == 0 and util.selected_thread() != None:
@ -1082,10 +1076,11 @@ def ghidra_trace_put_regions():
put_regions()
@util.dbg.eng_thread
def put_modules():
target = util.get_target()
nproc = util.selected_process()
modules = dbg().module_list()
modules = util.dbg._base.module_list()
mapper = STATE.trace.memory_mapper
mod_keys = []
for m in modules:
@ -1137,38 +1132,33 @@ def convert_state(t):
return 'RUNNING'
def convert_tid(t):
if t[1] == 0:
return t[2]
return t[1]
def compute_thread_display(tidstr, t):
return '[{} {}]'.format(tidstr, t[2])
def compute_thread_display(i, pid, tid, t):
if len(t) > 1:
return '{:x} {:x}:{:x} {}'.format(i, pid, tid, t[2])
return '{:x} {:x}:{:x}'.format(i, pid, tid)
def put_threads(running=False):
radix = util.get_convenience_variable('output-radix')
if radix == 'auto':
radix = 16
# ~ always displays PID:TID in hex
# TODO: I'm not sure about the engine id
nproc = util.selected_process()
if nproc == None:
if nproc is None:
return
pid = util.dbg.pid
keys = []
for i, t in enumerate(util.thread_list(running)):
# Set running=True to avoid thread changes, even while stopped
for i, t in enumerate(util.thread_list(running=True)):
tpath = THREAD_PATTERN.format(procnum=nproc, tnum=i)
tobj = STATE.trace.create_object(tpath)
keys.append(THREAD_KEY_PATTERN.format(tnum=i))
#tobj.set_value('_state', convert_state(t))
if running == False:
tobj.set_value('_name', t[2])
tid = t[0]
tobj.set_value('_tid', tid)
tidstr = ('0x{:x}' if radix ==
16 else '0{:o}' if radix == 8 else '{}').format(tid)
tobj.set_value('_short_display', '[{}.{}:{}]'.format(
nproc, i, tidstr))
tobj.set_value('_display', compute_thread_display(tidstr, t))
tid = t[0]
tobj.set_value('_tid', tid)
tobj.set_value('_short_display',
'{:x} {:x}:{:x}'.format(i, pid, tid))
tobj.set_value('_display', compute_thread_display(i, pid, tid, t))
if len(t) > 1:
tobj.set_value('TEB', hex(t[1]))
tobj.set_value('Name', t[2])
tobj.insert()
@ -1199,6 +1189,7 @@ def ghidra_trace_put_threads():
put_threads()
@util.dbg.eng_thread
def put_frames():
nproc = util.selected_process()
mapper = STATE.trace.memory_mapper
@ -1207,7 +1198,7 @@ def put_frames():
return
keys = []
# f : _DEBUG_STACK_FRAME
for f in dbg().backtrace_list():
for f in util.dbg._base.backtrace_list():
fpath = FRAME_PATTERN.format(
procnum=nproc, tnum=nthrd, level=f.FrameNumber)
fobj = STATE.trace.create_object(fpath)
@ -1254,8 +1245,8 @@ def ghidra_trace_put_all():
put_breakpoints()
put_available()
ghidra_trace_putreg()
ghidra_trace_putmem("$pc 1")
ghidra_trace_putmem("$sp 1")
ghidra_trace_putmem(util.get_pc(), 1)
ghidra_trace_putmem(util.get_sp(), 1)
def ghidra_trace_install_hooks():
@ -1324,34 +1315,42 @@ def ghidra_util_wait_stopped(timeout=1):
raise RuntimeError('Timed out waiting for thread to stop')
def dbg():
return util.get_debugger()
def get_prompt_text():
try:
return util.dbg.get_prompt_text()
except util.DebuggeeRunningException:
return 'Running>'
SHOULD_WAIT = ['GO', 'STEP_BRANCH', 'STEP_INTO', 'STEP_OVER']
@util.dbg.eng_thread
def exec_cmd(cmd):
dbg = util.dbg
dbg.cmd(cmd, quiet=False)
stat = dbg.exec_status()
if stat != 'BREAK':
dbg.wait()
def repl():
print("This is the dbgeng.dll (WinDbg) REPL. To drop to Python3, press Ctrl-C.")
print("")
print("This is the Windows Debugger REPL. To drop to Python, type .exit")
while True:
# TODO: Implement prompt retrieval in PR to pybag?
print('dbg> ', end='')
print(get_prompt_text(), end=' ')
try:
cmd = input().strip()
if not cmd:
if cmd == '':
continue
dbg().cmd(cmd, quiet=True)
stat = dbg().exec_status()
if stat != 'BREAK':
dbg().wait()
else:
pass
# dbg().dispatch_events()
elif cmd == '.exit':
break
exec_cmd(cmd)
except KeyboardInterrupt as e:
util.dbg.interrupt()
except util.DebuggeeRunningException as e:
print("")
print("You have left the dbgeng REPL and are now at the Python3 interpreter.")
print("use repl() to re-enter.")
return
except:
# Assume cmd() has already output the error
pass
print("Debuggee is Running. Use Ctrl-C to interrupt.")
except BaseException as e:
pass # Error is printed by another mechanism
print("")
print("You have left the Windows Debugger REPL and are now at the Python "
"interpreter.")
print("To re-enter, type repl()")

View file

@ -13,20 +13,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from _ctypes_test import func
import functools
import sys
import time
import threading
import time
import traceback
from comtypes.hresult import S_OK
from pybag import pydbg
from pybag.dbgeng.callbacks import EventHandler
from pybag.dbgeng import core as DbgEng
from pybag.dbgeng import exception
from pybag.dbgeng.callbacks import EventHandler
from pybag.dbgeng.idebugbreakpoint import DebugBreakpoint
from . import commands, util
ALL_EVENTS = 0xFFFF
class HookState(object):
__slots__ = ('installed', 'mem_catchpoint')
@ -36,7 +43,8 @@ class HookState(object):
class ProcessState(object):
__slots__ = ('first', 'regions', 'modules', 'threads', 'breaks', 'watches', 'visited', 'waiting')
__slots__ = ('first', 'regions', 'modules', 'threads',
'breaks', 'watches', 'visited', 'waiting')
def __init__(self):
self.first = True
@ -48,10 +56,10 @@ class ProcessState(object):
self.watches = False
# For frames and threads that have already been synced since last stop
self.visited = set()
self.waiting = True
self.waiting = False
def record(self, description=None):
#print("RECORDING")
# print("RECORDING")
first = self.first
self.first = False
if description is not None:
@ -66,8 +74,10 @@ class ProcessState(object):
if thread is not None:
if first or thread not in self.visited:
commands.putreg()
commands.putmem("$pc", "1", display_result=False)
commands.putmem("$sp", "1", display_result=False)
commands.putmem('0x{:x}'.format(util.get_pc()),
"1", display_result=False)
commands.putmem('0x{:x}'.format(util.get_sp()),
"1", display_result=False)
commands.put_frames()
self.visited.add(thread)
frame = util.selected_frame()
@ -122,55 +132,90 @@ BRK_STATE = BrkState()
PROC_STATE = {}
def log_errors(func):
'''
Wrap a function in a try-except that prints and reraises the
exception.
This is needed because pybag and/or the COM wrappers do not print
exceptions that occur during event callbacks.
'''
@functools.wraps(func)
def _func(*args, **kwargs):
try:
return func(*args, **kwargs)
except:
traceback.print_exc()
raise
return _func
@log_errors
def on_state_changed(*args):
#print("ON_STATE_CHANGED")
# print("ON_STATE_CHANGED")
# print(args)
if args[0] == DbgEng.DEBUG_CES_CURRENT_THREAD:
return on_thread_selected(args)
elif args[0] == DbgEng.DEBUG_CES_BREAKPOINTS:
return on_breakpoint_modified(args)
elif args[0] == DbgEng.DEBUG_CES_RADIX:
util.set_convenience_variable('output-radix', args[1])
return DbgEng.DEBUG_STATUS_GO
return S_OK
elif args[0] == DbgEng.DEBUG_CES_EXECUTION_STATUS:
util.dbg._ces_exec_status(args[1])
proc = util.selected_process()
if args[1] & DbgEng.DEBUG_STATUS_INSIDE_WAIT:
PROC_STATE[proc].waiting = True
return DbgEng.DEBUG_STATUS_GO
PROC_STATE[proc].waiting = False
commands.put_state(proc)
if proc in PROC_STATE:
# Process may have exited (so deleted) first
PROC_STATE[proc].waiting = True
return S_OK
if proc in PROC_STATE:
# Process may have exited (so deleted) first.
PROC_STATE[proc].waiting = False
trace = commands.STATE.trace
with commands.STATE.client.batch():
with trace.open_tx("State changed proc {}".format(proc)):
commands.put_state(proc)
if args[1] == DbgEng.DEBUG_STATUS_BREAK:
return on_stop(args)
else:
return on_cont(args)
return DbgEng.DEBUG_STATUS_GO
return S_OK
@log_errors
def on_debuggee_changed(*args):
#print("ON_DEBUGGEE_CHANGED")
# print("ON_DEBUGGEE_CHANGED: args={}".format(args))
# sys.stdout.flush()
trace = commands.STATE.trace
if trace is None:
return
if args[1] == DbgEng.DEBUG_CDS_REGISTERS:
on_register_changed(args[0][1])
#if args[1] == DbgEng.DEBUG_CDS_DATA:
# on_memory_changed(args[0][1])
return DbgEng.DEBUG_STATUS_GO
return S_OK
if args[0] == DbgEng.DEBUG_CDS_REGISTERS:
on_register_changed(args[1])
if args[0] == DbgEng.DEBUG_CDS_DATA:
on_memory_changed(args[1])
return S_OK
@log_errors
def on_session_status_changed(*args):
#print("ON_STATUS_CHANGED")
print("ON_STATUS_CHANGED: args={}".format(args))
trace = commands.STATE.trace
if trace is None:
return
if args[0] == DbgEng.DEBUG_SESSION_ACTIVE or args[0] == DbgEng.DEBUG_SSESION_REBOOT:
if args[0] == DbgEng.DEBUG_SESSION_ACTIVE or args[0] == DbgEng.DEBUG_SESSION_REBOOT:
with commands.STATE.client.batch():
with trace.open_tx("New Process {}".format(util.selected_process())):
commands.put_processes()
return DbgEng.DEBUG_STATUS_GO
with trace.open_tx("New Session {}".format(util.selected_process())):
commands.put_processes()
return DbgEng.DEBUG_STATUS_GO
@log_errors
def on_symbol_state_changed(*args):
#print("ON_SYMBOL_STATE_CHANGED")
# print("ON_SYMBOL_STATE_CHANGED")
proc = util.selected_process()
if proc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
@ -179,31 +224,33 @@ def on_symbol_state_changed(*args):
return DbgEng.DEBUG_STATUS_GO
@log_errors
def on_system_error(*args):
print("ON_SYSTEM_ERROR")
print(hex(args[0]))
print("ON_SYSTEM_ERROR: args={}".format(args))
# print(hex(args[0]))
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("New Process {}".format(util.selected_process())):
commands.put_processes()
with trace.open_tx("System Error {}".format(util.selected_process())):
commands.put_processes()
return DbgEng.DEBUG_STATUS_BREAK
@log_errors
def on_new_process(*args):
#print("ON_NEW_PROCESS")
# print("ON_NEW_PROCESS")
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("New Process {}".format(util.selected_process())):
commands.put_processes()
commands.put_processes()
return DbgEng.DEBUG_STATUS_BREAK
def on_process_selected():
#print("PROCESS_SELECTED")
# print("PROCESS_SELECTED")
proc = util.selected_process()
if proc not in PROC_STATE:
return
@ -216,8 +263,9 @@ def on_process_selected():
commands.activate()
@log_errors
def on_process_deleted(*args):
#print("ON_PROCESS_DELETED")
# print("ON_PROCESS_DELETED")
proc = args[0]
on_exited(proc)
if proc in PROC_STATE:
@ -231,8 +279,9 @@ def on_process_deleted(*args):
return DbgEng.DEBUG_STATUS_BREAK
@log_errors
def on_threads_changed(*args):
#print("ON_THREADS_CHANGED")
# print("ON_THREADS_CHANGED")
proc = util.selected_process()
if proc not in PROC_STATE:
return DbgEng.DEBUG_STATUS_GO
@ -241,7 +290,8 @@ def on_threads_changed(*args):
def on_thread_selected(*args):
#print("THREAD_SELECTED")
# print("THREAD_SELECTED: args={}".format(args))
# sys.stdout.flush()
nthrd = args[0][1]
nproc = util.selected_process()
if nproc not in PROC_STATE:
@ -261,7 +311,7 @@ def on_thread_selected(*args):
def on_register_changed(regnum):
#print("REGISTER_CHANGED")
# print("REGISTER_CHANGED")
proc = util.selected_process()
if proc not in PROC_STATE:
return
@ -274,7 +324,25 @@ def on_register_changed(regnum):
commands.activate()
def on_memory_changed(space):
if space != DbgEng.DEBUG_DATA_SPACE_VIRTUAL:
return
proc = util.selected_process()
if proc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
# Not great, but invalidate the whole space
# UI will only re-fetch what it needs
# But, some observations will not be recovered
with commands.STATE.client.batch():
with trace.open_tx("Memory changed"):
commands.putmem_state(0, 2**64, 'unknown')
def on_cont(*args):
# print("ON CONT")
proc = util.selected_process()
if proc not in PROC_STATE:
return
@ -289,13 +357,14 @@ def on_cont(*args):
def on_stop(*args):
# print("ON STOP")
proc = util.selected_process()
if proc not in PROC_STATE:
print("not in state")
# print("not in state")
return
trace = commands.STATE.trace
if trace is None:
print("no trace")
# print("no trace")
return
state = PROC_STATE[proc]
state.visited.clear()
@ -308,7 +377,7 @@ def on_stop(*args):
def on_exited(proc):
if proc not in PROC_STATE:
print("not in state")
# print("not in state")
return
trace = commands.STATE.trace
if trace is None:
@ -323,8 +392,9 @@ def on_exited(proc):
commands.activate()
@log_errors
def on_modules_changed(*args):
#print("ON_MODULES_CHANGED")
# print("ON_MODULES_CHANGED")
proc = util.selected_process()
if proc not in PROC_STATE:
return DbgEng.DEBUG_STATUS_GO
@ -333,6 +403,7 @@ def on_modules_changed(*args):
def on_breakpoint_created(bp):
# print("ON_BREAKPOINT_CREATED")
proc = util.selected_process()
if proc not in PROC_STATE:
return
@ -350,7 +421,7 @@ def on_breakpoint_created(bp):
def on_breakpoint_modified(*args):
#print("BREAKPOINT_MODIFIED")
# print("BREAKPOINT_MODIFIED")
proc = util.selected_process()
if proc not in PROC_STATE:
return
@ -362,9 +433,9 @@ def on_breakpoint_modified(*args):
ibobj = trace.create_object(ibpath)
bpid = args[0][1]
try:
bp = dbg()._control.GetBreakpointById(bpid)
bp = util.dbg._base._control.GetBreakpointById(bpid)
except exception.E_NOINTERFACE_Error:
dbg().breakpoints._remove_stale(bpid)
util.dbg._base.breakpoints._remove_stale(bpid)
return on_breakpoint_deleted(bpid)
return on_breakpoint_created(bp)
@ -383,33 +454,27 @@ def on_breakpoint_deleted(bpid):
trace.proxy_object_path(bpath).remove(tree=True)
@log_errors
def on_breakpoint_hit(*args):
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("New Process {}".format(util.selected_process())):
commands.put_processes()
return DbgEng.DEBUG_STATUS_GO
# print("ON_BREAKPOINT_HIT: args={}".format(args))
return DbgEng.DEBUG_STATUS_BREAK
@log_errors
def on_exception(*args):
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("New Process {}".format(util.selected_process())):
commands.put_processes()
return DbgEng.DEBUG_STATUS_GO
# print("ON_EXCEPTION: args={}".format(args))
return DbgEng.DEBUG_STATUS_BREAK
@util.dbg.eng_thread
def install_hooks():
# print("Installing hooks")
if HOOK_STATE.installed:
return
HOOK_STATE.installed = True
events = dbg().events
events = util.dbg._base.events
events.engine_state(handler=on_state_changed)
events.debuggee_state(handler=on_debuggee_changed)
events.session_status(handler=on_session_status_changed)
@ -422,20 +487,24 @@ def install_hooks():
events.exit_thread(handler=on_threads_changed)
events.module_load(handler=on_modules_changed)
events.unload_module(handler=on_modules_changed)
#events.breakpoint(handler=on_breakpoint_hit)
#events.exception(handler=on_exception)
events.breakpoint(handler=on_breakpoint_hit)
events.exception(handler=on_exception)
@util.dbg.eng_thread
def remove_hooks():
# print("Removing hooks")
if not HOOK_STATE.installed:
return
HOOK_STATE.installed = False
dbg()._reset_callbacks()
util.dbg._base._reset_callbacks()
def enable_current_process():
# print("Enable current process")
proc = util.selected_process()
# print("proc: {}".format(proc))
PROC_STATE[proc] = ProcessState()
@ -444,6 +513,3 @@ def disable_current_process():
if proc in PROC_STATE:
# Silently ignore already disabled
del PROC_STATE[proc]
def dbg():
return util.get_debugger()

View file

@ -14,21 +14,22 @@
# limitations under the License.
##
from concurrent.futures import Future, ThreadPoolExecutor
from contextlib import redirect_stdout
from io import StringIO
import re
import sys
from pybag import pydbg
from pybag.dbgeng import core as DbgEng, exception
from ghidratrace import sch
from ghidratrace.client import MethodRegistry, ParamDesc, Address, AddressRange
from pybag import pydbg
from pybag.dbgeng import core as DbgEng
from . import util, commands
from contextlib import redirect_stdout
from io import StringIO
REGISTRY = MethodRegistry(ThreadPoolExecutor(max_workers=1))
REGISTRY = MethodRegistry(ThreadPoolExecutor(
max_workers=1, thread_name_prefix='MethodRegistry'))
def extre(base, ext):
@ -85,11 +86,12 @@ def find_proc_by_obj(object):
def find_proc_by_procbreak_obj(object):
return find_proc_by_pattern(object, PROC_BREAKS_PATTERN,
"a BreakpointLocationContainer")
"a BreakpointLocationContainer")
def find_proc_by_procwatch_obj(object):
return find_proc_by_pattern(object, PROC_WATCHES_PATTERN,
"a WatchpointContainer")
"a WatchpointContainer")
def find_proc_by_env_obj(object):
@ -178,25 +180,28 @@ def find_bpt_by_obj(object):
shared_globals = dict()
@REGISTRY.method
# @util.dbg.eng_thread
def execute(cmd: str, to_string: bool=False):
"""Execute a CLI command."""
#print("***{}***".format(cmd))
#sys.stderr.flush()
#sys.stdout.flush()
"""Execute a Python3 command or script."""
# print("***{}***".format(cmd))
# sys.stderr.flush()
# sys.stdout.flush()
if to_string:
data = StringIO()
with redirect_stdout(data):
exec("{}".format(cmd), shared_globals)
exec(cmd, shared_globals)
return data.getvalue()
else:
exec("{}".format(cmd), shared_globals)
exec(cmd, shared_globals)
@REGISTRY.method
# @util.dbg.eng_thread
def evaluate(expr: str):
"""Execute a CLI command."""
return str(eval("{}".format(expr), shared_globals))
"""Evaluate a Python3 expression."""
return str(eval(expr, shared_globals))
@REGISTRY.method(action='refresh')
@ -241,6 +246,7 @@ def refresh_environment(node: sch.Schema('Environment')):
with commands.open_tracked_tx('Refresh Environment'):
commands.ghidra_trace_put_environment()
@REGISTRY.method(action='refresh')
def refresh_threads(node: sch.Schema('ThreadContainer')):
"""Refresh the list of threads in the process."""
@ -287,6 +293,7 @@ def activate_process(process: sch.Schema('Process')):
"""Switch to the process."""
find_proc_by_obj(process)
@REGISTRY.method(action='activate')
def activate_thread(thread: sch.Schema('Thread')):
"""Switch to the thread."""
@ -300,46 +307,54 @@ def activate_frame(frame: sch.Schema('StackFrame')):
@REGISTRY.method(action='delete')
@util.dbg.eng_thread
def remove_process(process: sch.Schema('Process')):
"""Remove the process."""
find_proc_by_obj(process)
dbg().detach()
dbg().detach_proc()
@REGISTRY.method(action='connect')
@util.dbg.eng_thread
def target(process: sch.Schema('Process'), spec: str):
"""Connect to a target machine or process."""
find_proc_by_obj(process)
dbg().attach(spec)
dbg().attach_kernel(spec)
@REGISTRY.method(action='attach')
@util.dbg.eng_thread
def attach_obj(target: sch.Schema('Attachable')):
"""Attach the process to the given target."""
pid = find_availpid_by_obj(target)
dbg().attach(pid)
dbg().attach_proc(pid)
@REGISTRY.method(action='attach')
@util.dbg.eng_thread
def attach_pid(pid: int):
"""Attach the process to the given target."""
dbg().attach(pid)
dbg().attach_proc(pid)
@REGISTRY.method(action='attach')
@util.dbg.eng_thread
def attach_name(process: sch.Schema('Process'), name: str):
"""Attach the process to the given target."""
dbg().atach(name)
dbg().attach_proc(name)
@REGISTRY.method
@util.dbg.eng_thread
def detach(process: sch.Schema('Process')):
"""Detach the process's target."""
dbg().detach()
dbg().detach_proc()
@REGISTRY.method(action='launch')
def launch_loader(
file: ParamDesc(str, display='File'),
args: ParamDesc(str, display='Arguments')=''):
file: ParamDesc(str, display='File'),
args: ParamDesc(str, display='Arguments')=''):
"""
Start a native process with the given command line, stopping at the ntdll initial breakpoint.
"""
@ -351,65 +366,72 @@ def launch_loader(
@REGISTRY.method(action='launch')
def launch(
timeout: ParamDesc(int, display='Timeout'),
file: ParamDesc(str, display='File'),
args: ParamDesc(str, display='Arguments')=''):
args: ParamDesc(str, display='Arguments')='',
initial_break: ParamDesc(bool, display='Initial Break')=True,
timeout: ParamDesc(int, display='Timeout')=-1):
"""
Run a native process with the given command line.
"""
command = file
if args != None:
command += " "+args
commands.ghidra_trace_create(command, initial_break=False, timeout=timeout, start_trace=False)
commands.ghidra_trace_create(
command, initial_break=initial_break, timeout=timeout, start_trace=False)
@REGISTRY.method
@util.dbg.eng_thread
def kill(process: sch.Schema('Process')):
"""Kill execution of the process."""
dbg().terminate()
commands.ghidra_trace_kill()
@REGISTRY.method(name='continue', action='resume')
def _continue(process: sch.Schema('Process')):
@REGISTRY.method(action='resume')
def go(process: sch.Schema('Process')):
"""Continue execution of the process."""
dbg().go()
util.dbg.run_async(lambda: dbg().go())
@REGISTRY.method
def interrupt(process: sch.Schema('Process')):
"""Interrupt the execution of the debugged program."""
dbg()._control.SetInterrupt(DbgEng.DEBUG_INTERRUPT_ACTIVE)
# SetInterrupt is reentrant, so bypass the thread checks
util.dbg._protected_base._control.SetInterrupt(
DbgEng.DEBUG_INTERRUPT_ACTIVE)
@REGISTRY.method(action='step_into')
def step_into(thread: sch.Schema('Thread'), n: ParamDesc(int, display='N')=1):
"""Step one instruction exactly."""
find_thread_by_obj(thread)
dbg().stepi(n)
util.dbg.run_async(lambda: dbg().stepi(n))
@REGISTRY.method(action='step_over')
def step_over(thread: sch.Schema('Thread'), n: ParamDesc(int, display='N')=1):
"""Step one instruction, but proceed through subroutine calls."""
find_thread_by_obj(thread)
dbg().stepo(n)
util.dbg.run_async(lambda: dbg().stepo(n))
@REGISTRY.method(action='step_out')
def step_out(thread: sch.Schema('Thread')):
"""Execute until the current stack frame returns."""
find_thread_by_obj(thread)
dbg().stepout()
util.dbg.run_async(lambda: dbg().stepout())
@REGISTRY.method(action='step_to')
def step_to(thread: sch.Schema('Thread'), address: Address, max=None):
"""Continue execution up to the given address."""
find_thread_by_obj(thread)
return dbg().stepto(address.offset, max)
# TODO: The address may need mapping.
util.dbg.run_async(lambda: dbg().stepto(address.offset, max))
@REGISTRY.method(action='break_sw_execute')
@util.dbg.eng_thread
def break_address(process: sch.Schema('Process'), address: Address):
"""Set a breakpoint."""
find_proc_by_obj(process)
@ -417,6 +439,7 @@ def break_address(process: sch.Schema('Process'), address: Address):
@REGISTRY.method(action='break_sw_execute')
@util.dbg.eng_thread
def break_expression(expression: str):
"""Set a breakpoint."""
# TODO: Escape?
@ -424,6 +447,7 @@ def break_expression(expression: str):
@REGISTRY.method(action='break_hw_execute')
@util.dbg.eng_thread
def break_hw_address(process: sch.Schema('Process'), address: Address):
"""Set a hardware-assisted breakpoint."""
find_proc_by_obj(process)
@ -431,12 +455,14 @@ def break_hw_address(process: sch.Schema('Process'), address: Address):
@REGISTRY.method(action='break_hw_execute')
@util.dbg.eng_thread
def break_hw_expression(expression: str):
"""Set a hardware-assisted breakpoint."""
dbg().ba(expr=expression)
@REGISTRY.method(action='break_read')
@util.dbg.eng_thread
def break_read_range(process: sch.Schema('Process'), range: AddressRange):
"""Set a read watchpoint."""
find_proc_by_obj(process)
@ -444,12 +470,14 @@ def break_read_range(process: sch.Schema('Process'), range: AddressRange):
@REGISTRY.method(action='break_read')
@util.dbg.eng_thread
def break_read_expression(expression: str):
"""Set a read watchpoint."""
dbg().ba(expr=expression, access=DbgEng.DEBUG_BREAK_READ)
@REGISTRY.method(action='break_write')
@util.dbg.eng_thread
def break_write_range(process: sch.Schema('Process'), range: AddressRange):
"""Set a watchpoint."""
find_proc_by_obj(process)
@ -457,25 +485,30 @@ def break_write_range(process: sch.Schema('Process'), range: AddressRange):
@REGISTRY.method(action='break_write')
@util.dbg.eng_thread
def break_write_expression(expression: str):
"""Set a watchpoint."""
dbg().ba(expr=expression, access=DbgEng.DEBUG_BREAK_WRITE)
@REGISTRY.method(action='break_access')
@util.dbg.eng_thread
def break_access_range(process: sch.Schema('Process'), range: AddressRange):
"""Set an access watchpoint."""
find_proc_by_obj(process)
dbg().ba(expr=range.min, size=range.length(), access=DbgEng.DEBUG_BREAK_READ|DbgEng.DEBUG_BREAK_WRITE)
dbg().ba(expr=range.min, size=range.length(),
access=DbgEng.DEBUG_BREAK_READ | DbgEng.DEBUG_BREAK_WRITE)
@REGISTRY.method(action='break_access')
@util.dbg.eng_thread
def break_access_expression(expression: str):
"""Set an access watchpoint."""
dbg().ba(expr=expression, access=DbgEng.DEBUG_BREAK_READ|DbgEng.DEBUG_BREAK_WRITE)
dbg().ba(expr=expression, access=DbgEng.DEBUG_BREAK_READ | DbgEng.DEBUG_BREAK_WRITE)
@REGISTRY.method(action='toggle')
@util.dbg.eng_thread
def toggle_breakpoint(breakpoint: sch.Schema('BreakpointSpec'), enabled: bool):
"""Toggle a breakpoint."""
bpt = find_bpt_by_obj(breakpoint)
@ -486,6 +519,7 @@ def toggle_breakpoint(breakpoint: sch.Schema('BreakpointSpec'), enabled: bool):
@REGISTRY.method(action='delete')
@util.dbg.eng_thread
def delete_breakpoint(breakpoint: sch.Schema('BreakpointSpec')):
"""Delete a breakpoint."""
bpt = find_bpt_by_obj(breakpoint)
@ -495,14 +529,20 @@ def delete_breakpoint(breakpoint: sch.Schema('BreakpointSpec')):
@REGISTRY.method
def read_mem(process: sch.Schema('Process'), range: AddressRange):
"""Read memory."""
# print("READ_MEM: process={}, range={}".format(process, range))
nproc = find_proc_by_obj(process)
offset_start = process.trace.memory_mapper.map_back(
nproc, Address(range.space, range.min))
with commands.open_tracked_tx('Read Memory'):
dbg().read(range.min, range.length())
result = commands.put_bytes(
offset_start, offset_start + range.length() - 1, pages=True, display_result=False)
if result['count'] == 0:
commands.putmem_state(
offset_start, offset_start+range.length() - 1, 'error')
@REGISTRY.method
@util.dbg.eng_thread
def write_mem(process: sch.Schema('Process'), address: Address, data: bytes):
"""Write memory."""
nproc = find_proc_by_obj(process)
@ -511,12 +551,13 @@ def write_mem(process: sch.Schema('Process'), address: Address, data: bytes):
@REGISTRY.method
@util.dbg.eng_thread
def write_reg(frame: sch.Schema('StackFrame'), name: str, value: bytes):
"""Write a register."""
util.select_frame()
nproc = pydbg.selected_process()
dbg().reg._set_register(name, value)
def dbg():
return util.get_debugger()
return util.dbg._base

View file

@ -96,7 +96,8 @@
<attribute name="_spec" schema="BreakpointSpec" />
<attribute name="_range" schema="RANGE" hidden="yes" />
<attribute name="_modified" schema="BOOL" hidden="yes" />
<attribute name="_enabled" schema="BOOL" required="yes" hidden="yes" />
<attribute name="Enabled" schema="BOOL" required="yes" />
<attribute-alias from="_enabled" to="Enabled" />
<attribute name="Commands" schema="STRING" />
<attribute name="Condition" schema="STRING" />
<attribute name="Hit Count" schema="INT" />

View file

@ -14,49 +14,397 @@
# limitations under the License.
##
from collections import namedtuple
from concurrent.futures import Future
import concurrent.futures
from ctypes import *
import functools
import io
import os
import queue
import re
import sys
import threading
import traceback
from ctypes import *
from pybag import pydbg
from comtypes import CoClass, GUID
import comtypes
from comtypes.hresult import S_OK
from pybag import pydbg, userdbg, kerneldbg, crashdbg
from pybag.dbgeng import core as DbgEng
from pybag.dbgeng import exception
from pybag.dbgeng import util as DbgUtil
base = pydbg.DebuggerBase()
DbgVersion = namedtuple('DbgVersion', ['full', 'major', 'minor'])
from pybag.dbgeng.callbacks import DbgEngCallbacks
def _compute_pydbg_ver():
blurb = "" #base._control.GetActualProcessorType()
full = ""
major = 0
minor = 0
return DbgVersion(full, int(major), int(minor))
DbgVersion = namedtuple('DbgVersion', ['full', 'name', 'dotted', 'arch'])
DBG_VERSION = _compute_pydbg_ver()
class StdInputCallbacks(CoClass):
# This is the UUID listed for IDebugInputCallbacks in DbgEng.h
# See https://github.com/tpn/winsdk-10/blob/master/Include/10.0.10240.0/um/DbgEng.h
# Accessed 9 Jan 2024
_reg_clsid_ = GUID("{9f50e42c-f136-499e-9a97-73036c94ed2d}")
_reg_threading_ = "Both"
_reg_progid_ = "dbgeng.DbgEngInputCallbacks.1"
_reg_novers_progid_ = "dbgeng.DbgEngInputCallbacks"
_reg_desc_ = "InputCallbacks"
_reg_clsctx_ = comtypes.CLSCTX_INPROC_SERVER
_com_interfaces_ = [DbgEng.IDebugInputCallbacks,
comtypes.typeinfo.IProvideClassInfo2,
comtypes.errorinfo.ISupportErrorInfo,
comtypes.connectionpoints.IConnectionPointContainer]
def __init__(self, ghidra_dbg):
self.ghidra_dbg = ghidra_dbg
self.expecting_input = False
def IDebugInputCallbacks_StartInput(self, buffer_size):
try:
self.expecting_input = True
self.buffer_size = buffer_size
print('Input>', end=' ')
line = input()
self.ghidra_dbg.return_input(line)
return S_OK
except:
traceback.print_exc()
raise
def IDebugInputCallbacks_EndInput(self):
self.expecting_input = False
def get_debugger():
return base
class _Worker(threading.Thread):
def __init__(self, new_base, work_queue, dispatch):
super().__init__(name='DbgWorker', daemon=True)
self.new_base = new_base
self.work_queue = work_queue
self.dispatch = dispatch
def run(self):
self.new_base()
while True:
try:
work_item = self.work_queue.get_nowait()
except queue.Empty:
work_item = None
if work_item is None:
# HACK to avoid lockup on race condition
try:
self.dispatch(100)
except exception.DbgEngTimeout:
# This is routine :/
pass
else:
work_item.run()
# Derived from Python core library
# https://github.com/python/cpython/blob/main/Lib/concurrent/futures/thread.py
# accessed 9 Jan 2024
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as exc:
self.future.set_exception(exc)
# Python core lib does this, I presume for good reason
self = None
else:
self.future.set_result(result)
class DebuggeeRunningException(BaseException):
pass
class DbgExecutor(object):
def __init__(self, ghidra_dbg):
self._ghidra_dbg = ghidra_dbg
self._work_queue = queue.SimpleQueue()
self._thread = _Worker(ghidra_dbg._new_base,
self._work_queue, ghidra_dbg._dispatch_events)
self._thread.start()
self._executing = False
def submit(self, fn, / , *args, **kwargs):
f = self._submit_no_exit(fn, *args, **kwargs)
self._ghidra_dbg.exit_dispatch()
return f
def _submit_no_exit(self, fn, / , *args, **kwargs):
f = Future()
if self._executing:
f.set_exception(DebuggeeRunningException("Debuggee is Running"))
return f
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
return f
def _clear_queue(self):
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
return
work_item.future.set_exception(
DebuggeeRunningException("Debuggee is Running"))
def _state_execute(self):
self._executing = True
self._clear_queue()
def _state_break(self):
self._executing = False
class WrongThreadException(BaseException):
pass
class AllDbg(pydbg.DebuggerBase):
# Steal user-mode methods
proc_list = userdbg.UserDbg.proc_list
ps = userdbg.UserDbg.ps
pids_by_name = userdbg.UserDbg.pids_by_name
create_proc = userdbg.UserDbg.create
attach_proc = userdbg.UserDbg.attach
reattach_proc = userdbg.UserDbg.reattach
detach_proc = userdbg.UserDbg.detach
abandon_proc = userdbg.UserDbg.abandon
terminate_proc = userdbg.UserDbg.terminate
handoff_proc = userdbg.UserDbg.handoff
connect_proc = userdbg.UserDbg.connect
disconnect_proc = userdbg.UserDbg.disconnect
# Steal kernel-mode methods
attach_kernel = kerneldbg.KernelDbg.attach
detach_kernel = kerneldbg.KernelDbg.detach
# Steal crash methods
load_dump = crashdbg.CrashDbg.load_dump
class GhidraDbg(object):
def __init__(self):
self._queue = DbgExecutor(self)
self._thread = self._queue._thread
# Wait for the executor to be operational before getting base
self._queue._submit_no_exit(lambda: None).result()
self._install_stdin()
base = self._protected_base
for name in ['set_output_mask', 'get_output_mask',
'exec_status', 'go', 'goto', 'go_handled', 'go_nothandled',
'stepi', 'stepo', 'stepbr', 'stepto', 'stepout',
'trace', 'traceto',
'wait',
'bitness',
'read', 'readstr', 'readptr', 'poi',
'write', 'writeptr',
'memory_list', 'address',
'instruction_at', 'disasm',
'pc', 'r', 'registers',
'get_name_by_offset', 'symbol', 'find_symbol',
'whereami',
'dd', 'dp', 'ds',
'bl', 'bc', 'bd', 'be', 'bp', 'ba',
'handle_list', 'handles',
'get_thread', 'set_thread', 'apply_threads', 'thread_list', 'threads',
'teb_addr', 'teb', 'peb_addr', 'peb',
'backtrace_list', 'backtrace',
'module_list', 'lm', 'exports', 'imports',
# User-mode
'proc_list', 'ps', 'pids_by_name',
'create_proc', 'attach_proc', 'reattach_proc',
'detach_proc', 'abandon_proc', 'terminate_proc', 'handoff_proc',
'connect_proc', 'disconnect_proc',
# Kernel-model
'attach_kernel', 'detach_kernel',
# Crash dump
'load_dump'
]:
setattr(self, name, self.eng_thread(getattr(base, name)))
def _new_base(self):
self._protected_base = AllDbg()
@property
def _base(self):
if threading.current_thread() is not self._thread:
raise WrongThreadException("Was {}. Want {}".format(
threading.current_thread(), self._thread))
return self._protected_base
def run(self, fn, *args, **kwargs):
# TODO: Remove this check?
if hasattr(self, '_thread') and threading.current_thread() is self._thread:
raise WrongThreadException()
future = self._queue.submit(fn, *args, **kwargs)
# https://stackoverflow.com/questions/72621731/is-there-any-graceful-way-to-interrupt-a-python-concurrent-future-result-call gives an alternative
while True:
try:
return future.result(0.5)
except concurrent.futures.TimeoutError:
pass
def run_async(self, fn, *args, **kwargs):
return self._queue.submit(fn, *args, **kwargs)
@staticmethod
def check_thread(func):
'''
For methods inside of GhidraDbg, ensure it runs on the dbgeng
thread
'''
@functools.wraps(func)
def _func(self, *args, **kwargs):
if threading.current_thread() is self._thread:
return func(self, *args, **kwargs)
else:
return self.run(func, self, *args, **kwargs)
return _func
def eng_thread(self, func):
'''
For methods and functions outside of GhidraDbg, ensure it
runs on this GhidraDbg's dbgeng thread
'''
@functools.wraps(func)
def _func(*args, **kwargs):
if threading.current_thread() is self._thread:
return func(*args, **kwargs)
else:
return self.run(func, *args, **kwargs)
return _func
def _ces_exec_status(self, argument):
if argument & 0xfffffff == DbgEng.DEBUG_STATUS_BREAK:
self._queue._state_break()
else:
self._queue._state_execute()
@check_thread
def _install_stdin(self):
self.input = StdInputCallbacks(self)
self._base._client.SetInputCallbacks(self.input)
# Manually decorated to preserve undecorated
def _dispatch_events(self, timeout=DbgEng.WAIT_INFINITE):
# NB: pybag's impl doesn't heed standalone
self._protected_base._client.DispatchCallbacks(timeout)
dispatch_events = check_thread(_dispatch_events)
# no check_thread. Must allow reentry
def exit_dispatch(self):
self._protected_base._client.ExitDispatch()
@check_thread
def cmd(self, cmdline, quiet=True):
# NB: pybag's impl always captures.
# Here, we let it print without capture if quiet is False
if quiet:
try:
buffer = io.StringIO()
self._base.callbacks.stdout = buffer
self._base._control.Execute(cmdline)
buffer.seek(0)
return buffer.read()
finally:
self._base.callbacks.reset_stdout()
else:
return self._base._control.Execute(cmdline)
@check_thread
def return_input(self, input):
# TODO: Contribute fix upstream (check_hr -> check_err)
# return self._base._control.ReturnInput(input.encode())
hr = self._base._control._ctrl.ReturnInput(input.encode())
exception.check_err(hr)
def interrupt(self):
# Contribute upstream?
# NOTE: This can be called from any thread
self._protected_base._control.SetInterrupt(
DbgEng.DEBUG_INTERRUPT_ACTIVE)
@check_thread
def get_current_system_id(self):
# TODO: upstream?
sys_id = c_ulong()
hr = self._base._systems._sys.GetCurrentSystemId(byref(sys_id))
exception.check_err(hr)
return sys_id.value
@check_thread
def get_prompt_text(self):
# TODO: upstream?
size = c_ulong()
hr = self._base._control._ctrl.GetPromptText(None, 0, byref(size))
prompt_buf = create_string_buffer(size.value)
hr = self._base._control._ctrl.GetPromptText(prompt_buf, size, None)
return prompt_buf.value.decode()
@check_thread
def get_actual_processor_type(self):
return self._base._control.GetActualProcessorType()
@property
@check_thread
def pid(self):
try:
return self._base._systems.GetCurrentProcessSystemId()
except exception.E_UNEXPECTED_Error:
# There is no process
return None
dbg = GhidraDbg()
@dbg.eng_thread
def compute_pydbg_ver():
pat = re.compile(
'(?P<name>.*Debugger.*) Version (?P<dotted>[\\d\\.]*) (?P<arch>\\w*)')
blurb = dbg.cmd('version')
matches = [pat.match(l) for l in blurb.splitlines() if pat.match(l)]
if len(matches) == 0:
return DbgVersion('Unknown', 'Unknown', '0', 'Unknown')
m = matches[0]
return DbgVersion(full=m.group(), **m.groupdict())
name, dotted_and_arch = full.split(' Version ')
DBG_VERSION = compute_pydbg_ver()
def get_target():
return 0 #get_debugger()._systems.GetCurrentSystemId()
return dbg.get_current_system_id()
@dbg.eng_thread
def disassemble1(addr):
return DbgUtil.disassemble_instruction(dbg._base.bitness(), addr, dbg.read(addr, 15))
def get_inst(addr):
dbg = get_debugger()
ins = DbgUtil.disassemble_instruction(dbg.bitness(), addr, dbg.read(addr, 15))
return str(ins)
return str(disassemble1(addr))
def get_inst_sz(addr):
dbg = get_debugger()
ins = DbgUtil.disassemble_instruction(dbg.bitness(), addr, dbg.read(addr, 15))
return str(ins.size)
return int(disassemble1(addr).size)
@dbg.eng_thread
def get_breakpoints():
ids = [bpid for bpid in get_debugger().breakpoints]
ids = [bpid for bpid in dbg._base.breakpoints]
offset_set = []
expr_set = []
prot_set = []
@ -64,30 +412,30 @@ def get_breakpoints():
stat_set = []
for bpid in ids:
try:
bp = get_debugger()._control.GetBreakpointById(bpid)
bp = dbg._base._control.GetBreakpointById(bpid)
except exception.E_NOINTERFACE_Error:
continue
if bp.GetFlags() & DbgEng.DEBUG_BREAKPOINT_DEFERRED:
offset = "[Deferred]"
expr = bp.GetOffsetExpression()
else:
offset = "%016x" % bp.GetOffset()
expr = get_debugger().get_name_by_offset(bp.GetOffset())
expr = dbg._base.get_name_by_offset(bp.GetOffset())
if bp.GetType()[0] == DbgEng.DEBUG_BREAKPOINT_DATA:
width, prot = bp.GetDataParameters()
width = ' sz={}'.format(str(width))
prot = {4: 'type=x', 3: 'type=rw', 2: 'type=w', 1: 'type=r'}[prot]
prot = {4: 'type=x', 3: 'type=rw', 2: 'type=w', 1: 'type=r'}[prot]
else:
width = ''
prot = ''
prot = ''
if bp.GetFlags() & DbgEng.DEBUG_BREAKPOINT_ENABLED:
status = 'enabled'
else:
status = 'disabled'
offset_set.append(offset)
expr_set.append(expr)
prot_set.append(prot)
@ -95,48 +443,65 @@ def get_breakpoints():
stat_set.append(status)
return zip(offset_set, expr_set, prot_set, width_set, stat_set)
@dbg.eng_thread
def selected_process():
try:
return get_debugger()._systems.GetCurrentProcessId()
#return current_process
except Exception:
return dbg._base._systems.GetCurrentProcessId()
except exception.E_UNEXPECTED_Error:
return None
def selected_thread():
@dbg.eng_thread
def selected_thread():
try:
return get_debugger()._systems.GetCurrentThreadId()
except Exception:
return dbg._base._systems.GetCurrentThreadId()
except exception.E_UNEXPECTED_Error:
return None
@dbg.eng_thread
def selected_frame():
return 0 #selected_thread().GetSelectedFrame()
try:
line = dbg.cmd('.frame').strip()
if not line:
return None
num_str = line.split(sep=None, maxsplit=1)[0]
return int(num_str, 16)
except OSError:
return None
except ValueError:
return None
@dbg.eng_thread
def select_process(id: int):
return get_debugger()._systems.SetCurrentProcessId(id)
return dbg._base._systems.SetCurrentProcessId(id)
@dbg.eng_thread
def select_thread(id: int):
return get_debugger()._systems.SetCurrentThreadId(id)
return dbg._base._systems.SetCurrentThreadId(id)
@dbg.eng_thread
def select_frame(id: int):
#TODO: this needs to be fixed
return id
return dbg.cmd('.frame 0x{:x}'.format(id))
def parse_and_eval(expr):
regs = get_debugger().reg
if expr == "$pc":
return regs.get_pc()
if expr == "$sp":
return regs.get_sp()
return get_eval(expr)
def get_eval(expr, type=None):
ctrl = get_debugger()._control._ctrl
@dbg.eng_thread
def parse_and_eval(expr, type=None):
if isinstance(expr, int):
return expr
# TODO: This could be contributed upstream
ctrl = dbg._base._control._ctrl
ctrl.SetExpressionSyntax(1)
value = DbgEng._DEBUG_VALUE()
index = c_ulong()
if type == None:
type = DbgEng.DEBUG_VALUE_INT64
hr = ctrl.Evaluate(Expression="{}".format(expr).encode(),DesiredType=type,Value=byref(value),RemainderIndex=byref(index))
hr = ctrl.Evaluate(Expression=expr.encode(), DesiredType=type,
Value=byref(value), RemainderIndex=byref(index))
exception.check_err(hr)
if type == DbgEng.DEBUG_VALUE_INT8:
return value.u.I8
@ -157,96 +522,128 @@ def get_eval(expr, type=None):
if type == DbgEng.DEBUG_VALUE_FLOAT128:
return value.u.F128Bytes
@dbg.eng_thread
def get_pc():
return dbg._base.reg.get_pc()
@dbg.eng_thread
def get_sp():
return dbg._base.reg.get_sp()
@dbg.eng_thread
def GetProcessIdsByIndex(count=0):
# TODO: This could be contributed upstream?
if count == 0:
try :
count = get_debugger()._systems.GetNumberProcesses()
try:
count = dbg._base._systems.GetNumberProcesses()
except Exception:
count = 0
ids = (c_ulong * count)()
sysids = (c_ulong * count)()
if count != 0:
hr = get_debugger()._systems._sys.GetProcessIdsByIndex(0, count, ids, sysids)
hr = dbg._base._systems._sys.GetProcessIdsByIndex(
0, count, ids, sysids)
exception.check_err(hr)
return (tuple(ids), tuple(sysids))
@dbg.eng_thread
def GetCurrentProcessExecutableName():
dbg = get_debugger()
# TODO: upstream?
_dbg = dbg._base
size = c_ulong()
exesize = c_ulong()
hr = dbg._systems._sys.GetCurrentProcessExecutableName(None, size, byref(exesize))
hr = _dbg._systems._sys.GetCurrentProcessExecutableName(
None, size, byref(exesize))
exception.check_err(hr)
buffer = create_string_buffer(exesize.value)
size = exesize
hr = dbg._systems._sys.GetCurrentProcessExecutableName(buffer, size, None)
hr = _dbg._systems._sys.GetCurrentProcessExecutableName(buffer, size, None)
exception.check_err(hr)
buffer = buffer[:size.value]
buffer = buffer.rstrip(b'\x00')
return buffer
@dbg.eng_thread
def GetCurrentProcessPeb():
dbg = get_debugger()
# TODO: upstream?
_dbg = dbg._base
offset = c_ulonglong()
hr = dbg._systems._sys.GetCurrentProcessPeb(byref(offset))
hr = _dbg._systems._sys.GetCurrentProcessPeb(byref(offset))
exception.check_err(hr)
return offset.value
@dbg.eng_thread
def GetExitCode():
# TODO: upstream?
exit_code = c_ulong()
hr = get_debugger()._client._cli.GetExitCode(byref(exit_code))
hr = dbg._base._client._cli.GetExitCode(byref(exit_code))
return exit_code.value
@dbg.eng_thread
def process_list(running=False):
"""process_list() -> list of all processes"""
dbg = get_debugger()
"""Get the list of all processes"""
_dbg = dbg._base
ids, sysids = GetProcessIdsByIndex()
pebs = []
names = []
try :
curid = dbg._systems.GetCurrentProcessId()
if running == False:
curid = selected_process()
try:
if running:
return zip(sysids)
else:
for id in ids:
dbg._systems.SetCurrentProcessId(id)
_dbg._systems.SetCurrentProcessId(id)
names.append(GetCurrentProcessExecutableName())
pebs.append(GetCurrentProcessPeb())
if running == False:
dbg._systems.SetCurrentProcessId(curid)
return zip(sysids, names, pebs)
except Exception:
pass
return zip(sysids)
return zip(sysids)
finally:
if not running and curid is not None:
_dbg._systems.SetCurrentProcessId(curid)
@dbg.eng_thread
def thread_list(running=False):
"""thread_list() -> list of all threads"""
dbg = get_debugger()
try :
ids, sysids = dbg._systems.GetThreadIdsByIndex()
"""Get the list of all threads"""
_dbg = dbg._base
try:
ids, sysids = _dbg._systems.GetThreadIdsByIndex()
except Exception:
return zip([])
tebs = []
syms = []
curid = dbg._systems.GetCurrentThreadId()
if running == False:
for id in ids:
dbg._systems.SetCurrentThreadId(id)
tebs.append(dbg._systems.GetCurrentThreadTeb())
addr = dbg.reg.get_pc()
syms.append(dbg.get_name_by_offset(addr))
if running == False:
dbg._systems.SetCurrentThreadId(curid)
return zip(sysids, tebs, syms)
return zip(sysids)
curid = selected_thread()
try:
if running:
return zip(sysids)
else:
for id in ids:
_dbg._systems.SetCurrentThreadId(id)
tebs.append(_dbg._systems.GetCurrentThreadTeb())
addr = _dbg.reg.get_pc()
syms.append(_dbg.get_name_by_offset(addr))
return zip(sysids, tebs, syms)
except Exception:
return zip(sysids)
finally:
if not running and curid is not None:
_dbg._systems.SetCurrentThreadId(curid)
conv_map = {}
def get_convenience_variable(id):
#val = get_target().GetEnvironment().Get(id)
if id not in conv_map:
return "auto"
val = conv_map[id]
@ -254,7 +651,6 @@ def get_convenience_variable(id):
return "auto"
return val
def set_convenience_variable(id, value):
#env = get_target().GetEnvironment()
#return env.Set(id, value, True)
conv_map[id] = value