1   
   2   
   3   
   4   
   5   
   6   
   7   
   8  import sys 
   9  import errno 
  10  import socket 
  11  import logging 
  12  import platform 
  13   
  14   
  15  VERSION = '1.2.6' 
  16  SERVER_NAME = socket.gethostname() 
  17  SERVER_SOFTWARE = 'Rocket %s' % VERSION 
  18  HTTP_SERVER_SOFTWARE = '%s Python/%s' % ( 
  19      SERVER_SOFTWARE, sys.version.split(' ')[0]) 
  20  BUF_SIZE = 16384 
  21  SOCKET_TIMEOUT = 10   
  22  THREAD_STOP_CHECK_INTERVAL = 1   
  23  IS_JYTHON = platform.system() == 'Java'   
  24  IGNORE_ERRORS_ON_CLOSE = set([errno.ECONNABORTED, errno.ECONNRESET]) 
  25  DEFAULT_LISTEN_QUEUE_SIZE = 5 
  26  DEFAULT_MIN_THREADS = 10 
  27  DEFAULT_MAX_THREADS = 0 
  28  DEFAULTS = dict(LISTEN_QUEUE_SIZE=DEFAULT_LISTEN_QUEUE_SIZE, 
  29                  MIN_THREADS=DEFAULT_MIN_THREADS, 
  30                  MAX_THREADS=DEFAULT_MAX_THREADS) 
  31   
  32  PY3K = sys.version_info[0] > 2 
  33   
  34   
  36      "A Logging handler to prevent library errors." 
  37 -    def emit(self, record): 
    39   
  40  if PY3K: 
  42          """ Convert string/unicode/bytes literals into bytes.  This allows for 
  43          the same code to run on Python 2.x and 3.x. """ 
  44          if isinstance(val, str): 
  45              return val.encode() 
  46          else: 
  47              return val 
   48   
  49 -    def u(val, encoding="us-ascii"): 
   50          """ Convert bytes into string/unicode.  This allows for the 
  51          same code to run on Python 2.x and 3.x. """ 
  52          if isinstance(val, bytes): 
  53              return val.decode(encoding) 
  54          else: 
  55              return val 
   56   
  57  else: 
  59          """ Convert string/unicode/bytes literals into bytes.  This allows for 
  60          the same code to run on Python 2.x and 3.x. """ 
  61          if isinstance(val, unicode): 
  62              return val.encode() 
  63          else: 
  64              return val 
   65   
  66 -    def u(val, encoding="us-ascii"): 
   67          """ Convert bytes into string/unicode.  This allows for the 
  68          same code to run on Python 2.x and 3.x. """ 
  69          if isinstance(val, str): 
  70              return val.decode(encoding) 
  71          else: 
  72              return val 
   73   
  74   
  75   
  76   
  77  __all__ = ['VERSION', 'SERVER_SOFTWARE', 'HTTP_SERVER_SOFTWARE', 'BUF_SIZE', 
  78             'IS_JYTHON', 'IGNORE_ERRORS_ON_CLOSE', 'DEFAULTS', 'PY3K', 'b', 'u', 
  79             'Rocket', 'CherryPyWSGIServer', 'SERVER_NAME', 'NullHandler'] 
  80   
  81   
  82   
  83   
  84   
  85  import sys 
  86  import time 
  87  import socket 
  88  try: 
  89      import ssl 
  90      has_ssl = True 
  91  except ImportError: 
  92      has_ssl = False 
  93   
  94   
  95   
  96   
  97   
  98   
 100      __slots__ = [ 
 101          'setblocking', 
 102          'sendall', 
 103          'shutdown', 
 104          'makefile', 
 105          'fileno', 
 106          'client_addr', 
 107          'client_port', 
 108          'server_port', 
 109          'socket', 
 110          'start_time', 
 111          'ssl', 
 112          'secure', 
 113          'recv', 
 114          'send', 
 115          'read', 
 116          'write' 
 117      ] 
 118   
 119 -    def __init__(self, sock_tuple, port, secure=False): 
  146   
 148          pending = len(buf) 
 149          offset = 0 
 150          while pending: 
 151              try: 
 152                  sent = self.socket.send(buf[offset:]) 
 153                  pending -= sent 
 154                  offset += sent 
 155              except socket.error: 
 156                  import errno 
 157                  info = sys.exc_info() 
 158                  if info[1].args[0] != errno.EAGAIN: 
 159                      raise 
 160          return offset 
  161   
 162   
 163   
 164   
 165   
 167          if hasattr(self.socket, '_sock'): 
 168              try: 
 169                  self.socket._sock.close() 
 170              except socket.error: 
 171                  info = sys.exc_info() 
 172                  if info[1].args[0] != socket.EBADF: 
 173                      raise info[1] 
 174                  else: 
 175                      pass 
 176          self.socket.close() 
   177   
 178   
 179   
 180   
 181   
 182  import socket 
 183  try: 
 184      from io import StringIO 
 185  except ImportError: 
 186      try: 
 187          from cStringIO import StringIO 
 188      except ImportError: 
 189          from StringIO import StringIO 
 190   
 191   
 192   
 193   
 196          self.conn = conn 
 197          self.buf_size = buf_size 
 198          self.buffer = StringIO() 
 199          self.content_length = None 
 200   
 201          if self.conn.socket.gettimeout() == 0.0: 
 202              self.read = self.non_blocking_read 
 203          else: 
 204              self.read = self.blocking_read 
  205   
 208   
 209 -    def recv(self, size): 
  210          while True: 
 211              try: 
 212                  return self.conn.recv(size) 
 213              except socket.error: 
 214                  exc = sys.exc_info() 
 215                  e = exc[1] 
 216                   
 217                  if (e.args[0] not in set()): 
 218                      raise 
  219   
 221          data = self.readline() 
 222          if data == '': 
 223              raise StopIteration 
 224          return data 
  225   
 227           
 228          bufr = self.buffer 
 229          bufr.seek(0, 2) 
 230          if size is None: 
 231              while True: 
 232                  data = self.recv(self.buf_size) 
 233                  if not data: 
 234                      break 
 235                  bufr.write(data) 
 236   
 237              self.buffer = StringIO() 
 238   
 239              return bufr.getvalue() 
 240          else: 
 241              buf_len = self.buffer.tell() 
 242              if buf_len >= size: 
 243                  bufr.seek(0) 
 244                  data = bufr.read(size) 
 245                  self.buffer = StringIO(bufr.read()) 
 246                  return data 
 247   
 248              self.buffer = StringIO() 
 249              while True: 
 250                  remaining = size - buf_len 
 251                  data = self.recv(remaining) 
 252   
 253                  if not data: 
 254                      break 
 255   
 256                  n = len(data) 
 257                  if n == size and not buf_len: 
 258                      return data 
 259   
 260                  if n == remaining: 
 261                      bufr.write(data) 
 262                      del data 
 263                      break 
 264   
 265                  bufr.write(data) 
 266                  buf_len += n 
 267                  del data 
 268   
 269              return bufr.getvalue() 
  270   
 272          if length is None: 
 273              if self.content_length is not None: 
 274                  length = self.content_length 
 275              else: 
 276                  length = 1 
 277   
 278          try: 
 279              data = self.conn.recv(length) 
 280          except: 
 281              data = b('') 
 282   
 283          return data 
  284   
 286          data = b("") 
 287          char = self.read(1) 
 288          while char != b('\n') and char is not b(''): 
 289              line = repr(char) 
 290              data += char 
 291              char = self.read(1) 
 292          data += char 
 293          return data 
  294   
 297   
 299          self.conn = None 
 300          self.content_length = None 
  325 -    def __init__(self, f_dict, *args, **kwargs): 
  326          Future.__init__(self, *args, **kwargs) 
 327   
 328          self.timeout = None 
 329   
 330          self._mem_dict = f_dict 
 331          self._lifespan = 30 
 332          self._name = None 
 333          self._start_time = time.time() 
  334   
 340   
 341 -    def remember(self, name, lifespan=None): 
  342          self._lifespan = lifespan or self._lifespan 
 343   
 344          if name in self._mem_dict: 
 345              raise NameError('Cannot remember future by name "%s".  ' % name + 
 346                              'A future already exists with that name.') 
 347          self._name = name 
 348          self._mem_dict[name] = self 
 349   
 350          return self 
  351   
 353          if self._name in self._mem_dict and self._mem_dict[self._name] is self: 
 354              del self._mem_dict[self._name] 
 355              self._name = None 
  359 -    def __init__(self, future, fn, args, kwargs): 
  360          self.future = future 
 361          self.fn = fn 
 362          self.args = args 
 363          self.kwargs = kwargs 
  364   
 366          if not self.future.set_running_or_notify_cancel(): 
 367              return 
 368   
 369          try: 
 370              result = self.fn(*self.args, **self.kwargs) 
 371          except BaseException: 
 372              e = sys.exc_info()[1] 
 373              self.future.set_exception(e) 
 374          else: 
 375              self.future.set_result(result) 
  379      multithread = True 
 380      multiprocess = False 
 381   
 383          ThreadPoolExecutor.__init__(self, *args, **kwargs) 
 384   
 385          self.futures = dict() 
  386   
 387 -    def submit(self, fn, *args, **kwargs): 
  388          if self._shutdown_lock.acquire(): 
 389              if self._shutdown: 
 390                  self._shutdown_lock.release() 
 391                  raise RuntimeError( 
 392                      'Cannot schedule new futures after shutdown') 
 393   
 394              f = WSGIFuture(self.futures) 
 395              w = _WorkItem(f, fn, args, kwargs) 
 396   
 397              self._work_queue.put(w) 
 398              self._adjust_thread_count() 
 399              self._shutdown_lock.release() 
 400              return f 
 401          else: 
 402              return False 
   403   
 404   
 406      "Futures middleware that adds a Futures Executor to the environment" 
 410   
 411 -    def __call__(self, environ, start_response): 
   415   
 416   
 417   
 418   
 419   
 420  import os 
 421  import socket 
 422  import logging 
 423  import traceback 
 424  from threading import Thread 
 425   
 426  try: 
 427      import ssl 
 428      from ssl import SSLError 
 429      has_ssl = True 
 430  except ImportError: 
 431      has_ssl = False 
 432   
 435   
 436   
 437   
 438   
 440      """The Listener class is a class responsible for accepting connections 
 441      and queuing them to be processed by a worker thread.""" 
 442   
 443 -    def __init__(self, interface, queue_size, active_queue, *args, **kwargs): 
  444          Thread.__init__(self, *args, **kwargs) 
 445   
 446           
 447          self.active_queue = active_queue 
 448          self.interface = interface 
 449          self.addr = interface[0] 
 450          self.port = interface[1] 
 451          self.secure = len(interface) >= 4 
 452          self.clientcert_req = (len(interface) == 5 and interface[4]) 
 453   
 454          self.thread = None 
 455          self.ready = False 
 456   
 457           
 458          self.err_log = logging.getLogger('Rocket.Errors.Port%i' % self.port) 
 459          self.err_log.addHandler(NullHandler()) 
 460   
 461           
 462          if ':' in self.addr: 
 463              listener = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 
 464          else: 
 465              listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
 466   
 467          if not listener: 
 468              self.err_log.error("Failed to get socket.") 
 469              return 
 470   
 471          if self.secure: 
 472              if not has_ssl: 
 473                  self.err_log.error("ssl module required to serve HTTPS.") 
 474                  return 
 475              elif not os.path.exists(interface[2]): 
 476                  data = (interface[2], interface[0], interface[1]) 
 477                  self.err_log.error("Cannot find key file " 
 478                                     "'%s'.  Cannot bind to %s:%s" % data) 
 479                  return 
 480              elif not os.path.exists(interface[3]): 
 481                  data = (interface[3], interface[0], interface[1]) 
 482                  self.err_log.error("Cannot find certificate file " 
 483                                     "'%s'.  Cannot bind to %s:%s" % data) 
 484                  return 
 485   
 486              if self.clientcert_req and not os.path.exists(interface[4]): 
 487                  data = (interface[4], interface[0], interface[1]) 
 488                  self.err_log.error("Cannot find root ca certificate file " 
 489                                     "'%s'.  Cannot bind to %s:%s" % data) 
 490                  return 
 491   
 492           
 493          try: 
 494              listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
 495          except: 
 496              msg = "Cannot share socket.  Using %s:%i exclusively." 
 497              self.err_log.warning(msg % (self.addr, self.port)) 
 498   
 499          try: 
 500              if not IS_JYTHON: 
 501                  listener.setsockopt(socket.IPPROTO_TCP, 
 502                                      socket.TCP_NODELAY, 
 503                                      1) 
 504          except: 
 505              msg = "Cannot set TCP_NODELAY, things might run a little slower" 
 506              self.err_log.warning(msg) 
 507   
 508          try: 
 509              listener.bind((self.addr, self.port)) 
 510          except: 
 511              msg = "Socket %s:%i in use by other process and it won't share." 
 512              self.err_log.error(msg % (self.addr, self.port)) 
 513          else: 
 514               
 515               
 516              listener.settimeout(THREAD_STOP_CHECK_INTERVAL) 
 517               
 518               
 519              listener.listen(queue_size) 
 520   
 521              self.listener = listener 
 522   
 523              self.ready = True 
  524   
 526          try: 
 527              if self.clientcert_req: 
 528                  ca_certs = self.interface[4] 
 529                  cert_reqs = ssl.CERT_OPTIONAL 
 530                  sock = ssl.wrap_socket(sock, 
 531                                         keyfile=self.interface[2], 
 532                                         certfile=self.interface[3], 
 533                                         server_side=True, 
 534                                         cert_reqs=cert_reqs, 
 535                                         ca_certs=ca_certs, 
 536                                         ssl_version=ssl.PROTOCOL_SSLv23) 
 537              else: 
 538                  sock = ssl.wrap_socket(sock, 
 539                                         keyfile=self.interface[2], 
 540                                         certfile=self.interface[3], 
 541                                         server_side=True, 
 542                                         ssl_version=ssl.PROTOCOL_SSLv23) 
 543          except SSLError: 
 544               
 545               
 546               
 547              pass 
 548   
 549          return sock 
  550   
 552          if not self.ready: 
 553              self.err_log.warning('Listener started when not ready.') 
 554              return 
 555   
 556          if self.thread is not None and self.thread.isAlive(): 
 557              self.err_log.warning('Listener already running.') 
 558              return 
 559   
 560          self.thread = Thread(target=self.listen, name="Port" + str(self.port)) 
 561   
 562          self.thread.start() 
  563   
 565          if self.thread is None: 
 566              return False 
 567   
 568          return self.thread.isAlive() 
  569   
 571          if self.thread is None: 
 572              return 
 573   
 574          self.ready = False 
 575   
 576          self.thread.join() 
 577   
 578          del self.thread 
 579          self.thread = None 
 580          self.ready = True 
  581   
 583          if __debug__: 
 584              self.err_log.debug('Entering main loop.') 
 585          while True: 
 586              try: 
 587                  sock, addr = self.listener.accept() 
 588   
 589                  if self.secure: 
 590                      sock = self.wrap_socket(sock) 
 591   
 592                  self.active_queue.put(((sock, addr), 
 593                                         self.interface[1], 
 594                                         self.secure)) 
 595   
 596              except socket.timeout: 
 597                   
 598                   
 599                   
 600   
 601                  if not self.ready: 
 602                      if __debug__: 
 603                          self.err_log.debug('Listener exiting.') 
 604                      return 
 605                  else: 
 606                      continue 
 607              except: 
 608                  self.err_log.error(traceback.format_exc()) 
   609   
 610   
 611   
 612   
 613   
 614  import sys 
 615  import time 
 616  import socket 
 617  import logging 
 618  import traceback 
 619  from threading import Lock 
 620  try: 
 621      from queue import Queue 
 622  except ImportError: 
 623      from Queue import Queue 
 624   
 625   
 626   
 627   
 628   
 629  log = logging.getLogger('Rocket') 
 630  log.addHandler(NullHandler()) 
 631   
 632   
 634      """The Rocket class is responsible for handling threads and accepting and 
 635      dispatching connections.""" 
 636   
 637 -    def __init__(self, 
 638                   interfaces=('127.0.0.1', 8000), 
 639                   method='wsgi', 
 640                   app_info=None, 
 641                   min_threads=None, 
 642                   max_threads=None, 
 643                   queue_size=None, 
 644                   timeout=600, 
 645                   handle_signals=True): 
  646   
 647          self.handle_signals = handle_signals 
 648          self.startstop_lock = Lock() 
 649          self.timeout = timeout 
 650   
 651          if not isinstance(interfaces, list): 
 652              self.interfaces = [interfaces] 
 653          else: 
 654              self.interfaces = interfaces 
 655   
 656          if min_threads is None: 
 657              min_threads = DEFAULTS['MIN_THREADS'] 
 658   
 659          if max_threads is None: 
 660              max_threads = DEFAULTS['MAX_THREADS'] 
 661   
 662          if not queue_size: 
 663              if hasattr(socket, 'SOMAXCONN'): 
 664                  queue_size = socket.SOMAXCONN 
 665              else: 
 666                  queue_size = DEFAULTS['LISTEN_QUEUE_SIZE'] 
 667   
 668          if max_threads and queue_size > max_threads: 
 669              queue_size = max_threads 
 670   
 671          if isinstance(app_info, dict): 
 672              app_info['server_software'] = SERVER_SOFTWARE 
 673   
 674          self.monitor_queue = Queue() 
 675          self.active_queue = Queue() 
 676   
 677          self._threadpool = ThreadPool(get_method(method), 
 678                                        app_info=app_info, 
 679                                        active_queue=self.active_queue, 
 680                                        monitor_queue=self.monitor_queue, 
 681                                        min_threads=min_threads, 
 682                                        max_threads=max_threads) 
 683   
 684           
 685          self.listeners = [Listener( 
 686              i, queue_size, self.active_queue) for i in self.interfaces] 
 687          for ndx in range(len(self.listeners) - 1, 0, -1): 
 688              if not self.listeners[ndx].ready: 
 689                  del self.listeners[ndx] 
 690   
 691          if not self.listeners: 
 692              log.critical("No interfaces to listen on...closing.") 
 693              sys.exit(1) 
  694   
 696          log.info('Received SIGTERM') 
 697          self.stop() 
  698   
 700          log.info('Received SIGHUP') 
 701          self.restart() 
  702   
 703 -    def start(self, background=False): 
  704          log.info('Starting %s' % SERVER_SOFTWARE) 
 705   
 706          self.startstop_lock.acquire() 
 707   
 708          try: 
 709               
 710              if self.handle_signals: 
 711                  try: 
 712                      import signal 
 713                      signal.signal(signal.SIGTERM, self._sigterm) 
 714                      signal.signal(signal.SIGUSR1, self._sighup) 
 715                  except: 
 716                      log.debug('This platform does not support signals.') 
 717   
 718               
 719              self._threadpool.start() 
 720   
 721               
 722              self._monitor = Monitor(self.monitor_queue, 
 723                                      self.active_queue, 
 724                                      self.timeout, 
 725                                      self._threadpool) 
 726              self._monitor.setDaemon(True) 
 727              self._monitor.start() 
 728   
 729               
 730               
 731              str_extract = lambda l: (l.addr, l.port, l.secure and '*' or '') 
 732   
 733              msg = 'Listening on sockets: ' 
 734              msg += ', '.join( 
 735                  ['%s:%i%s' % str_extract(l) for l in self.listeners]) 
 736              log.info(msg) 
 737   
 738              for l in self.listeners: 
 739                  l.start() 
 740   
 741          finally: 
 742              self.startstop_lock.release() 
 743   
 744          if background: 
 745              return 
 746   
 747          while self._monitor.isAlive(): 
 748              try: 
 749                  time.sleep(THREAD_STOP_CHECK_INTERVAL) 
 750              except KeyboardInterrupt: 
 751                   
 752                  break 
 753              except: 
 754                  if self._monitor.isAlive(): 
 755                      log.error(traceback.format_exc()) 
 756                      continue 
 757   
 758          return self.stop() 
  759   
 760 -    def stop(self, stoplogging=False): 
  761          log.info('Stopping %s' % SERVER_SOFTWARE) 
 762   
 763          self.startstop_lock.acquire() 
 764   
 765          try: 
 766               
 767              for l in self.listeners: 
 768                  l.ready = False 
 769   
 770               
 771              time.sleep(0.01) 
 772   
 773              for l in self.listeners: 
 774                  if l.isAlive(): 
 775                      l.join() 
 776   
 777               
 778              self._monitor.stop() 
 779              if self._monitor.isAlive(): 
 780                  self._monitor.join() 
 781   
 782               
 783              self._threadpool.stop() 
 784   
 785              if stoplogging: 
 786                  logging.shutdown() 
 787                  msg = "Calling logging.shutdown() is now the responsibility of \ 
 788                         the application developer.  Please update your \ 
 789                         applications to no longer call rocket.stop(True)" 
 790                  try: 
 791                      import warnings 
 792                      raise warnings.DeprecationWarning(msg) 
 793                  except ImportError: 
 794                      raise RuntimeError(msg) 
 795   
 796          finally: 
 797              self.startstop_lock.release() 
  798   
  802   
 803   
 804 -def CherryPyWSGIServer(bind_addr, 
 805                         wsgi_app, 
 806                         numthreads=10, 
 807                         server_name=None, 
 808                         max=-1, 
 809                         request_queue_size=5, 
 810                         timeout=10, 
 811                         shutdown_timeout=5): 
  812      """ A Cherrypy wsgiserver-compatible wrapper. """ 
 813      max_threads = max 
 814      if max_threads < 0: 
 815          max_threads = 0 
 816      return Rocket(bind_addr, 'wsgi', {'wsgi_app': wsgi_app}, 
 817                    min_threads=numthreads, 
 818                    max_threads=max_threads, 
 819                    queue_size=request_queue_size, 
 820                    timeout=timeout) 
  821   
 822   
 823   
 824   
 825   
 826  import time 
 827  import logging 
 828  import select 
 829  from threading import Thread 
 830   
 831   
 832   
 833   
 834   
 836       
 837   
 838 -    def __init__(self, 
 839                   monitor_queue, 
 840                   active_queue, 
 841                   timeout, 
 842                   threadpool, 
 843                   *args, 
 844                   **kwargs): 
  845   
 846          Thread.__init__(self, *args, **kwargs) 
 847   
 848          self._threadpool = threadpool 
 849   
 850           
 851          self.monitor_queue = monitor_queue 
 852          self.active_queue = active_queue 
 853          self.timeout = timeout 
 854   
 855          self.log = logging.getLogger('Rocket.Monitor') 
 856          self.log.addHandler(NullHandler()) 
 857   
 858          self.connections = set() 
 859          self.active = False 
  860   
 862          self.active = True 
 863          conn_list = list() 
 864          list_changed = False 
 865   
 866           
 867          while not self.monitor_queue.empty(): 
 868              self.monitor_queue.get() 
 869   
 870          if __debug__: 
 871              self.log.debug('Entering monitor loop.') 
 872   
 873           
 874          while self.active: 
 875   
 876               
 877              while not self.monitor_queue.empty(): 
 878                  if __debug__: 
 879                      self.log.debug('In "receive timed-out connections" loop.') 
 880   
 881                  c = self.monitor_queue.get() 
 882   
 883                  if c is None: 
 884                       
 885                      if __debug__: 
 886                          self.log.debug('Received a death threat.') 
 887                      self.stop() 
 888                      break 
 889   
 890                  self.log.debug('Received a timed out connection.') 
 891   
 892                  if __debug__: 
 893                      assert(c not in self.connections) 
 894   
 895                  if IS_JYTHON: 
 896                       
 897                       
 898                      c.setblocking(False) 
 899   
 900                  if __debug__: 
 901                      self.log.debug('Adding connection to monitor list.') 
 902   
 903                  self.connections.add(c) 
 904                  list_changed = True 
 905   
 906               
 907              if list_changed: 
 908                  conn_list = list(self.connections) 
 909                  list_changed = False 
 910   
 911              try: 
 912                  if len(conn_list): 
 913                      readable = select.select(conn_list, 
 914                                               [], 
 915                                               [], 
 916                                               THREAD_STOP_CHECK_INTERVAL)[0] 
 917                  else: 
 918                      time.sleep(THREAD_STOP_CHECK_INTERVAL) 
 919                      readable = [] 
 920   
 921                  if not self.active: 
 922                      break 
 923   
 924                   
 925                  for r in readable: 
 926                      if __debug__: 
 927                          self.log.debug('Restoring readable connection') 
 928   
 929                      if IS_JYTHON: 
 930                           
 931                           
 932                           
 933                          r.setblocking(True) 
 934   
 935                      r.start_time = time.time() 
 936                      self.active_queue.put(r) 
 937   
 938                      self.connections.remove(r) 
 939                      list_changed = True 
 940   
 941              except: 
 942                  if self.active: 
 943                      raise 
 944                  else: 
 945                      break 
 946   
 947               
 948              if self.timeout: 
 949                  now = time.time() 
 950                  stale = set() 
 951                  for c in self.connections: 
 952                      if (now - c.start_time) >= self.timeout: 
 953                          stale.add(c) 
 954   
 955                  for c in stale: 
 956                      if __debug__: 
 957                           
 958                          data = ( 
 959                              c.client_addr, c.server_port, c.ssl and '*' or '') 
 960                          self.log.debug( 
 961                              'Flushing stale connection: %s:%i%s' % data) 
 962   
 963                      self.connections.remove(c) 
 964                      list_changed = True 
 965   
 966                      try: 
 967                          c.close() 
 968                      finally: 
 969                          del c 
 970   
 971               
 972              self._threadpool.dynamic_resize() 
  973   
 975          self.active = False 
 976   
 977          if __debug__: 
 978              self.log.debug('Flushing waiting connections') 
 979   
 980          while self.connections: 
 981              c = self.connections.pop() 
 982              try: 
 983                  c.close() 
 984              finally: 
 985                  del c 
 986   
 987          if __debug__: 
 988              self.log.debug('Flushing queued connections') 
 989   
 990          while not self.monitor_queue.empty(): 
 991              c = self.monitor_queue.get() 
 992   
 993              if c is None: 
 994                  continue 
 995   
 996              try: 
 997                  c.close() 
 998              finally: 
 999                  del c 
