aboutsummaryrefslogtreecommitdiff
path: root/Debug/Nuttx.py
diff options
context:
space:
mode:
authormazahner <mazahner@student.ethz.ch>2014-11-10 17:35:48 +0100
committermazahner <mazahner@student.ethz.ch>2014-11-10 17:37:15 +0100
commitd15203de91cc07530809060f949374bb8918daea (patch)
tree4885caf153d1df4c0228e20105245f644a4a3caa /Debug/Nuttx.py
parent8dc165b50efca2d9e86f23296a33f38f16c241e0 (diff)
downloadpx4-firmware-d15203de91cc07530809060f949374bb8918daea.tar.gz
px4-firmware-d15203de91cc07530809060f949374bb8918daea.tar.bz2
px4-firmware-d15203de91cc07530809060f949374bb8918daea.zip
Debug/NuttX: small changes to make it work with the current stable branch. Debug/NuttX made existing code work with python3 in gdb. Added some new features
Diffstat (limited to 'Debug/Nuttx.py')
-rw-r--r--Debug/Nuttx.py433
1 files changed, 386 insertions, 47 deletions
diff --git a/Debug/Nuttx.py b/Debug/Nuttx.py
index 7cc21b99f..cf86dd668 100644
--- a/Debug/Nuttx.py
+++ b/Debug/Nuttx.py
@@ -59,30 +59,42 @@ class NX_register_set(object):
def __init__(self, xcpt_regs):
if xcpt_regs is None:
- self.regs['R0'] = long(gdb.parse_and_eval('$r0'))
- self.regs['R1'] = long(gdb.parse_and_eval('$r1'))
- self.regs['R2'] = long(gdb.parse_and_eval('$r2'))
- self.regs['R3'] = long(gdb.parse_and_eval('$r3'))
- self.regs['R4'] = long(gdb.parse_and_eval('$r4'))
- self.regs['R5'] = long(gdb.parse_and_eval('$r5'))
- self.regs['R6'] = long(gdb.parse_and_eval('$r6'))
- self.regs['R7'] = long(gdb.parse_and_eval('$r7'))
- self.regs['R8'] = long(gdb.parse_and_eval('$r8'))
- self.regs['R9'] = long(gdb.parse_and_eval('$r9'))
- self.regs['R10'] = long(gdb.parse_and_eval('$r10'))
- self.regs['R11'] = long(gdb.parse_and_eval('$r11'))
- self.regs['R12'] = long(gdb.parse_and_eval('$r12'))
- self.regs['R13'] = long(gdb.parse_and_eval('$r13'))
- self.regs['SP'] = long(gdb.parse_and_eval('$sp'))
- self.regs['R14'] = long(gdb.parse_and_eval('$r14'))
- self.regs['LR'] = long(gdb.parse_and_eval('$lr'))
- self.regs['R15'] = long(gdb.parse_and_eval('$r15'))
- self.regs['PC'] = long(gdb.parse_and_eval('$pc'))
- self.regs['XPSR'] = long(gdb.parse_and_eval('$xpsr'))
+ self.regs['R0'] = self.mon_reg_call('r0')
+ self.regs['R1'] = self.mon_reg_call('r1')
+ self.regs['R2'] = self.mon_reg_call('r2')
+ self.regs['R3'] = self.mon_reg_call('r3')
+ self.regs['R4'] = self.mon_reg_call('r4')
+ self.regs['R5'] = self.mon_reg_call('r5')
+ self.regs['R6'] = self.mon_reg_call('r6')
+ self.regs['R7'] = self.mon_reg_call('r7')
+ self.regs['R8'] = self.mon_reg_call('r8')
+ self.regs['R9'] = self.mon_reg_call('r9')
+ self.regs['R10'] = self.mon_reg_call('r10')
+ self.regs['R11'] = self.mon_reg_call('r11')
+ self.regs['R12'] = self.mon_reg_call('r12')
+ self.regs['R13'] = self.mon_reg_call('r13')
+ self.regs['SP'] = self.mon_reg_call('sp')
+ self.regs['R14'] = self.mon_reg_call('r14')
+ self.regs['LR'] = self.mon_reg_call('lr')
+ self.regs['R15'] = self.mon_reg_call('r15')
+ self.regs['PC'] = self.mon_reg_call('pc')
+ self.regs['XPSR'] = self.mon_reg_call('xPSR')
else:
for key in self.v7em_regmap.keys():
self.regs[key] = int(xcpt_regs[self.v7em_regmap[key]])
+ def mon_reg_call(self,register):
+ """
+ register is the register as a string e.g. 'pc'
+ return integer containing the value of the register
+ """
+ str_to_eval = "mon reg "+register
+ resp = gdb.execute(str_to_eval,to_string = True)
+ content = resp.split()[-1];
+ try:
+ return int(content,16)
+ except:
+ return 0
@classmethod
def with_xcpt_regs(cls, xcpt_regs):
@@ -172,7 +184,7 @@ class NX_task(object):
self.__dict__['stack_used'] = 0
else:
stack_limit = self._tcb['adj_stack_size']
- for offset in range(0, stack_limit):
+ for offset in range(0, int(stack_limit)):
if stack_base[offset] != 0xff:
break
self.__dict__['stack_used'] = stack_limit - offset
@@ -187,7 +199,7 @@ class NX_task(object):
def state(self):
"""return the name of the task's current state"""
statenames = gdb.types.make_enum_dict(gdb.lookup_type('enum tstate_e'))
- for name,value in statenames.iteritems():
+ for name,value in statenames.items():
if value == self._tcb['task_state']:
return name
return 'UNKNOWN'
@@ -196,16 +208,19 @@ class NX_task(object):
def waiting_for(self):
"""return a description of what the task is waiting for, if it is waiting"""
if self._state_is('TSTATE_WAIT_SEM'):
- waitsem = self._tcb['waitsem'].dereference()
- waitsem_holder = waitsem['holder']
- holder = NX_task.for_tcb(waitsem_holder['htcb'])
- if holder is not None:
- return '{}({})'.format(waitsem.address, holder.name)
- else:
- return '{}(<bad holder>)'.format(waitsem.address)
+ try:
+ waitsem = self._tcb['waitsem'].dereference()
+ waitsem_holder = waitsem['holder']
+ holder = NX_task.for_tcb(waitsem_holder['htcb'])
+ if holder is not None:
+ return '{}({})'.format(waitsem.address, holder.name)
+ else:
+ return '{}(<bad holder>)'.format(waitsem.address)
+ except:
+ return 'EXCEPTION'
if self._state_is('TSTATE_WAIT_SIG'):
return 'signal'
- return None
+ return ""
@property
def is_waiting(self):
@@ -229,7 +244,7 @@ class NX_task(object):
filearray = filelist['fl_files']
result = dict()
for i in range(filearray.type.range()[0],filearray.type.range()[1]):
- inode = long(filearray[i]['f_inode'])
+ inode = int(filearray[i]['f_inode'])
if inode != 0:
result[i] = inode
return result
@@ -253,7 +268,18 @@ class NX_task(object):
def __str__(self):
return "{}:{}".format(self.pid, self.name)
-
+
+ def showoff(self):
+ print("-------")
+ print(self.pid,end = ", ")
+ print(self.name,end = ", ")
+ print(self.state,end = ", ")
+ print(self.waiting_for,end = ", ")
+ print(self.stack_used,end = ", ")
+ print(self._tcb['adj_stack_size'],end = ", ")
+ print(self.file_descriptors)
+ print(self.registers)
+
def __format__(self, format_spec):
return format_spec.format(
pid = self.pid,
@@ -265,7 +291,7 @@ class NX_task(object):
file_descriptors = self.file_descriptors,
registers = self.registers
)
-
+
class NX_show_task (gdb.Command):
"""(NuttX) prints information about a task"""
@@ -285,7 +311,7 @@ class NX_show_task (gdb.Command):
my_fmt += ' R8 {registers[R8]:#010x} {registers[R9]:#010x} {registers[R10]:#010x} {registers[R11]:#010x}\n'
my_fmt += ' R12 {registers[PC]:#010x}\n'
my_fmt += ' SP {registers[SP]:#010x} LR {registers[LR]:#010x} PC {registers[PC]:#010x} XPSR {registers[XPSR]:#010x}\n'
- print format(t, my_fmt)
+ print(format(t, my_fmt))
class NX_show_tasks (gdb.Command):
"""(NuttX) prints a list of tasks"""
@@ -295,8 +321,10 @@ class NX_show_tasks (gdb.Command):
def invoke(self, args, from_tty):
tasks = NX_task.tasks()
+ print ('Number of tasks: ' + str(len(tasks)))
for t in tasks:
- print format(t, '{pid:<2} {name:<16} {state:<20} {stack_used:>4}/{stack_limit:<4}')
+ #t.showoff()
+ print(format(t, 'Task: {pid} {name} {state} {stack_used}/{stack_limit}'))
NX_show_task()
NX_show_tasks()
@@ -306,15 +334,15 @@ class NX_show_heap (gdb.Command):
def __init__(self):
super(NX_show_heap, self).__init__('show heap', gdb.COMMAND_USER)
- struct_mm_allocnode_s = gdb.lookup_type('struct mm_allocnode_s')
- preceding_size = struct_mm_allocnode_s['preceding'].type.sizeof
- if preceding_size == 2:
+ struct_mm_allocnode_s = gdb.lookup_type('struct mm_allocnode_s')
+ preceding_size = struct_mm_allocnode_s['preceding'].type.sizeof
+ if preceding_size == 2:
self._allocflag = 0x8000
- elif preceding_size == 4:
+ elif preceding_size == 4:
self._allocflag = 0x80000000
- else:
- raise gdb.GdbError('invalid mm_allocnode_s.preceding size %u' % preceding_size)
- self._allocnodesize = struct_mm_allocnode_s.sizeof
+ else:
+ raise gdb.GdbError('invalid mm_allocnode_s.preceding size %u' % preceding_size)
+ self._allocnodesize = struct_mm_allocnode_s.sizeof
def _node_allocated(self, allocnode):
if allocnode['preceding'] & self._allocflag:
@@ -328,7 +356,7 @@ class NX_show_heap (gdb.Command):
if region_start >= region_end:
raise gdb.GdbError('heap region {} corrupt'.format(hex(region_start)))
nodecount = region_end - region_start
- print 'heap {} - {}'.format(region_start, region_end)
+ print ('heap {} - {}'.format(region_start, region_end))
cursor = 1
while cursor < nodecount:
allocnode = region_start[cursor]
@@ -336,8 +364,8 @@ class NX_show_heap (gdb.Command):
state = ''
else:
state = '(free)'
- print ' {} {} {}'.format(allocnode.address + self._allocnodesize,
- self._node_size(allocnode), state)
+ print( ' {} {} {}'.format(allocnode.address + self._allocnodesize,
+ self._node_size(allocnode), state))
cursor += self._node_size(allocnode) / self._allocnodesize
def invoke(self, args, from_tty):
@@ -345,7 +373,7 @@ class NX_show_heap (gdb.Command):
nregions = heap['mm_nregions']
region_starts = heap['mm_heapstart']
region_ends = heap['mm_heapend']
- print '{} heap(s)'.format(nregions)
+ print( '{} heap(s)'.format(nregions))
# walk the heaps
for i in range(0, nregions):
self._print_allocations(region_starts[i], region_ends[i])
@@ -370,6 +398,317 @@ class NX_show_interrupted_thread (gdb.Command):
my_fmt += ' R8 {registers[R8]:#010x} {registers[R9]:#010x} {registers[R10]:#010x} {registers[R11]:#010x}\n'
my_fmt += ' R12 {registers[PC]:#010x}\n'
my_fmt += ' SP {registers[SP]:#010x} LR {registers[LR]:#010x} PC {registers[PC]:#010x} XPSR {registers[XPSR]:#010x}\n'
- print format(registers, my_fmt)
+ print (format(registers, my_fmt))
NX_show_interrupted_thread()
+
+class NX_check_tcb(gdb.Command):
+ """ check the tcb of a task from a address """
+ def __init__(self):
+ super(NX_check_tcb,self).__init__('show tcb', gdb.COMMAND_USER)
+
+ def invoke(self,args,sth):
+ tasks = NX_task.tasks()
+ print("tcb int: ",int(args))
+ print(tasks[int(args)]._tcb)
+ a =tasks[int(args)]._tcb['xcp']['regs']
+ print("relevant registers:")
+ for reg in regmap:
+ hex_addr= hex(int(a[regmap[reg]]))
+ eval_string = 'info line *'+str(hex_addr)
+ print(reg,": ",hex_addr,)
+NX_check_tcb()
+
+class NX_tcb(object):
+ def __init__(self):
+ pass
+
+ def is_in(self,arg,list):
+ for i in list:
+ if arg == i:
+ return True;
+ return False
+
+ def find_tcb_list(self,dq_entry_t):
+ tcb_list = []
+ tcb_ptr = dq_entry_t.cast(gdb.lookup_type('struct tcb_s').pointer())
+ first_tcb = tcb_ptr.dereference()
+ tcb_list.append(first_tcb);
+ next_tcb = first_tcb['flink'].dereference()
+ while not self.is_in(int(next_tcb['pid']),[int(t['pid']) for t in tcb_list]):
+ tcb_list.append(next_tcb);
+ old_tcb = next_tcb;
+ next_tcb = old_tcb['flink'].dereference()
+
+ return [t for t in tcb_list if int(t['pid'])<2000]
+
+ def getTCB(self):
+ list_of_listsnames = ['g_pendingtasks','g_readytorun','g_waitingforsemaphore','g_waitingforsignal','g_inactivetasks']
+ tcb_list = [];
+ for l in list_of_listsnames:
+ li = gdb.lookup_global_symbol(l)
+ print(li)
+ cursor = li.value()['head']
+ tcb_list = tcb_list + self.find_tcb_list(cursor)
+
+class NX_check_stack_order(gdb.Command):
+ """ Check the Stack order corresponding to the tasks """
+
+ def __init__(self):
+ super(NX_check_stack_order,self).__init__('show check_stack', gdb.COMMAND_USER)
+
+ def is_in(self,arg,list):
+ for i in list:
+ if arg == i:
+ return True;
+ return False
+
+ def find_tcb_list(self,dq_entry_t):
+ tcb_list = []
+ tcb_ptr = dq_entry_t.cast(gdb.lookup_type('struct tcb_s').pointer())
+ first_tcb = tcb_ptr.dereference()
+ tcb_list.append(first_tcb);
+ next_tcb = first_tcb['flink'].dereference()
+ while not self.is_in(int(next_tcb['pid']),[int(t['pid']) for t in tcb_list]):
+ tcb_list.append(next_tcb);
+ old_tcb = next_tcb;
+ next_tcb = old_tcb['flink'].dereference()
+
+ return [t for t in tcb_list if int(t['pid'])<2000]
+
+ def getTCB(self):
+ list_of_listsnames = ['g_pendingtasks','g_readytorun','g_waitingforsemaphore','g_waitingforsignal','g_inactivetasks']
+ tcb_list = [];
+ for l in list_of_listsnames:
+ li = gdb.lookup_global_symbol(l)
+ cursor = li.value()['head']
+ tcb_list = tcb_list + self.find_tcb_list(cursor)
+ return tcb_list
+
+ def getSPfromTask(self,tcb):
+ regmap = NX_register_set.v7em_regmap
+ a =tcb['xcp']['regs']
+ return int(a[regmap['SP']])
+
+ def find_closest(self,list,val):
+ tmp_list = [abs(i-val) for i in list]
+ tmp_min = min(tmp_list)
+ idx = tmp_list.index(tmp_min)
+ return idx,list[idx]
+
+ def find_next_stack(self,address,_dict_in):
+ add_list = []
+ name_list = []
+ for key in _dict_in.keys():
+ for i in range(3):
+ if _dict_in[key][i] < address:
+ add_list.append(_dict_in[key][i])
+ if i == 2: # the last one is the processes stack pointer
+ name_list.append(self.check_name(key)+"_SP")
+ else:
+ name_list.append(self.check_name(key))
+
+ idx,new_address = self.find_closest(add_list,address)
+ return new_address,name_list[idx]
+
+ def check_name(self,name):
+ if isinstance(name,(list)):
+ name = name[0];
+ idx = name.find("\\")
+ newname = name[:idx]
+
+ return newname
+
+ def invoke(self,args,sth):
+ tcb = self.getTCB();
+ stackadresses={};
+ for t in tcb:
+ p = [];
+ #print(t.name,t._tcb['stack_alloc_ptr'])
+ p.append(int(t['stack_alloc_ptr']))
+ p.append(int(t['adj_stack_ptr']))
+ p.append(self.getSPfromTask(t))
+ stackadresses[str(t['name'])] = p;
+ address = int("0x30000000",0)
+ print("stack address : process")
+ for i in range(len(stackadresses)*3):
+ address,name = self.find_next_stack(address,stackadresses)
+ print(hex(address),": ",name)
+
+NX_check_stack_order()
+
+class NX_run_debug_util(gdb.Command):
+ """ show the registers of a task corresponding to a tcb address"""
+ def __init__(self):
+ super(NX_run_debug_util,self).__init__('show regs', gdb.COMMAND_USER)
+
+ def printRegisters(self,task):
+ regmap = NX_register_set.v7em_regmap
+ a =task._tcb['xcp']['regs']
+ print("relevant registers in ",task.name,":")
+ for reg in regmap:
+ hex_addr= hex(int(a[regmap[reg]]))
+ eval_string = 'info line *'+str(hex_addr)
+ print(reg,": ",hex_addr,)
+
+ def getPCfromTask(self,task):
+ regmap = NX_register_set.v7em_regmap
+ a =task._tcb['xcp']['regs']
+ return hex(int(a[regmap['PC']]))
+
+ def invoke(self,args,sth):
+ tasks = NX_task.tasks()
+ if args == '':
+ for t in tasks:
+ self.printRegisters(t)
+ eval_str = "list *"+str(self.getPCfromTask(t))
+ print("this is the location in code where the current threads $pc is:")
+ gdb.execute(eval_str)
+ else:
+ tcb_nr = int(args);
+ print("tcb_nr = ",tcb_nr)
+ t = tasks[tcb_nr]
+ self.printRegisters(t)
+ eval_str = "list *"+str(self.getPCfromTask(t))
+ print("this is the location in code where the current threads $pc is:")
+ gdb.execute(eval_str)
+
+NX_run_debug_util()
+
+
+class NX_search_tcb(gdb.Command):
+ """ shot PID's of all running tasks """
+
+ def __init__(self):
+ super(NX_search_tcb,self).__init__('show alltcb', gdb.COMMAND_USER)
+
+ def is_in(self,arg,list):
+ for i in list:
+ if arg == i:
+ return True;
+ return False
+
+ def find_tcb_list(self,dq_entry_t):
+ tcb_list = []
+ tcb_ptr = dq_entry_t.cast(gdb.lookup_type('struct tcb_s').pointer())
+ first_tcb = tcb_ptr.dereference()
+ tcb_list.append(first_tcb);
+ next_tcb = first_tcb['flink'].dereference()
+ while not self.is_in(int(next_tcb['pid']),[int(t['pid']) for t in tcb_list]):
+ tcb_list.append(next_tcb);
+ old_tcb = next_tcb;
+ next_tcb = old_tcb['flink'].dereference()
+
+ return [t for t in tcb_list if int(t['pid'])<2000]
+
+ def invoke(self,args,sth):
+ list_of_listsnames = ['g_pendingtasks','g_readytorun','g_waitingforsemaphore','g_waitingforsignal','g_inactivetasks']
+ tasks = [];
+ for l in list_of_listsnames:
+ li = gdb.lookup_global_symbol(l)
+ cursor = li.value()['head']
+ tasks = tasks + self.find_tcb_list(cursor)
+
+ # filter for tasks that are listed twice
+ tasks_filt = {}
+ for t in tasks:
+ pid = int(t['pid']);
+ if not pid in tasks_filt.keys():
+ tasks_filt[pid] = t['name'];
+ print('{num_t} Tasks found:'.format(num_t = len(tasks_filt)))
+ for pid in tasks_filt.keys():
+ print("PID: ",pid," ",tasks_filt[pid])
+
+NX_search_tcb()
+
+
+class NX_my_bt(gdb.Command):
+ """ 'fake' backtrace: backtrace the stack of a process and check every suspicious address for the list
+ arg: tcb_address$
+ (can easily be found by typing 'showtask').
+ """
+
+ def __init__(self):
+ super(NX_my_bt,self).__init__('show mybt', gdb.COMMAND_USER)
+
+ def readmem(self,addr):
+ '''
+ read memory at addr and return nr
+ '''
+ str_to_eval = "x/x "+hex(addr)
+ resp = gdb.execute(str_to_eval,to_string = True)
+ idx = resp.find('\t')
+ return int(resp[idx:],16)
+
+ def is_in_bounds(self,val):
+ lower_bound = int("08004000",16)
+ upper_bound = int("080ae0c0",16);
+ #print(lower_bound," ",val," ",upper_bound)
+ if val>lower_bound and val<upper_bound:
+ return True;
+ else:
+ return False;
+ def get_tcb_from_address(self,addr):
+ addr_value = gdb.Value(addr)
+ tcb_ptr = addr_value.cast(gdb.lookup_type('struct tcb_s').pointer())
+ return tcb_ptr.dereference()
+
+ def print_instruction_at(self,addr,stack_percentage):
+ gdb.write(str(round(stack_percentage,2))+":")
+ str_to_eval = "info line *"+hex(addr)
+ #gdb.execute(str_to_eval)
+ res = gdb.execute(str_to_eval,to_string = True)
+ # get information from results string:
+ words = res.split()
+ valid = False
+ if words[0] == 'No':
+ #no line info...
+ pass
+ else:
+ valid = True
+ line = int(words[1])
+ idx = words[3].rfind("/"); #find first backslash
+ if idx>0:
+ name = words[3][idx+1:];
+ path = words[3][:idx];
+ else:
+ name = words[3];
+ path = "";
+ block = gdb.block_for_pc(addr)
+ func = block.function
+ if str(func) == "None":
+ func = block.superblock.function
+
+ if valid:
+ print("Line: ",line," in ",path,"/",name,"in ",func)
+ return name,path,line,func
+
+
+
+
+ def invoke(self,args,sth):
+ addr_dec = int(args[2:],16)
+ _tcb = self.get_tcb_from_address(addr_dec)
+ print("found task with PID: ",_tcb["pid"])
+ up_stack = int(_tcb['adj_stack_ptr'])
+ curr_sp = int(_tcb['xcp']['regs'][0]) #curr stack pointer
+ other_sp = int(_tcb['xcp']['regs'][8]) # other stack pointer
+ stacksize = int(_tcb['adj_stack_size']) # other stack pointer
+
+ print("tasks current SP = ",hex(curr_sp),"stack max ptr is at ",hex(up_stack))
+
+ if curr_sp == up_stack:
+ sp = other_sp
+ else:
+ sp = curr_sp;
+
+ while(sp < up_stack):
+ mem = self.readmem(sp)
+ #print(hex(sp)," : ",hex(mem))
+ if self.is_in_bounds(mem):
+ # this is a potential instruction ptr
+ stack_percentage = (up_stack-sp)/stacksize
+ name,path,line,func = self.print_instruction_at(mem,stack_percentage)
+ sp = sp + 4; # jump up one word
+
+NX_my_bt()