Package gluon :: Module debug
[hide private]
[frames | no frames]

Source Code for Module gluon.debug

  1  #!/usr/bin/env python 
  2  # -*- coding: utf-8 -*- 
  3   
  4  """ 
  5  This file is part of the web2py Web Framework 
  6  Developed by Massimo Di Pierro <mdipierro@cs.depaul.edu>, 
  7  limodou <limodou@gmail.com> and srackham <srackham@gmail.com>. 
  8  License: LGPLv3 (http://www.gnu.org/licenses/lgpl.html) 
  9   
 10  """ 
 11   
 12  import logging 
 13  import os 
 14  import pdb 
 15  import Queue 
 16  import sys 
 17   
 18  logger = logging.getLogger("web2py") 
class Pipe(Queue.Queue):
    """Queue with a small file-like interface (write/flush/read/readline).

    Items are queued exactly as written; ``flush`` queues ``None`` as an
    end-of-message checkpoint.  Every call is traced through ``logger.debug``.
    """

    def __init__(self, name, mode='r', *args, **kwargs):
        """Create the pipe.

        ``name`` only labels the log messages and ``mode`` is accepted but
        not used; any remaining arguments are handed to ``Queue.Queue``.
        """
        self.__name = name
        # explicit base-class call (no super()): Queue.Queue is an
        # old-style class on Python 2
        Queue.Queue.__init__(self, *args, **kwargs)

    def write(self, data):
        "Log ``data`` and put it on the queue."
        message = "debug %s writting %s" % (self.__name, data)
        logger.debug(message)
        self.put(data)

    def flush(self):
        "Queue the ``None`` checkpoint, then block until the queue is drained."
        label = self.__name
        logger.debug("debug %s flushing..." % label)
        # checkpoint: tells the reader that a complete message was written
        self.put(None)
        # join() returns once task_done() was called for every queued item
        self.join()
        logger.debug("debug %s flush done" % label)

    def read(self, count=None, timeout=None):
        """Take the next item off the queue and return it.

        Blocks for up to ``timeout`` seconds (forever when ``None``);
        ``count`` is ignored.  The exception raised by ``Queue.get`` on
        timeout is not caught here.
        """
        logger.debug("debug %s reading..." % (self.__name, ))
        item = self.get(True, timeout)
        # acknowledge the item so a writer blocked in flush()/join() can go on
        self.task_done()
        logger.debug("debug %s read %s" % (self.__name, item))
        return item

    def readline(self):
        "Log the call and return ``self.read()`` (blocking, no timeout)."
        logger.debug("debug %s readline..." % (self.__name, ))
        return self.read()
# Queues standing in for the console streams of the pdb instance below:
# it is given pipe_in as its stdin and pipe_out as its stdout.
pipe_in = Pipe('in')
pipe_out = Pipe('out')

# Module-wide pdb instance wired to the two pipes instead of the real
# stdin/stdout; completekey=None is passed explicitly.
debugger = pdb.Pdb(completekey=None, stdin=pipe_in, stdout=pipe_out,)
def set_trace():
    "Breakpoint shortcut (like pdb.set_trace): stop in the caller's frame."
    logger.info("DEBUG: set_trace!")
    # f_back is the frame of whoever called set_trace(), so the debugger
    # starts tracing there and not inside this helper
    caller_frame = sys._getframe().f_back
    debugger.set_trace(caller_frame)
61
def stop_trace():
    """Stop waiting for the debugger (called atexit).

    Writes a final message followed by ``None`` to ``pipe_out`` so that a
    ``communicate()`` call blocked on a command result returns instead of
    waiting forever after the main thread has finished.
    """
    logger.info("DEBUG: stop_trace!")
    # the trailing None is the end-of-message marker communicate() stops on
    for item in ("debug finished!", None):
        pipe_out.write(item)
    # NOTE: pipe_out.flush() is not called here (it was commented out);
    # flush() would also join() the queue until every item is read.
def communicate(command=None):
    """Send ``command`` to the debugger and wait for the result.

    When ``command`` is not ``None`` it is written to ``pipe_in``.  Output
    is then read from ``pipe_out`` until a ``None`` marker arrives, and the
    collected pieces are returned joined into one string.
    """
    if command is not None:
        logger.info("DEBUG: sending command %s" % command)
        pipe_in.write(command)
        # NOTE: pipe_in.flush() is not called here (it was commented out)
    chunks = []
    chunk = pipe_out.read()
    while chunk is not None:
        chunks.append(chunk)
        chunk = pipe_out.read()
    logger.info("DEBUG: result %s" % repr(chunks))
    return ''.join(chunks)
# New debugger implementation using qdb and a web UI

import gluon.contrib.qdb as qdb
from threading import RLock