1000   
1001           
1002          self.monitor_queue.put(None) 
  1003   
1004   
1005   
1006   
1007   
1008  import logging 
1009   
1010   
1011   
1012   
1013   
1014  log = logging.getLogger('Rocket.Errors.ThreadPool') 
1015  log.addHandler(NullHandler()) 
1016   
1017   
1019      """The ThreadPool class is a container class for all the worker threads. It 
1020      manages the number of actively running threads.""" 
1021   
1022 -    def __init__(self, 
1023                   method, 
1024                   app_info, 
1025                   active_queue, 
1026                   monitor_queue, 
1027                   min_threads=DEFAULTS['MIN_THREADS'], 
1028                   max_threads=DEFAULTS['MAX_THREADS'], 
1029                   ): 
 1030   
1031          if __debug__: 
1032              log.debug("Initializing ThreadPool.") 
1033   
1034          self.check_for_dead_threads = 0 
1035          self.active_queue = active_queue 
1036   
1037          self.worker_class = method 
1038          self.min_threads = min_threads 
1039          self.max_threads = max_threads 
1040          self.monitor_queue = monitor_queue 
1041          self.stop_server = False 
1042          self.alive = False 
1043   
1044           
1045          self.grow_threshold = int(max_threads / 10) + 2 
1046   
1047          if not isinstance(app_info, dict): 
1048              app_info = dict() 
1049   
1050          if has_futures and app_info.get('futures'): 
1051              app_info['executor'] = WSGIExecutor(max([DEFAULTS['MIN_THREADS'], 
1052                                                       2])) 
1053   
1054          app_info.update(max_threads=max_threads, 
1055                          min_threads=min_threads) 
1056   
1057          self.min_threads = min_threads 
1058          self.app_info = app_info 
1059   
1060          self.threads = set() 
 1061   
