GP-5511: final review set

GP-5511: post-re-review
GP-5511: fix for require_tx bug
GP-5511: minor fixes
GP-5511: fixups using vscode - something broken
GP-5511: finsihing up
GP-5511: most things covered
GP-5511: commands (first pass)
GP-5511: arch
This commit is contained in:
d-millar 2025-03-26 15:32:11 -04:00
parent 8fcda62c5b
commit 5d4b56b7e4
6 changed files with 526 additions and 554 deletions

View file

@ -291,7 +291,7 @@ register_mappers = {
}
def compute_register_mapper(lang):
def compute_register_mapper(lang: str)-> DefaultRegisterMapper:
if not lang in register_mappers:
if ':BE:' in lang:
return DEFAULT_BE_REGISTER_MAPPER

View file

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from typing import Dict, List, Literal, Optional, Tuple
from ghidratrace.client import Address, RegVal
import drgn
@ -20,7 +22,7 @@ from . import util
# NOTE: This map is derived from the ldefs using a script
language_map = {
language_map: Dict[str, List[str]] = {
'AARCH64': ['AARCH64:BE:64:v8A', 'AARCH64:LE:64:AppleSilicon', 'AARCH64:LE:64:v8A'],
'ARM': ['ARM:BE:32:v8', 'ARM:BE:32:v8T', 'ARM:LE:32:v8', 'ARM:LE:32:v8T'],
'PPC64': ['PowerPC:BE:64:4xx', 'PowerPC:LE:64:4xx'],
@ -31,19 +33,19 @@ language_map = {
'UNKNOWN': ['DATA:LE:64:default', 'DATA:LE:64:default'],
}
data64_compiler_map = {
data64_compiler_map: Dict[Optional[str], str] = {
None: 'pointer64',
}
default_compiler_map = {
default_compiler_map: Dict[Optional[str], str] = {
'Language.C': 'default',
}
x86_compiler_map = {
x86_compiler_map: Dict[Optional[str], str] = {
'Language.C': 'gcc',
}
compiler_map = {
compiler_map: Dict[str, Dict[Optional[str], str]] = {
'DATA:BE:64:': data64_compiler_map,
'DATA:LE:64:': data64_compiler_map,
'x86:LE:32:': x86_compiler_map,
@ -56,12 +58,12 @@ compiler_map = {
}
def get_arch():
def get_arch() -> str:
platform = drgn.host_platform
return platform.arch.name
def get_endian():
def get_endian() -> Literal['little', 'big']:
parm = util.get_convenience_variable('endian')
if parm != 'auto':
return parm
@ -73,7 +75,7 @@ def get_endian():
return 'big'
def get_size():
def get_size() -> str:
parm = util.get_convenience_variable('size')
if parm != 'auto':
return parm
@ -85,11 +87,11 @@ def get_size():
return '32'
def get_osabi():
def get_osabi() -> str:
return "Language.C"
def compute_ghidra_language():
def compute_ghidra_language() -> str:
# First, check if the parameter is set
lang = util.get_convenience_variable('ghidra-language')
if lang != 'auto':
@ -103,7 +105,7 @@ def compute_ghidra_language():
sz = get_size()
lebe = ':BE:' if endian == 'big' else ':LE:'
if not arch in language_map:
return 'DATA' + lebe + sz +':default'
return 'DATA' + lebe + sz + ':default'
langs = language_map[arch]
matched_endian = sorted(
(l for l in langs if lebe in l),
@ -115,7 +117,7 @@ def compute_ghidra_language():
return 'DATA' + lebe + sz + ':default'
def compute_ghidra_compiler(lang):
def compute_ghidra_compiler(lang: str) -> str:
# First, check if the parameter is set
comp = util.get_convenience_variable('ghidra-compiler')
if comp != 'auto':
@ -124,12 +126,12 @@ def compute_ghidra_compiler(lang):
# Check if the selected lang has specific compiler recommendations
matched_lang = sorted(
(l for l in compiler_map if l in lang),
# key=lambda l: compiler_map[l]
# key=lambda l: compiler_map[l]
)
if len(matched_lang) == 0:
print(f"{lang} not found in compiler map - using default compiler")
return 'default'
comp_map = compiler_map[matched_lang[0]]
if comp_map == data64_compiler_map:
print(f"Using the DATA64 compiler map")
@ -145,7 +147,7 @@ def compute_ghidra_compiler(lang):
return 'default'
def compute_ghidra_lcsp():
def compute_ghidra_lcsp() -> Tuple[str, str]:
lang = compute_ghidra_language()
comp = compute_ghidra_compiler(lang)
return lang, comp
@ -153,14 +155,14 @@ def compute_ghidra_lcsp():
class DefaultMemoryMapper(object):
def __init__(self, defaultSpace):
def __init__(self, defaultSpace: str) -> None:
self.defaultSpace = defaultSpace
def map(self, proc: drgn.Program, offset: int):
def map(self, proc: int, offset: int) -> Tuple[str, Address]:
space = self.defaultSpace
return self.defaultSpace, Address(space, offset)
def map_back(self, proc: drgn.Program, address: Address) -> int:
def map_back(self, proc: int, address: Address) -> int:
if address.space == self.defaultSpace:
return address.offset
raise ValueError(
@ -169,10 +171,10 @@ class DefaultMemoryMapper(object):
DEFAULT_MEMORY_MAPPER = DefaultMemoryMapper('ram')
memory_mappers = {}
memory_mappers: Dict[str, DefaultMemoryMapper] = {}
def compute_memory_mapper(lang):
def compute_memory_mapper(lang: str) -> DefaultMemoryMapper:
if not lang in memory_mappers:
return DEFAULT_MEMORY_MAPPER
return memory_mappers[lang]
@ -180,29 +182,29 @@ def compute_memory_mapper(lang):
class DefaultRegisterMapper(object):
def __init__(self, byte_order):
def __init__(self, byte_order: str) -> None:
if not byte_order in ['big', 'little']:
raise ValueError("Invalid byte_order: {}".format(byte_order))
self.byte_order = byte_order
self.union_winners = {}
def map_name(self, proc, name):
def map_name(self, proc: int, name: str) -> str:
return name
def map_value(self, proc, name, value):
def map_value(self, proc: int, name: str, value: bytes):
return RegVal(self.map_name(proc, name), value)
def map_name_back(self, proc, name):
def map_name_back(self, proc: int, name: str):
return name
def map_value_back(self, proc, name, value):
def map_value_back(self, proc: int, name: str, value: bytes):
return RegVal(self.map_name_back(proc, name), value)
DEFAULT_BE_REGISTER_MAPPER = DefaultRegisterMapper('big')
DEFAULT_LE_REGISTER_MAPPER = DefaultRegisterMapper('little')
def compute_register_mapper(lang):
def compute_register_mapper(lang: str) -> DefaultRegisterMapper:
if ':BE:' in lang:
return DEFAULT_BE_REGISTER_MAPPER
else:

View file

@ -13,45 +13,43 @@
# See the License for the specific language governing permissions and
# limitations under the License.
##
from dataclasses import dataclass, field
import threading
import time
import drgn
from typing import Any, Callable, Collection, Dict, Optional, TypeVar, cast
from . import commands, util
ALL_EVENTS = 0xFFFF
@dataclass(frozen=False)
class HookState(object):
__slots__ = ('installed', 'mem_catchpoint')
def __init__(self):
self.installed = False
self.mem_catchpoint = None
installed = False
@dataclass(frozen=False)
class ProcessState(object):
__slots__ = ('first', 'regions', 'modules', 'threads',
'breaks', 'watches', 'visited')
first = True
# For things we can detect changes to between stops
regions = False
modules = False
threads = False
breaks = False
watches = False
# For frames and threads that have already been synced since last stop
visited: set[Any] = field(default_factory=set)
def __init__(self):
self.first = True
# For things we can detect changes to between stops
self.regions = False
self.modules = False
self.threads = False
self.breaks = False
self.watches = False
# For frames and threads that have already been synced since last stop
self.visited = set()
def record(self, description=None):
def record(self, description: Optional[str] = None) -> None:
first = self.first
self.first = False
trace = commands.STATE.require_trace()
if description is not None:
commands.STATE.trace.snapshot(description)
trace.snapshot(description)
if first:
commands.put_processes()
commands.put_environment()
@ -84,51 +82,53 @@ class ProcessState(object):
commands.put_modules()
self.modules = False
def record_continued(self):
def record_continued(self) -> None:
commands.put_processes()
commands.put_threads()
def record_exited(self, exit_code):
def record_exited(self, exit_code: int) -> None:
trace = commands.STATE.require_trace()
nproc = util.selected_process()
ipath = commands.PROCESS_PATTERN.format(procnum=nproc)
procobj = commands.STATE.trace.proxy_object_path(ipath)
procobj = trace.proxy_object_path(ipath)
procobj.set_value('Exit Code', exit_code)
procobj.set_value('State', 'TERMINATED')
HOOK_STATE = HookState()
PROC_STATE = {}
PROC_STATE: Dict[int, ProcessState] = {}
def on_new_process(event):
def on_new_process(id: int) -> None:
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("New Process {}".format(event.process.num)):
with trace.client.batch():
with trace.open_tx("New Process {}".format(id)):
commands.put_processes() # TODO: Could put just the one....
def on_process_selected():
def on_process_selected() -> None:
nproc = util.selected_process()
if nproc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Process {} selected".format(nproc)):
PROC_STATE[nproc].record()
commands.activate()
def on_new_thread(event):
def on_new_thread() -> None:
nproc = util.selected_process()
if nproc not in PROC_STATE:
return
PROC_STATE[nproc].threads = True
def on_thread_selected():
def on_thread_selected() -> None:
nproc = util.selected_process()
if nproc not in PROC_STATE:
return
@ -136,14 +136,14 @@ def on_thread_selected():
if trace is None:
return
nthrd = util.selected_thread()
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Thread {}.{} selected".format(nproc, nthrd)):
PROC_STATE[nproc].record()
commands.put_threads()
commands.activate()
def on_frame_selected():
def on_frame_selected() -> None:
nproc = util.selected_process()
if nproc not in PROC_STATE:
return
@ -152,7 +152,7 @@ def on_frame_selected():
return
nthrd = util.selected_thread()
level = util.selected_frame()
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Frame {}.{}.{} selected".format(nproc, nthrd, level)):
PROC_STATE[nproc].record()
commands.put_threads()
@ -160,32 +160,7 @@ def on_frame_selected():
commands.activate()
def on_memory_changed(event):
nproc = util.get_process()
if nproc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("Memory *0x{:08x} changed".format(event.address)):
commands.put_bytes(event.address, event.address + event.length,
pages=False, is_mi=False, result=None)
def on_register_changed(event):
nproc = util.get_process()
if nproc not in PROC_STATE:
return
trace = commands.STATE.trace
if trace is None:
return
with commands.STATE.client.batch():
with trace.open_tx("Register {} changed".format(event.regnum)):
commands.putreg()
def on_cont(event):
def on_cont() -> None:
nproc = util.selected_process()
if nproc not in PROC_STATE:
return
@ -193,12 +168,12 @@ def on_cont(event):
if trace is None:
return
state = PROC_STATE[nproc]
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Continued"):
state.record_continued()
def on_stop(event):
def on_stop() -> None:
nproc = util.selected_process()
if nproc not in PROC_STATE:
PROC_STATE[nproc] = ProcessState()
@ -208,7 +183,7 @@ def on_stop(event):
return
state = PROC_STATE[nproc]
state.visited.clear()
with commands.STATE.client.batch():
with trace.client.batch():
with trace.open_tx("Stopped"):
state.record("Stopped")
commands.put_threads()
@ -216,34 +191,31 @@ def on_stop(event):
commands.activate()
def modules_changed():
def modules_changed() -> None:
nproc = util.selected_process()
if nproc not in PROC_STATE:
return
PROC_STATE[nproc].modules = True
def install_hooks():
def install_hooks() -> None:
if HOOK_STATE.installed:
return
HOOK_STATE.installed = True
event_thread = EventThread()
event_thread.start()
def remove_hooks():
def remove_hooks() -> None:
if not HOOK_STATE.installed:
return
HOOK_STATE.installed = False
def enable_current_process():
def enable_current_process() -> None:
nproc = util.selected_process()
PROC_STATE[nproc] = ProcessState()
def disable_current_process():
def disable_current_process() -> None:
nproc = util.selected_process()
if nproc in PROC_STATE:
del PROC_STATE[nproc]

View file

@ -19,10 +19,11 @@ from io import StringIO
import re
import sys
import time
from typing import Annotated, Any, Dict, Optional
from typing import Annotated, Any, Dict, Optional, Tuple
import drgn
import drgn.cli
from drgn import Module, StackFrame # type: ignore
from ghidratrace import sch
from ghidratrace.client import (
@ -35,159 +36,22 @@ REGISTRY = MethodRegistry(ThreadPoolExecutor(
max_workers=1, thread_name_prefix='MethodRegistry'))
def extre(base, ext):
def extre(base, ext) -> re.Pattern:
return re.compile(base.pattern + ext)
PROCESSES_PATTERN = re.compile('Processes')
PROCESS_PATTERN = extre(PROCESSES_PATTERN, '\[(?P<procnum>\\d*)\]')
ENV_PATTERN = extre(PROCESS_PATTERN, '\.Environment')
THREADS_PATTERN = extre(PROCESS_PATTERN, '\.Threads')
THREAD_PATTERN = extre(THREADS_PATTERN, '\[(?P<tnum>\\d*)\]')
STACK_PATTERN = extre(THREAD_PATTERN, '\.Stack')
FRAME_PATTERN = extre(STACK_PATTERN, '\[(?P<level>\\d*)\]')
PROCESS_PATTERN = extre(PROCESSES_PATTERN, '\\[(?P<procnum>\\d*)\\]')
ENV_PATTERN = extre(PROCESS_PATTERN, '\\.Environment')
THREADS_PATTERN = extre(PROCESS_PATTERN, '\\.Threads')
THREAD_PATTERN = extre(THREADS_PATTERN, '\\[(?P<tnum>\\d*)\\]')
STACK_PATTERN = extre(THREAD_PATTERN, '\\.Stack')
FRAME_PATTERN = extre(STACK_PATTERN, '\\[(?P<level>\\d*)\\]')
REGS_PATTERN = extre(FRAME_PATTERN, '.Registers')
LOCALS_PATTERN = extre(FRAME_PATTERN, '.Locals')
MEMORY_PATTERN = extre(PROCESS_PATTERN, '\.Memory')
MODULES_PATTERN = extre(PROCESS_PATTERN, '\.Modules')
MODULE_PATTERN = extre(MODULES_PATTERN, '\[(?P<modbase>.*)\]')
def find_availpid_by_pattern(pattern, object, err_msg):
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
pid = int(mat['pid'])
return pid
def find_availpid_by_obj(object):
return find_availpid_by_pattern(AVAILABLE_PATTERN, object, "an Available")
def find_proc_by_num(id):
if id != util.selected_process():
util.select_process(id)
return util.selected_process()
def find_proc_by_pattern(object, pattern, err_msg):
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
procnum = int(mat['procnum'])
return find_proc_by_num(procnum)
def find_proc_by_obj(object):
return find_proc_by_pattern(object, PROCESS_PATTERN, "an Process")
def find_proc_by_env_obj(object):
return find_proc_by_pattern(object, ENV_PATTERN, "an Environment")
def find_proc_by_threads_obj(object):
return find_proc_by_pattern(object, THREADS_PATTERN, "a ThreadContainer")
def find_proc_by_mem_obj(object):
return find_proc_by_pattern(object, MEMORY_PATTERN, "a Memory")
def find_proc_by_modules_obj(object):
return find_proc_by_pattern(object, MODULES_PATTERN, "a ModuleContainer")
def find_thread_by_num(id):
if id != util.selected_thread():
util.select_thread(id)
return util.selected_thread()
def find_thread_by_pattern(pattern, object, err_msg):
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
pnum = int(mat['procnum'])
tnum = int(mat['tnum'])
find_proc_by_num(pnum)
return find_thread_by_num(tnum)
def find_thread_by_obj(object):
return find_thread_by_pattern(THREAD_PATTERN, object, "a Thread")
def find_thread_by_stack_obj(object):
return find_thread_by_pattern(STACK_PATTERN, object, "a Stack")
def find_thread_by_regs_obj(object):
return find_thread_by_pattern(REGS_PATTERN, object, "a RegisterValueContainer")
def find_frame_by_level(level):
tnum = util.selected_thread()
thread = commands.prog.thread(tnum)
try:
frames = thread.stack_trace()
except Exception as e:
print(e)
return
for i, f in enumerate(frames):
if i == level:
if i != util.selected_frame():
util.select_frame(i)
return i, f
def find_frame_by_pattern(pattern, object, err_msg):
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
pnum = int(mat['procnum'])
tnum = int(mat['tnum'])
level = int(mat['level'])
find_proc_by_num(pnum)
find_thread_by_num(tnum)
return find_frame_by_level(level)
def find_frame_by_obj(object):
return find_frame_by_pattern(FRAME_PATTERN, object, "a StackFrame")
def find_frame_by_regs_obj(object):
return find_frame_by_pattern(REGS_PATTERN, object, "a RegisterValueContainer")
def find_frame_by_locals_obj(object):
return find_frame_by_pattern(LOCALS_PATTERN, object, "a LocalsContainer")
def find_module_by_base(modbase):
for m in commands.prog.modules():
if modbase == str(hex(m.address_range[0])):
return m
def find_module_by_pattern(pattern, object, err_msg):
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
pnum = int(mat['procnum'])
modbase = mat['modbase']
find_proc_by_num(pnum)
return find_module_by_base(modbase)
def find_module_by_obj(object):
return find_module_by_pattern(MODULE_PATTERN, object, "a Module")
shared_globals: Dict[str, Any] = dict()
MEMORY_PATTERN = extre(PROCESS_PATTERN, '\\.Memory')
MODULES_PATTERN = extre(PROCESS_PATTERN, '\\.Modules')
MODULE_PATTERN = extre(MODULES_PATTERN, '\\[(?P<modbase>.*)\\]')
class Environment(TraceObject):
@ -222,10 +86,6 @@ class RegisterValueContainer(TraceObject):
pass
class StackFrame(TraceObject):
pass
class SymbolContainer(TraceObject):
pass
@ -238,6 +98,136 @@ class ThreadContainer(TraceObject):
pass
def find_proc_by_num(id: int) -> int:
if id != util.selected_process():
util.select_process(id)
return util.selected_process()
def find_proc_by_pattern(object: TraceObject, pattern: re.Pattern,
err_msg: str) -> int:
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
procnum = int(mat['procnum'])
return find_proc_by_num(procnum)
def find_proc_by_obj(object: TraceObject) -> int:
return find_proc_by_pattern(object, PROCESS_PATTERN, "an Process")
def find_proc_by_env_obj(object: TraceObject) -> int:
return find_proc_by_pattern(object, ENV_PATTERN, "an Environment")
def find_proc_by_threads_obj(object: TraceObject) -> int:
return find_proc_by_pattern(object, THREADS_PATTERN, "a ThreadContainer")
def find_proc_by_mem_obj(object: TraceObject) -> int:
return find_proc_by_pattern(object, MEMORY_PATTERN, "a Memory")
def find_proc_by_modules_obj(object: TraceObject) -> int:
return find_proc_by_pattern(object, MODULES_PATTERN, "a ModuleContainer")
def find_thread_by_num(id: int) -> Optional[int]:
if id != util.selected_thread():
util.select_thread(id)
return util.selected_thread()
def find_thread_by_pattern(pattern: re.Pattern, object: TraceObject,
err_msg: str) -> Optional[int]:
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
pnum = int(mat['procnum'])
tnum = int(mat['tnum'])
find_proc_by_num(pnum)
return find_thread_by_num(tnum)
def find_thread_by_obj(object: TraceObject) -> Optional[int]:
return find_thread_by_pattern(THREAD_PATTERN, object, "a Thread")
def find_thread_by_stack_obj(object: TraceObject) -> Optional[int]:
return find_thread_by_pattern(STACK_PATTERN, object, "a Stack")
def find_thread_by_regs_obj(object: TraceObject) -> Optional[int]:
return find_thread_by_pattern(REGS_PATTERN, object, "a RegisterValueContainer")
def find_frame_by_level(level: int) -> Optional[Tuple[int, StackFrame]]:
tnum = util.selected_thread()
thread = commands.prog.thread(tnum)
try:
frames = thread.stack_trace()
except Exception as e:
print(e)
return None
for i, f in enumerate(frames):
if i == level:
if i != util.selected_frame():
util.select_frame(i)
return i, f
return None
def find_frame_by_pattern(pattern: re.Pattern, object: TraceObject,
err_msg: str) -> Optional[Tuple[int, StackFrame]]:
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
pnum = int(mat['procnum'])
tnum = int(mat['tnum'])
level = int(mat['level'])
find_proc_by_num(pnum)
find_thread_by_num(tnum)
return find_frame_by_level(level)
def find_frame_by_obj(object: TraceObject) -> Optional[Tuple[int, StackFrame]]:
return find_frame_by_pattern(FRAME_PATTERN, object, "a StackFrame")
def find_frame_by_regs_obj(object: TraceObject) -> Optional[Tuple[int, StackFrame]]:
return find_frame_by_pattern(REGS_PATTERN, object, "a RegisterValueContainer")
def find_frame_by_locals_obj(object: TraceObject) -> Optional[Tuple[int, StackFrame]]:
return find_frame_by_pattern(LOCALS_PATTERN, object, "a LocalsContainer")
def find_module_by_base(modbase: TraceObject) -> Module:
for m in commands.prog.modules(): # type: ignore
if modbase == str(hex(m.address_range[0])):
return m
def find_module_by_pattern(pattern: re.Pattern, object: TraceObject,
err_msg: str) -> int:
mat = pattern.fullmatch(object.path)
if mat is None:
raise TypeError(f"{object} is not {err_msg}")
pnum = int(mat['procnum'])
modbase = mat['modbase']
find_proc_by_num(pnum)
return find_module_by_base(modbase)
def find_module_by_obj(object: TraceObject) -> int:
return find_module_by_pattern(MODULE_PATTERN, object, "a Module")
shared_globals: Dict[str, Any] = dict()
@REGISTRY.method()
def execute(cmd: str, to_string: bool = False) -> Optional[str]:
"""Execute a Python3 command or script."""
@ -341,9 +331,12 @@ def activate_thread(thread: Thread) -> None:
@REGISTRY.method(action='activate')
def activate_frame(frame: StackFrame) -> None:
def activate_frame(frame: TraceObject) -> None:
"""Select the frame."""
i, f = find_frame_by_obj(frame)
res = find_frame_by_obj(frame)
if res is None:
return
i, f = res
util.select_frame(i)
with commands.open_tracked_tx('Refresh Stack'):
commands.ghidra_trace_put_frames()
@ -411,7 +404,7 @@ def step_into(thread: Thread,
"""Step one instruction exactly."""
find_thread_by_obj(thread)
time.sleep(1)
hooks.on_stop(None)
hooks.on_stop()
# @REGISTRY.method

View file

@ -14,9 +14,11 @@
# limitations under the License.
##
from collections import namedtuple
from dataclasses import dataclass
import os
import re
import sys
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import drgn
import drgn.cli
@ -29,7 +31,7 @@ selected_tid = 0
selected_level = 0
def _compute_drgn_ver():
def _compute_drgn_ver() -> DrgnVersion:
blurb = drgn.cli.version_header()
top = blurb.split('\n')[0]
full = top.split()[1] # "drgn x.y.z"
@ -38,7 +40,17 @@ def _compute_drgn_ver():
DRGN_VERSION = _compute_drgn_ver()
def full_mem(self):
@dataclass(frozen=True)
class Region:
start: int
end: int
offset: int
perms: Optional[str]
objfile: str
def full_mem(self) -> Region:
return Region(0, 1 << 64, 0, None, 'full memory')
@ -46,49 +58,41 @@ def get_debugger():
return drgn
def get_target():
return commands.prog
def get_process(name):
return get_target()[name]
def selected_process():
def selected_process() -> int:
return selected_pid
def selected_thread():
def selected_thread() -> int:
return selected_tid
def selected_frame():
def selected_frame() -> int:
return selected_level
def select_process(id: int):
def select_process(id: int) -> int:
global selected_pid
selected_pid = id
return selected_pid
def select_thread(id: int):
def select_thread(id: int) -> int:
global selected_tid
selected_tid = id
return selected_tid
def select_frame(id: int):
def select_frame(id: int) -> int:
global selected_level
selected_level = id
return selected_level
conv_map = {}
conv_map: Dict[str, Any] = {}
def get_convenience_variable(id):
#val = get_target().GetEnvironment().Get(id)
def get_convenience_variable(id: str):
# val = get_target().GetEnvironment().Get(id)
if id not in conv_map:
return "auto"
val = conv_map[id]
@ -97,18 +101,18 @@ def get_convenience_variable(id):
return val
def set_convenience_variable(id, value):
#env = get_target().GetEnvironment()
def set_convenience_variable(id: str, value: Any) -> None:
# env = get_target().GetEnvironment()
# return env.Set(id, value, True)
conv_map[id] = value
def escape_ansi(line):
def escape_ansi(line: str) -> str:
ansi_escape = re.compile(r'(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]')
return ansi_escape.sub('', line)
def debracket(init):
def debracket(init: str) -> str:
val = init
val = val.replace("[", "(")
val = val.replace("]", ")")