# interact_lock is held while the interaction state stored on WebDebugger
# (filename, lineno, context) is written and while the do_* commands run
interact_lock = RLock()
# run_lock is held for the whole of WebDebugger.run()
run_lock = RLock()
def check_interaction(fn):
    """Decorator to clean and prevent interaction when not available.

    The wrapped method runs only while ``self.filename`` is set, i.e. while
    an interaction is stored on the instance.  In that case the stored
    state is cleared with ``self.clear_interaction()`` before ``fn`` is
    called and ``fn``'s result is returned; otherwise nothing is done and
    ``None`` is returned.  The whole check-and-call sequence runs with
    ``interact_lock`` held.

    The wrapper keeps ``fn``'s ``__name__`` and ``__doc__`` so decorated
    methods remain identifiable in tracebacks and introspection.
    """
    # local import: keeps this block self-contained
    from functools import wraps

    @wraps(fn)
    def check_fn(self, *args, **kwargs):
        interact_lock.acquire()
        try:
            if self.filename:
                self.clear_interaction()
                return fn(self, *args, **kwargs)
        finally:
            # released on every path, including exceptions raised by fn
            interact_lock.release()
    return check_fn
class WebDebugger(qdb.Frontend):
    """Qdb web2py interface.

    Frontend that records the latest interaction (filename, lineno,
    context) and exception info reported to it as instance attributes,
    and only forwards debugger commands while an interaction is stored.
    """

    def __init__(self, pipe, completekey='tab', stdin=None, stdout=None):
        "Attach to ``pipe``; completekey/stdin/stdout are accepted but unused."
        qdb.Frontend.__init__(self, pipe)
        self.clear_interaction()

    def clear_interaction(self):
        "Forget the stored interaction and exception state."
        self.filename = self.lineno = self.exception_info = self.context = None

    # redefine Frontend methods:

    def run(self):
        "Call ``Frontend.run`` while the pipe has data, holding ``run_lock``."
        run_lock.acquire()
        try:
            while self.pipe.poll():
                qdb.Frontend.run(self)
        finally:
            run_lock.release()

    def interaction(self, filename, lineno, line, **context):
        "Store the current status (``line`` itself is not kept)."
        interact_lock.acquire()
        try:
            self.filename, self.lineno, self.context = (
                filename, lineno, context)
        finally:
            interact_lock.release()

    def exception(self, title, extype, exvalue, trace, request):
        "Store the reported exception details as a dict in exception_info."
        self.exception_info = dict(
            title=title,
            extype=extype,
            exvalue=exvalue,
            trace=trace,
            request=request,
        )

    @check_interaction
    def do_continue(self):
        "Forward 'continue' to the Frontend (only during an interaction)."
        qdb.Frontend.do_continue(self)

    @check_interaction
    def do_step(self):
        "Forward 'step' to the Frontend (only during an interaction)."
        qdb.Frontend.do_step(self)

    @check_interaction
    def do_return(self):
        "Forward 'return' to the Frontend (only during an interaction)."
        qdb.Frontend.do_return(self)

    @check_interaction
    def do_next(self):
        "Forward 'next' to the Frontend (only during an interaction)."
        qdb.Frontend.do_next(self)

    @check_interaction
    def do_quit(self):
        "Forward 'quit' to the Frontend (only during an interaction)."
        qdb.Frontend.do_quit(self)

    def do_exec(self, statement):
        """Execute ``statement`` in the remote debugger and return the result.

        Nothing is executed (and ``None`` is returned) unless an
        interaction is stored.  Unlike the other ``do_*`` commands the
        stored interaction is not cleared.
        """
        interact_lock.acquire()
        try:
            # guard: only valid while we are inside an interaction
            if not self.filename:
                return None
            # avoid spurious interaction notifications:
            self.set_burst(2)
            # execute the statement in the remote debugger:
            return qdb.Frontend.do_exec(self, statement)
        finally:
            interact_lock.release()
# create the connection between threads:

# The two QueuePipe ends are built on the same pair of queues in swapped
# order.  NOTE(review): presumably each end reads from the queue the other
# end writes to -- confirm against qdb.QueuePipe's argument order.
parent_queue, child_queue = Queue.Queue(), Queue.Queue()
front_conn = qdb.QueuePipe("parent", parent_queue, child_queue)
child_conn = qdb.QueuePipe("child", child_queue, parent_queue)

web_debugger = WebDebugger(front_conn)                                     # frontend
qdb_debugger = qdb.Qdb(
    pipe=child_conn, redirect_stdio=False, skip=None)  # backend
# short alias for the backend debugger
dbg = qdb_debugger

# enable getting context (stack, globals/locals) at interaction
qdb_debugger.set_params(dict(call_stack=True, environment=True))

# Importing this module switches the global debugging flag on.
# NOTE(review): gluon.main is imported here at the bottom rather than at
# the top of the file, presumably to avoid a circular import -- confirm.
import gluon.main
gluon.main.global_settings.debugging = True