1063          self.stop_server = False 
1064          if __debug__: 
1065              log.debug("Starting threads.") 
1066   
1067          self.grow(self.min_threads) 
1068   
1069          self.alive = True 
 1070   
1072          self.alive = False 
1073   
1074          if __debug__: 
1075              log.debug("Stopping threads.") 
1076   
1077          self.stop_server = True 
1078   
1079           
1080          self.shrink(len(self.threads)) 
1081   
1082           
1083          if has_futures and self.app_info.get('futures'): 
1084              if __debug__: 
1085                  log.debug("Future executor is present.  Python will not " 
1086                            "exit until all jobs have finished.") 
1087              self.app_info['executor'].shutdown(wait=False) 
1088   
1089           
1090           
1091           
1092           
1093           
1094   
1095           
1096          for t in self.threads: 
1097              if t.isAlive(): 
1098                  t.join() 
1099   
1100           
1101          self.bring_out_your_dead() 
 1102   
1104           
1105   
1106          dead_threads = [t for t in self.threads if not t.isAlive()] 
1107          for t in dead_threads: 
1108              if __debug__: 
1109                  log.debug("Removing dead thread: %s." % t.getName()) 
1110              try: 
1111                   
1112                  self.threads.remove(t) 
1113              except: 
1114                  pass 
1115          self.check_for_dead_threads -= len(dead_threads) 
 1116   
1117 -    def grow(self, amount=None): 
 1118          if self.stop_server: 
1119              return 
1120   
1121          if not amount: 
1122              amount = self.max_threads 
1123   
1124          if self.alive: 
1125              amount = min([amount, self.max_threads - len(self.threads)]) 
1126   
1127          if __debug__: 
1128              log.debug("Growing by %i." % amount) 
1129   
1130          for x in range(amount): 
1131              worker = self.worker_class(self.app_info, 
1132                                         self.active_queue, 
1133                                         self.monitor_queue) 
1134   
1135              worker.setDaemon(True) 
1136              self.threads.add(worker) 
1137              worker.start() 
 1138   
1140          if __debug__: 
1141              log.debug("Shrinking by %i." % amount) 
1142   
1143          self.check_for_dead_threads += amount 
1144   
1145          for x in range(amount): 
1146              self.active_queue.put(None) 
 1147   
1149          if (self.max_threads > self.min_threads or self.max_threads == 0): 
1150              if self.check_for_dead_threads > 0: 
1151                  self.bring_out_your_dead() 
1152   
1153              queueSize = self.active_queue.qsize() 
1154              threadCount = len(self.threads) 
1155   
1156              if __debug__: 
1157                  log.debug("Examining ThreadPool. %i threads and %i Q'd conxions" 
1158                            % (threadCount, queueSize)) 
1159   
1160              if queueSize == 0 and threadCount > self.min_threads: 
1161                  self.shrink() 
1162   
1163              elif queueSize > self.grow_threshold: 
1164   
1165                  self.grow(queueSize) 
  1166   
1167   
1168   
1169   
1170   
1171  import re 
1172  import sys 
1173  import socket 
1174  import logging 
1175  import traceback 
1176  from wsgiref.headers import Headers 
1177  from threading import Thread 
1178  from datetime import datetime 
1179   
1180  try: 
1181      from urllib import unquote 
1182  except ImportError: 
1183      from urllib.parse import unquote 
1184   
1185  try: 
1186      from io import StringIO 
1187  except ImportError: 
1188      try: 
1189          from cStringIO import StringIO 
1190      except ImportError: 
1191          from StringIO import StringIO 
1192   
1193  try: 
1194      from ssl import SSLError 
1195  except ImportError: 
1198   
1199   
1200   
1201   
1202   
1203  re_SLASH = re.compile('%2F', re.IGNORECASE) 
1204  re_REQUEST_LINE = re.compile(r"""^ 
1205  (?P<method>OPTIONS|GET|HEAD|POST|PUT|DELETE|TRACE|CONNECT)   # Request Method 
1206  \                                                            # (single space) 
1207  ( 
1208      (?P<scheme>[^:/]+)                                       # Scheme 
1209      (://)  # 
1210      (?P<host>[^/]+)                                          # Host 
1211  )? # 
1212  (?P<path>(\*|/[^ \?]*))                                      # Path 
1213  (\? (?P<query_string>[^ ]*))?                                # Query String 
1214  \                                                            # (single space) 
1215  (?P<protocol>HTTPS?/1\.[01])                                 # Protocol 
1216  $ 
1217  """, re.X) 
1218  LOG_LINE = '%(client_ip)s - "%(request_line)s" - %(status)s %(size)s' 
1219  RESPONSE = '''\ 
1220  %s %s 
1221  Content-Length: %i 
1222  Content-Type: %s 
1223   
1224  %s 
1225  ''' 
1226  if IS_JYTHON: 
1227      HTTP_METHODS = set(['OPTIONS', 'GET', 'HEAD', 'POST', 'PUT', 
1228                          'DELETE', 'TRACE', 'CONNECT']) 
1229   
1230   
1232      """The Worker class is a base class responsible for receiving connections 
1233      and (a subclass) will run an application to process the the connection """ 
1234   
1235 -    def __init__(self, 
1236                   app_info, 
1237                   active_queue, 
1238                   monitor_queue, 
1239                   *args, 
1240                   **kwargs): 
 1241   
1242          Thread.__init__(self, *args, **kwargs) 
1243   
1244           
1245          self.app_info = app_info 
1246          self.active_queue = active_queue 
1247          self.monitor_queue = monitor_queue 
1248   
1249          self.size = 0 
1250          self.status = "200 OK" 
1251          self.closeConnection = True 
1252          self.request_line = "" 
1253          self.protocol = 'HTTP/1.1' 
1254   
1255           
1256          self.req_log = logging.getLogger('Rocket.Requests') 
1257          self.req_log.addHandler(NullHandler()) 
1258   
1259           
1260          self.err_log = logging.getLogger('Rocket.Errors.' + self.getName()) 
1261          self.err_log.addHandler(NullHandler()) 
 1262   
1264          if typ == SSLError: 
1265              if 'timed out' in str(val.args[0]): 
1266                  typ = SocketTimeout 
1267          if typ == SocketTimeout: 
1268              if __debug__: 
1269                  self.err_log.debug('Socket timed out') 
1270              self.monitor_queue.put(self.conn) 
1271              return True 
1272          if typ == SocketClosed: 
1273              self.closeConnection = True 
1274              if __debug__: 
1275                  self.err_log.debug('Client closed socket') 
1276              return False 
1277          if typ == BadRequest: 
1278              self.closeConnection = True 
1279              if __debug__: 
1280                  self.err_log.debug('Client sent a bad request') 
1281              return True 
1282          if typ == socket.error: 
1283              self.closeConnection = True 
1284              if val.args[0] in IGNORE_ERRORS_ON_CLOSE: 
1285                  if __debug__: 
1286                      self.err_log.debug('Ignorable socket Error received...' 
1287                                         'closing connection.') 
1288                  return False 
1289              else: 
1290                  self.status = "999 Utter Server Failure" 
1291                  tb_fmt = traceback.format_exception(typ, val, tb) 
1292                  self.err_log.error('Unhandled Error when serving ' 
1293                                     'connection:\n' + '\n'.join(tb_fmt)) 
1294                  return False 
1295   
1296          self.closeConnection = True 
1297          tb_fmt = traceback.format_exception(typ, val, tb) 
1298          self.err_log.error('\n'.join(tb_fmt)) 
1299          self.send_response('500 Server Error') 
1300          return False 
 1301   
1303          if __debug__: 
1304              self.err_log.debug('Entering main loop.') 
1305   
1306           
1307          while True: 
1308              conn = self.active_queue.get() 
1309   
1310              if not conn: 
1311                   
1312                  if __debug__: 
1313                      self.err_log.debug('Received a death threat.') 
1314                  return conn 
1315   
1316              if isinstance(conn, tuple): 
1317                  conn = Connection(*conn) 
1318   
1319              self.conn = conn 
1320   
1321              if conn.ssl != conn.secure: 
1322                  self.err_log.info('Received HTTP connection on HTTPS port.') 
1323                  self.send_response('400 Bad Request') 
1324                  self.closeConnection = True 
1325                  conn.close() 
1326                  continue 
1327              else: 
1328                  if __debug__: 
1329                      self.err_log.debug('Received a connection.') 
1330                  self.closeConnection = False 
1331   
1332               
1333              while True: 
1334                  if __debug__: 
1335                      self.err_log.debug('Serving a request') 
1336                  try: 
1337                      self.run_app(conn) 
1338                  except: 
1339                      exc = sys.exc_info() 
1340                      handled = self._handleError(*exc) 
1341                      if handled: 
1342                          break 
1343                  finally: 
1344                      if self.request_line: 
1345                          log_info = dict(client_ip=conn.client_addr, 
1346                                          time=datetime.now().strftime('%c'), 
1347                                          status=self.status.split(' ')[0], 
1348                                          size=self.size, 
1349                                          request_line=self.request_line) 
1350                          self.req_log.info(LOG_LINE % log_info) 
1351   
1352                  if self.closeConnection: 
1353                      try: 
1354                          conn.close() 
1355                      except: 
1356                          self.err_log.error(str(traceback.format_exc())) 
1357   
1358                      break 
 1359   
1361           
1362           
1363          self.closeConnection = True 
1364          raise NotImplementedError('Overload this method!') 
 1365   
1367          stat_msg = status.split(' ', 1)[1] 
1368          msg = RESPONSE % (self.protocol, 
1369                            status, 
1370                            len(stat_msg), 
1371                            'text/plain', 
1372                            stat_msg) 
1373          try: 
1374              self.conn.sendall(b(msg)) 
1375          except socket.timeout: 
1376              self.closeConnection = True 
1377              msg = 'Tried to send "%s" to client but received timeout error' 
1378              self.err_log.error(msg % status) 
1379          except socket.error: 
1380              self.closeConnection = True 
1381              msg = 'Tried to send "%s" to client but received socket error' 
1382              self.err_log.error(msg % status) 
 1383   
1385          self.request_line = '' 
1386          try: 
1387               
1388              d = sock_file.readline() 
1389              if PY3K: 
1390                  d = d.decode('ISO-8859-1') 
1391   
1392              if d == '\r\n': 
1393                   
1394                  if __debug__: 
1395                      self.err_log.debug('Client sent newline') 
1396   
1397                  d = sock_file.readline() 
1398                  if PY3K: 
1399                      d = d.decode('ISO-8859-1') 
1400          except socket.timeout: 
1401              raise SocketTimeout('Socket timed out before request.') 
1402          except TypeError: 
1403              raise SocketClosed( 
1404                  'SSL bug caused closure of socket.  See ' 
1405                  '"https://groups.google.com/d/topic/web2py/P_Gw0JxWzCs".') 
1406   
1407          d = d.strip() 
1408   
1409          if not d: 
1410              if __debug__: 
1411                  self.err_log.debug( 
1412                      'Client did not send a recognizable request.') 
1413              raise SocketClosed('Client closed socket.') 
1414   
1415          self.request_line = d 
1416   
1417           
1418           
1419           
1420           
1421           
1422          if IS_JYTHON: 
1423              return self._read_request_line_jython(d) 
1424   
1425          match = re_REQUEST_LINE.match(d) 
1426   
1427          if not match: 
1428              self.send_response('400 Bad Request') 
1429              raise BadRequest 
1430   
1431          req = match.groupdict() 
1432          for k, v in req.iteritems(): 
1433              if not v: 
1434                  req[k] = "" 
1435              if k == 'path': 
1436                  req['path'] = r'%2F'.join( 
1437                      [unquote(x) for x in re_SLASH.split(v)]) 
1438   
1439          self.protocol = req['protocol'] 
1440          return req 
 1441   
1443          d = d.strip() 
1444          try: 
1445              method, uri, proto = d.split(' ') 
1446              if not proto.startswith('HTTP') or \ 
1447                      proto[-3:] not in ('1.0', '1.1') or \ 
1448                      method not in HTTP_METHODS: 
1449                  self.send_response('400 Bad Request') 
1450                  raise BadRequest 
1451          except ValueError: 
1452              self.send_response('400 Bad Request') 
1453              raise BadRequest 
1454   
1455          req = dict(method=method, protocol=proto) 
1456          scheme = '' 
1457          host = '' 
1458          if uri == '*' or uri.startswith('/'): 
1459              path = uri 
1460          elif '://' in uri: 
1461              scheme, rest = uri.split('://') 
1462              host, path = rest.split('/', 1) 
1463              path = '/' + path 
1464          else: 
1465              self.send_response('400 Bad Request') 
1466              raise BadRequest 
1467   
1468          query_string = '' 
1469          if '?' in path: 
1470              path, query_string = path.split('?', 1) 
1471   
1472          path = r'%2F'.join([unquote(x) for x in re_SLASH.split(path)]) 
1473   
1474          req.update(path=path, 
1475                     query_string=query_string, 
1476                     scheme=scheme.lower(), 
1477                     host=host) 
1478          return req 
 1479   
1481          try: 
1482              headers = dict() 
1483              lname = None 
1484              lval = None 
1485              while True: 
1486                  l = sock_file.readline() 
1487   
1488                  if PY3K: 
1489                      try: 
1490                          l = str(l, 'ISO-8859-1') 
1491                      except UnicodeDecodeError: 
1492                          self.err_log.warning( 
1493                              'Client sent invalid header: ' + repr(l)) 
1494   
1495                  if l.strip().replace('\0', '') == '': 
1496                      break 
1497   
1498                  if l[0] in ' \t' and lname: 
1499                       
1500                      lval += ' ' + l.strip() 
1501                  else: 
1502                       
1503                      l = l.split(':', 1) 
1504                       
1505   
1506                      lname = l[0].strip().upper().replace('-', '_') 
1507                      lval = l[-1].strip() 
1508   
1509                  headers[str(lname)] = str(lval) 
1510   
1511          except socket.timeout: 
1512              raise SocketTimeout("Socket timed out before request.") 
1513   
1514          return headers 
  1515   
1516   
1518      "Exception for when a socket times out between requests." 
1519      pass 
 1520   
1521   
1523      "Exception for when a client sends an incomprehensible request." 
1524      pass 
 1525   
1526   
1528      "Exception for when a socket is closed by the client." 
1529      pass 
 1530   
1531   
1533   
1535          self.stream = sock_file 
1536          self.chunk_size = 0 
 1537   
1539          chunk_len = "" 
1540          try: 
1541              while "" == chunk_len: 
1542                  chunk_len = self.stream.readline().strip() 
1543              return int(chunk_len, 16) 
1544          except ValueError: 
1545              return 0 
 1546   
1547 -    def read(self, size): 
 1548          data = b('') 
1549          chunk_size = self.chunk_size 
1550          while size: 
1551              if not chunk_size: 
1552                  chunk_size = self._read_header() 
1553   
1554              if size < chunk_size: 
1555                  data += self.stream.read(size) 
1556                  chunk_size -= size 
1557                  break 
1558              else: 
1559                  if not chunk_size: 
1560                      break 
1561                  data += self.stream.read(chunk_size) 
1562                  size -= chunk_size 
1563                  chunk_size = 0 
1564   
1565          self.chunk_size = chunk_size 
1566          return data 
 1567   
1569          data = b('') 
1570          c = self.read(1) 
1571          while c and c != b('\n'): 
1572              data += c 
1573              c = self.read(1) 
1574          data += c 
1575          return data 
 1576   
 1579   
1580   
1584   
1585   
1586   
1587   
1588   
1589   
1590   
1591   
1592  import sys 
1593  import socket 
1594  from wsgiref.headers import Headers 
1595  from wsgiref.util import FileWrapper 
1596   
1597   
1598   
1599   
1600  if PY3K: 
1601      from email.utils import formatdate 
1602  else: 
1603       
1604      from email.Utils import formatdate 
1605   
1606   
1607  NEWLINE = b('\r\n') 
1608  HEADER_RESPONSE = '''HTTP/1.1 %s\r\n%s''' 
1609  BASE_ENV = {'SERVER_NAME': SERVER_NAME, 
1610              'SCRIPT_NAME': '',   
1611              'wsgi.errors': sys.stderr, 
1612              'wsgi.version': (1, 0), 
1613              'wsgi.multiprocess': False, 
1614              'wsgi.run_once': False, 
1615              'wsgi.file_wrapper': FileWrapper 
1616              } 
1617   
1618   
1621          """Builds some instance variables that will last the life of the 
1622          thread.""" 
1623          Worker.__init__(self, *args, **kwargs) 
1624   
1625          if isinstance(self.app_info, dict): 
1626              multithreaded = self.app_info.get('max_threads') != 1 
1627          else: 
1628              multithreaded = False 
1629          self.base_environ = dict( 
1630              {'SERVER_SOFTWARE': self.app_info['server_software'], 
1631               'wsgi.multithread': multithreaded, 
1632               }) 
1633          self.base_environ.update(BASE_ENV) 
1634   
1635           
1636          self.app = self.app_info.get('wsgi_app') 
1637   
1638          if not hasattr(self.app, "__call__"): 
1639              raise TypeError("The wsgi_app specified (%s) is not a valid WSGI application." % repr(self.app)) 
1640   
1641           
1642          if has_futures and self.app_info.get('futures'): 
1643              executor = self.app_info['executor'] 
1644              self.base_environ.update({"wsgiorg.executor": executor, 
1645                                        "wsgiorg.futures": executor.futures}) 
 1646   
1648          """ Build the execution environment. """ 
1649           
1650          request = self.read_request_line(sock_file) 
1651   
1652           
1653          environ = self.base_environ.copy() 
1654   
1655           
1656          for k, v in self.read_headers(sock_file).iteritems(): 
1657              environ[str('HTTP_' + k)] = v 
1658   
1659           
1660          environ['REQUEST_METHOD'] = request['method'] 
1661          environ['PATH_INFO'] = request['path'] 
1662          environ['SERVER_PROTOCOL'] = request['protocol'] 
1663          environ['SERVER_PORT'] = str(conn.server_port) 
1664          environ['REMOTE_PORT'] = str(conn.client_port) 
1665          environ['REMOTE_ADDR'] = str(conn.client_addr) 
1666          environ['QUERY_STRING'] = request['query_string'] 
1667          if 'HTTP_CONTENT_LENGTH' in environ: 
1668              environ['CONTENT_LENGTH'] = environ['HTTP_CONTENT_LENGTH'] 
1669          if 'HTTP_CONTENT_TYPE' in environ: 
1670              environ['CONTENT_TYPE'] = environ['HTTP_CONTENT_TYPE'] 
1671   
1672           
1673          self.request_method = environ['REQUEST_METHOD'] 
1674   
1675           
1676          if conn.ssl: 
1677              environ['wsgi.url_scheme'] = 'https' 
1678              environ['HTTPS'] = 'on' 
1679              try: 
1680                  peercert = conn.socket.getpeercert(binary_form=True) 
1681                  environ['SSL_CLIENT_RAW_CERT'] = \ 
1682                      peercert and ssl.DER_cert_to_PEM_cert(peercert) 
1683              except Exception: 
1684                  print sys.exc_info()[1] 
1685          else: 
1686              environ['wsgi.url_scheme'] = 'http' 
1687   
1688          if environ.get('HTTP_TRANSFER_ENCODING', '') == 'chunked': 
1689              environ['wsgi.input'] = ChunkedReader(sock_file) 
1690          else: 
1691              environ['wsgi.input'] = sock_file 
1692   
1693          return environ 
 1694   
1696          h_set = self.header_set 
1697   
1698           
1699          self.chunked = h_set.get('Transfer-Encoding', '').lower() == 'chunked' 
1700   
1701           
1702          if not 'Date' in h_set: 
1703              h_set['Date'] = formatdate(usegmt=True) 
1704   
1705           
1706          if not 'Server' in h_set: 
1707              h_set['Server'] = HTTP_SERVER_SOFTWARE 
1708   
1709          if 'Content-Length' in h_set: 
1710              self.size = int(h_set['Content-Length']) 
1711          else: 
1712              s = int(self.status.split(' ')[0]) 
1713              if (s < 200 or s not in (204, 205, 304)) and not self.chunked: 
1714                  if sections == 1 or self.protocol != 'HTTP/1.1': 
1715                       
1716                      self.size = len(data) 
1717                      h_set['Content-Length'] = str(self.size) 
1718                  else: 
1719                       
1720                      h_set['Transfer-Encoding'] = 'Chunked' 
1721                      self.chunked = True 
1722                      if __debug__: 
1723                          self.err_log.debug('Adding header...' 
1724                                             'Transfer-Encoding: Chunked') 
1725   
1726          if 'Connection' not in h_set: 
1727               
1728               
1729              client_conn = self.environ.get('HTTP_CONNECTION', '').lower() 
1730              if self.environ['SERVER_PROTOCOL'] == 'HTTP/1.1': 
1731                   
1732                  if client_conn: 
1733                      h_set['Connection'] = client_conn 
1734                  else: 
1735                      h_set['Connection'] = 'keep-alive' 
1736              else: 
1737                   
1738                   
1739                  h_set['Connection'] = 'close' 
1740   
1741           
1742          self.closeConnection = h_set.get('Connection', '').lower() == 'close' 
1743   
1744           
1745          header_data = HEADER_RESPONSE % (self.status, str(h_set)) 
1746   
1747           
1748          if __debug__: 
1749              self.err_log.debug('Sending Headers: %s' % repr(header_data)) 
1750          self.conn.sendall(b(header_data)) 
1751          self.headers_sent = True 
 1752   
1754          self.err_log.warning('WSGI app called write method directly.  This is ' 
1755                               'deprecated behavior.  Please update your app.') 
1756          return self.write(data, sections) 
 1757   
1758 -    def write(self, data, sections=None): 
 1759          """ Write the data to the output socket. """ 
1760   
1761          if self.error[0]: 
1762              self.status = self.error[0] 
1763              data = b(self.error[1]) 
1764   
1765          if not self.headers_sent: 
1766              self.send_headers(data, sections) 
1767   
1768          if self.request_method != 'HEAD': 
1769              try: 
1770                  if self.chunked: 
1771                      self.conn.sendall(b('%x\r\n%s\r\n' % (len(data), data))) 
1772                  else: 
1773                      self.conn.sendall(data) 
1774              except socket.timeout: 
1775                  self.closeConnection = True 
1776              except socket.error: 
1777                   
1778                   
1779                  self.closeConnection = True 
 1780   
1782          """ Store the HTTP status and headers to be sent when self.write is 
1783          called. """ 
1784          if exc_info: 
1785              try: 
1786                  if self.headers_sent: 
1787                       
1788                       
1789                      raise 
1790              finally: 
1791                  exc_info = None 
1792          elif self.header_set: 
1793              raise AssertionError("Headers already set!") 
1794   
1795          if PY3K and not isinstance(status, str): 
1796              self.status = str(status, 'ISO-8859-1') 
1797          else: 
1798              self.status = status 
1799           
1800          try: 
1801              self.header_set = Headers(response_headers) 
1802          except UnicodeDecodeError: 
1803              self.error = ('500 Internal Server Error', 
1804                            'HTTP Headers should be bytes') 
1805              self.err_log.error('Received HTTP Headers from client that contain' 
1806                                 ' invalid characters for Latin-1 encoding.') 
1807   
1808          return self.write_warning 
 1809   
1811          self.size = 0 
1812          self.header_set = Headers([]) 
1813          self.headers_sent = False 
1814          self.error = (None, None) 
1815          self.chunked = False 
1816          sections = None 
1817          output = None 
1818   
1819          if __debug__: 
1820              self.err_log.debug('Getting sock_file') 
1821   
1822           
1823          if PY3K: 
1824              sock_file = conn.makefile(mode='rb', buffering=BUF_SIZE) 
1825          else: 
1826              sock_file = conn.makefile(BUF_SIZE) 
1827   
1828          try: 
1829               
1830              self.environ = environ = self.build_environ(sock_file, conn) 
1831   
1832               
1833              if environ.get('HTTP_EXPECT', '') == '100-continue': 
1834                  res = environ['SERVER_PROTOCOL'] + ' 100 Continue\r\n\r\n' 
1835                  conn.sendall(b(res)) 
1836   
1837               
1838              output = self.app(environ, self.start_response) 
1839   
1840              if not hasattr(output, '__len__') and not hasattr(output, '__iter__'): 
1841                  self.error = ('500 Internal Server Error', 
1842                                'WSGI applications must return a list or ' 
1843                                'generator type.') 
1844   
1845              if hasattr(output, '__len__'): 
1846                  sections = len(output) 
1847   
1848              for data in output: 
1849                   
1850                  if data: 
1851                      self.write(data, sections) 
1852   
1853              if self.chunked: 
1854                   
1855                  self.conn.sendall(b('0\r\n\r\n')) 
1856              elif not self.headers_sent: 
1857                   
1858                  self.send_headers('', sections) 
1859   
1860           
1861           
1862          finally: 
1863              if __debug__: 
1864                  self.err_log.debug('Finally closing output and sock_file') 
1865   
1866              if hasattr(output, 'close'): 
1867                  output.close() 
1868   
1869              sock_file.close()