Source Code for Module gluon.scheduler

#!/usr/bin/env python
# -*- coding: utf-8 -*-

USAGE = """
## Example

For any existing app

Create file: app/models/scheduler.py
from gluon.scheduler import Scheduler

def demo1(*args, **vars):
    print 'you passed args=%s and vars=%s' % (args, vars)
    return 'done!'

def demo2():
    1 / 0

scheduler = Scheduler(db, dict(demo1=demo1, demo2=demo2))

## run worker nodes with:

   cd web2py
   python web2py.py -K myapp
or
   python gluon/scheduler.py -u sqlite://storage.sqlite \
                             -f applications/myapp/databases/ \
                             -t mytasks.py
## to list all command-line options:
   python gluon/scheduler.py -h

## schedule jobs using
http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task

## monitor scheduled jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0

## view completed jobs
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0

## view workers
http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0

## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put
## the following into /etc/init/web2py-scheduler.conf:
## (This assumes your web2py instance is installed in <user>'s home directory,
## running as <user>, with app <myapp>, on network interface eth0.)

description "web2py task scheduler"
start on (local-filesystems and net-device-up IFACE=eth0)
stop on shutdown
respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds.
exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp>
respawn

## You can then start/stop/restart/check status of the daemon with:
sudo start web2py-scheduler
sudo stop web2py-scheduler
sudo restart web2py-scheduler
sudo status web2py-scheduler
"""

import os
import time
import multiprocessing
import sys
import threading
import traceback
import signal
import socket
import datetime
import logging
import optparse
import types
import Queue

if 'WEB2PY_PATH' in os.environ:
    sys.path.append(os.environ['WEB2PY_PATH'])
else:
    os.environ['WEB2PY_PATH'] = os.getcwd()

if not os.environ['WEB2PY_PATH'] in sys.path:
    sys.path.append(os.environ['WEB2PY_PATH'])

try:
    from gluon.contrib.simplejson import loads, dumps
except:
    from simplejson import loads, dumps

IDENTIFIER = "%s#%s" % (socket.gethostname(), os.getpid())

logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)

from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB, IS_INT_IN_RANGE, IS_DATETIME
from gluon.utils import web2py_uuid
from gluon.storage import Storage


QUEUED = 'QUEUED'
ASSIGNED = 'ASSIGNED'
RUNNING = 'RUNNING'
COMPLETED = 'COMPLETED'
FAILED = 'FAILED'
TIMEOUT = 'TIMEOUT'
STOPPED = 'STOPPED'
ACTIVE = 'ACTIVE'
TERMINATE = 'TERMINATE'
DISABLED = 'DISABLED'
KILL = 'KILL'
PICK = 'PICK'
STOP_TASK = 'STOP_TASK'
EXPIRED = 'EXPIRED'
SECONDS = 1
HEARTBEAT = 3 * SECONDS
MAXHIBERNATION = 10
CLEAROUT = '!clear!'

CALLABLETYPES = (types.LambdaType, types.FunctionType,
                 types.BuiltinFunctionType,
                 types.MethodType, types.BuiltinMethodType)

class Task(object):
    def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
        logger.debug(' new task allocated: %s.%s', app, function)
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args  # json
        self.vars = vars  # json
        self.__dict__.update(kwargs)

    def __str__(self):
        return '<Task: %s>' % self.function


class TaskReport(object):
    def __init__(self, status, result=None, output=None, tb=None):
        logger.debug(' new task report: %s', status)
        if tb:
            logger.debug(' traceback: %s', tb)
        else:
            logger.debug(' result: %s', result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb

    def __str__(self):
        return '<TaskReport: %s>' % self.status


def demo_function(*argv, **kwargs):
    """ test function """
    for i in range(argv[0]):
        print 'click', i
        time.sleep(1)
    return 'done'

# The two functions below deal with simplejson decoding strings as unicode.
# This matters especially for the dict decode, whose keys are subsequently
# used as function keyword arguments: unicode variable names won't work!
# Borrowed from http://stackoverflow.com/questions/956867/how-to-get-string-objects-instead-unicode-ones-from-json-in-python

def _decode_list(lst):
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            i = _decode_list(i)
        newlist.append(i)
    return newlist


def _decode_dict(dct):
    newdict = {}
    for k, v in dct.iteritems():
        if isinstance(k, unicode):
            k = k.encode('utf-8')
        if isinstance(v, unicode):
            v = v.encode('utf-8')
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict

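# A minimal illustration of the hooks above (assumption: a Python 2 session
# with this module imported): the object_hook re-encodes unicode keys and
# values to utf-8 str so that, per the comment above, the resulting dict is
# safe to expand as **kwargs:
#
#   loads('{"a": [1, "x"]}')                            # {u'a': [1, u'x']}
#   loads('{"a": [1, "x"]}', object_hook=_decode_dict)  # {'a': [1, 'x']}
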
def executor(queue, task, out):
    """ the background process """
    logger.debug(' task started')

    class LogOutput(object):
        """Facility to log output at intervals"""
        def __init__(self, out_queue):
            self.out_queue = out_queue
            self.stdout = sys.stdout
            sys.stdout = self

        def __del__(self):
            sys.stdout = self.stdout

        def flush(self):
            pass

        def write(self, data):
            self.out_queue.put(data)

    W2P_TASK = Storage({'id': task.task_id, 'uuid': task.uuid})
    stdout = LogOutput(out)
    try:
        if task.app:
            os.chdir(os.environ['WEB2PY_PATH'])
            from gluon.shell import env, parse_path_info
            from gluon import current
            level = logging.getLogger().getEffectiveLevel()
            logging.getLogger().setLevel(logging.WARN)
            # Get controller-specific subdirectory if task.app is of
            # form 'app/controller'
            (a, c, f) = parse_path_info(task.app)
            _env = env(a=a, c=c, import_models=True)
            logging.getLogger().setLevel(level)
            f = task.function
            functions = current._scheduler.tasks
            if not functions:
                #look into env
                _function = _env.get(f)
            else:
                _function = functions.get(f)
            if not isinstance(_function, CALLABLETYPES):
                raise NameError(
                    "name '%s' not found in scheduler's environment" % f)
            #Inject W2P_TASK into environment
            _env.update({'W2P_TASK': W2P_TASK})
            globals().update(_env)
            args = loads(task.args)
            vars = loads(task.vars, object_hook=_decode_dict)
            result = dumps(_function(*args, **vars))
        else:
            ### for testing purpose only
            result = eval(task.function)(
                *loads(task.args, object_hook=_decode_dict),
                **loads(task.vars, object_hook=_decode_dict))
        queue.put(TaskReport(COMPLETED, result=result))
    except BaseException, e:
        tb = traceback.format_exc()
        queue.put(TaskReport(FAILED, tb=tb))
    del stdout

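# A hedged sketch of a task cooperating with the output machinery above
# (assumption: the task is queued with sync_output > 0, so `async` below
# flushes the `out` queue periodically). Everything printed is captured by
# LogOutput; printing CLEAROUT makes `async` keep only the output that
# follows the last marker:
#
#   def progress_task(n):
#       for i in range(n):
#           print CLEAROUT   # discard previously accumulated output
#           print 'step %s of %s (task %s)' % (i + 1, n, W2P_TASK.id)
#           time.sleep(1)
#       return 'done'
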
class MetaScheduler(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.process = None         # the background process
        self.have_heartbeat = True  # set to False to kill
        self.empty_runs = 0

    def async(self, task):
        """
        starts the background process and returns a TaskReport whose
        status is one of COMPLETED, FAILED, TIMEOUT or STOPPED
        """
        db = self.db
        sr = db.scheduler_run
        out = multiprocessing.Queue()
        queue = multiprocessing.Queue(maxsize=1)
        p = multiprocessing.Process(target=executor, args=(queue, task, out))
        self.process = p
        logger.debug(' task starting')
        p.start()

        task_output = ""
        tout = ""

        try:
            if task.sync_output > 0:
                run_timeout = task.sync_output
            else:
                run_timeout = task.timeout

            start = time.time()

            while p.is_alive() and (
                    not task.timeout or time.time() - start < task.timeout):
                if tout:
                    try:
                        logger.debug(' partial output saved')
                        db(sr.id == task.run_id).update(run_output=task_output)
                        db.commit()
                    except:
                        pass
                p.join(timeout=run_timeout)
                tout = ""
                while not out.empty():
                    tout += out.get()
                if tout:
                    logger.debug(' partial output: "%s"' % str(tout))
                    if CLEAROUT in tout:
                        task_output = tout[
                            tout.rfind(CLEAROUT) + len(CLEAROUT):]
                    else:
                        task_output += tout
        except:
            p.terminate()
            p.join()
            self.have_heartbeat = False
            logger.debug(' task stopped by general exception')
            tr = TaskReport(STOPPED)
        else:
            if p.is_alive():
                p.terminate()
                logger.debug(' task timeout')
                try:
                    # we try to get a traceback here
                    tr = queue.get(timeout=2)
                    tr.status = TIMEOUT
                    tr.output = task_output
                except Queue.Empty:
                    tr = TaskReport(TIMEOUT)
            elif queue.empty():
                self.have_heartbeat = False
                logger.debug(' task stopped')
                tr = TaskReport(STOPPED)
            else:
                logger.debug(' task completed or failed')
                tr = queue.get()
                tr.output = task_output
        return tr

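    # A minimal sketch of how `async` drives `executor` (assumptions: a
    # Python 2 shell with this module importable; task.app=None takes the
    # testing-only eval branch above):
    #
    #   import multiprocessing
    #   queue = multiprocessing.Queue(maxsize=1)
    #   out = multiprocessing.Queue()
    #   task = Task(app=None, function='demo_function', timeout=7,
    #               args='[2]', vars='{}', task_id=None, uuid=None)
    #   p = multiprocessing.Process(target=executor, args=(queue, task, out))
    #   p.start()
    #   p.join(task.timeout)
    #   print queue.get().status   # COMPLETED, with result 'done'
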
    def die(self):
        logger.info('die!')
        self.have_heartbeat = False
        self.terminate_process()

    def give_up(self):
        logger.info('Giving up as soon as possible!')
        self.have_heartbeat = False

    def terminate_process(self):
        try:
            self.process.terminate()
        except:
            pass  # no process to terminate

    def run(self):
        """ the thread that sends heartbeat """
        counter = 0
        while self.have_heartbeat:
            self.send_heartbeat(counter)
            counter += 1

    def start_heartbeats(self):
        self.start()

    def send_heartbeat(self, counter):
        print 'thum'
        time.sleep(1)

    def pop_task(self):
        return Task(
            app=None,
            function='demo_function',
            timeout=7,
            args='[2]',
            vars='{}')

    def report_task(self, task, task_report):
        print 'reporting task'

    def sleep(self):
        pass

    def loop(self):
        try:
            self.start_heartbeats()
            while self.have_heartbeat:
                logger.debug('looping...')
                task = self.pop_task()
                if task:
                    self.empty_runs = 0
                    self.report_task(task, self.async(task))
                else:
                    self.empty_runs += 1
                    logger.debug('sleeping...')
                    if self.max_empty_runs != 0:
                        logger.debug('empty runs %s/%s',
                                     self.empty_runs, self.max_empty_runs)
                        if self.empty_runs >= self.max_empty_runs:
                            logger.info(
                                'empty runs limit reached, killing myself')
                            self.die()
                self.sleep()
        except KeyboardInterrupt:
            self.die()


TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)

class TYPE(object):
    """
    Validator that checks whether a field is valid JSON and
    validates its type
    """

    def __init__(self, myclass=list, parse=False):
        self.myclass = myclass
        self.parse = parse

    def __call__(self, value):
        from gluon import current
        try:
            obj = loads(value)
        except:
            return (value, current.T('invalid json'))
        else:
            if isinstance(obj, self.myclass):
                if self.parse:
                    return (obj, None)
                else:
                    return (value, None)
            else:
                return (value, current.T('Not of type: %s') % self.myclass)

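# A minimal illustration of the validator above (assumption: called where
# `current.T` is available, e.g. inside a web2py request):
#
#   TYPE(list)('[1, 2]')           # ('[1, 2]', None)  -> valid, raw value
#   TYPE(list, parse=True)('[1]')  # ([1], None)       -> valid, parsed value
#   TYPE(dict)('[1, 2]')           # ('[1, 2]', "Not of type: <type 'dict'>")
#   TYPE(dict)('not json')         # ('not json', 'invalid json')
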
class Scheduler(MetaScheduler):
    def __init__(self, db, tasks=None, migrate=True,
                 worker_name=None, group_names=['main'], heartbeat=HEARTBEAT,
                 max_empty_runs=0, discard_results=False, utc_time=False):

        MetaScheduler.__init__(self)

        self.db = db
        self.db_thread = None
        self.tasks = tasks
        self.group_names = group_names
        self.heartbeat = heartbeat
        self.worker_name = worker_name or IDENTIFIER
        #list containing the status as recorded in the table, plus a boost
        #parameter for hibernation (i.e. when someone stops the worker by
        #acting on the scheduler_worker table)
        self.worker_status = [RUNNING, 1]
        self.max_empty_runs = max_empty_runs
        self.discard_results = discard_results
        self.is_a_ticker = False
        self.do_assign_tasks = False
        self.greedy = False
        self.utc_time = utc_time

        from gluon import current
        current._scheduler = self

        self.define_tables(db, migrate=migrate)

    def now(self):
        return self.utc_time and datetime.datetime.utcnow() or datetime.datetime.now()

    def set_requirements(self, scheduler_task):
        from gluon import current
        if hasattr(current, 'request'):
            scheduler_task.application_name.default = '%s/%s' % (
                current.request.application, current.request.controller
            )

    def define_tables(self, db, migrate):
        from gluon.dal import DEFAULT
        logger.debug('defining tables (migrate=%s)', migrate)
        now = self.now
        db.define_table(
            'scheduler_task',
            Field('application_name', requires=IS_NOT_EMPTY(),
                  default=None, writable=False),
            Field('task_name', default=None),
            Field('group_name', default='main'),
            Field('status', requires=IS_IN_SET(TASK_STATUS),
                  default=QUEUED, writable=False),
            Field('function_name',
                  requires=IS_IN_SET(sorted(self.tasks.keys()))
                  if self.tasks else DEFAULT),
            Field('uuid', requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
                  unique=True, default=web2py_uuid),
            Field('args', 'text', default='[]', requires=TYPE(list)),
            Field('vars', 'text', default='{}', requires=TYPE(dict)),
            Field('enabled', 'boolean', default=True),
            Field('start_time', 'datetime', default=now,
                  requires=IS_DATETIME()),
            Field('next_run_time', 'datetime', default=now),
            Field('stop_time', 'datetime'),
            Field('repeats', 'integer', default=1, comment="0=unlimited",
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
                  requires=IS_INT_IN_RANGE(-1, None)),
            Field('period', 'integer', default=60, comment='seconds',
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('timeout', 'integer', default=60, comment='seconds',
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('sync_output', 'integer', default=0,
                  comment="update output every n sec: 0=never",
                  requires=IS_INT_IN_RANGE(0, None)),
            Field('times_run', 'integer', default=0, writable=False),
            Field('times_failed', 'integer', default=0, writable=False),
            Field('last_run_time', 'datetime', writable=False, readable=False),
            Field('assigned_worker_name', default='', writable=False),
            on_define=self.set_requirements,
            migrate=migrate, format='%(task_name)s')

        db.define_table(
            'scheduler_run',
            Field('task_id', 'reference scheduler_task'),
            Field('status', requires=IS_IN_SET(RUN_STATUS)),
            Field('start_time', 'datetime'),
            Field('stop_time', 'datetime'),
            Field('run_output', 'text'),
            Field('run_result', 'text'),
            Field('traceback', 'text'),
            Field('worker_name', default=self.worker_name),
            migrate=migrate)

        db.define_table(
            'scheduler_worker',
            Field('worker_name', unique=True),
            Field('first_heartbeat', 'datetime'),
            Field('last_heartbeat', 'datetime'),
            Field('status', requires=IS_IN_SET(WORKER_STATUS)),
            Field('is_ticker', 'boolean', default=False, writable=False),
            #FIXME: writable=False, or allow updating the groups dynamically?
            Field('group_names', 'list:string', default=self.group_names),
            migrate=migrate)
        if migrate:
            db.commit()

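    # A hedged sketch of querying the tables defined above (assumption: run
    # from a web2py shell or model where `scheduler` is a Scheduler instance):
    #
    #   db = scheduler.db
    #   queued = db(db.scheduler_task.status == QUEUED).count()
    #   workers = db(db.scheduler_worker.status == ACTIVE).select()
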
    def loop(self, worker_name=None):
        signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
        try:
            self.start_heartbeats()
            while self.have_heartbeat:
                if self.worker_status[0] == DISABLED:
                    logger.debug('Someone stopped me, sleeping until better times come (%s)', self.worker_status[1])
                    self.sleep()
                    continue
                logger.debug('looping...')
                task = self.wrapped_pop_task()
                if task:
                    self.empty_runs = 0
                    self.worker_status[0] = RUNNING
                    self.report_task(task, self.async(task))
                    self.worker_status[0] = ACTIVE
                else:
                    self.empty_runs += 1
                    logger.debug('sleeping...')
                    if self.max_empty_runs != 0:
                        logger.debug('empty runs %s/%s',
                                     self.empty_runs, self.max_empty_runs)
                        if self.empty_runs >= self.max_empty_runs:
                            logger.info(
                                'empty runs limit reached, killing myself')
                            self.die()
                self.sleep()
        except (KeyboardInterrupt, SystemExit):
            logger.info('caught KeyboardInterrupt/SystemExit, dying')
            self.die()

    def wrapped_assign_tasks(self, db):
        logger.debug('Assigning tasks...')
        db.commit()  # db.commit() only for MySQL
        x = 0
        while x < 10:
            try:
                self.assign_tasks(db)
                db.commit()
                logger.debug('Tasks assigned...')
                break
            except:
                db.rollback()
                logger.error('TICKER: error assigning tasks (%s)', x)
                x += 1
                time.sleep(0.5)

    def wrapped_pop_task(self):
        db = self.db
        db.commit()  # another nifty db.commit() only for MySQL
        x = 0
        while x < 10:
            try:
                rtn = self.pop_task(db)
                return rtn
            except:
                db.rollback()
                logger.error(' error popping tasks')
                x += 1
                time.sleep(0.5)

    def pop_task(self, db):
        now = self.now()
        st = self.db.scheduler_task
        if self.is_a_ticker and self.do_assign_tasks:
            #I'm a ticker, and 5 loops passed without reassigning tasks:
            #let's do that and loop again
            self.wrapped_assign_tasks(db)
            return None
        #ready to process something
        grabbed = db(st.assigned_worker_name == self.worker_name)(
            st.status == ASSIGNED)

        task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
        if task:
            task.update_record(status=RUNNING, last_run_time=now)
            #no one will touch my task!
            db.commit()
            logger.debug(' work to do %s', task.id)
        else:
            if self.greedy and self.is_a_ticker:
                #there are other tasks ready to be assigned
                logger.info('TICKER: greedy loop')
                self.wrapped_assign_tasks(db)
            else:
                logger.info('nothing to do')
            return None
        next_run_time = task.last_run_time + datetime.timedelta(
            seconds=task.period)
        times_run = task.times_run + 1
        if times_run < task.repeats or task.repeats == 0:
            #need to run again (repeating task)
            run_again = True
        else:
            #no need to run again
            run_again = False
        run_id = 0
        while not self.discard_results:
            logger.debug(' new scheduler_run record')
            try:
                run_id = db.scheduler_run.insert(
                    task_id=task.id,
                    status=RUNNING,
                    start_time=now,
                    worker_name=self.worker_name)
                db.commit()
                break
            except:
                time.sleep(0.5)
                db.rollback()
        logger.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
        return Task(
            app=task.application_name,
            function=task.function_name,
            timeout=task.timeout,
            args=task.args,  # in json
            vars=task.vars,  # in json
            task_id=task.id,
            run_id=run_id,
            run_again=run_again,
            next_run_time=next_run_time,
            times_run=times_run,
            stop_time=task.stop_time,
            retry_failed=task.retry_failed,
            times_failed=task.times_failed,
            sync_output=task.sync_output,
            uuid=task.uuid)

    def report_task(self, task, task_report):
        db = self.db
        now = self.now()
        while True:
            try:
                if not self.discard_results:
                    if task_report.result != 'null' or task_report.tb:
                        #result is 'null' as a string if task completed;
                        #if it's stopped it's None as NoneType, so we record
                        #the STOPPED "run" anyway
                        logger.debug(' recording task report in db (%s)',
                                     task_report.status)
                        db(db.scheduler_run.id == task.run_id).update(
                            status=task_report.status,
                            stop_time=now,
                            run_result=task_report.result,
                            run_output=task_report.output,
                            traceback=task_report.tb)
                    else:
                        logger.debug(' deleting task report in db because of no result')
                        db(db.scheduler_run.id == task.run_id).delete()
                #if there is a stop_time and the following run would exceed it
                is_expired = (task.stop_time
                              and task.next_run_time > task.stop_time
                              and True or False)
                status = (task.run_again and is_expired and EXPIRED
                          or task.run_again and not is_expired
                          and QUEUED or COMPLETED)
                if task_report.status == COMPLETED:
                    d = dict(status=status,
                             next_run_time=task.next_run_time,
                             times_run=task.times_run,
                             times_failed=0
                             )
                    db(db.scheduler_task.id == task.task_id)(
                        db.scheduler_task.status == RUNNING).update(**d)
                else:
                    st_mapping = {'FAILED': 'FAILED',
                                  'TIMEOUT': 'TIMEOUT',
                                  'STOPPED': 'QUEUED'}[task_report.status]
                    status = (task.retry_failed
                              and task.times_failed < task.retry_failed
                              and QUEUED or task.retry_failed == -1
                              and QUEUED or st_mapping)
                    db(
                        (db.scheduler_task.id == task.task_id) &
                        (db.scheduler_task.status == RUNNING)
                    ).update(
                        times_failed=db.scheduler_task.times_failed + 1,
                        next_run_time=task.next_run_time,
                        status=status
                    )
                db.commit()
                logger.info('task completed (%s)', task_report.status)
                break
            except:
                db.rollback()
                time.sleep(0.5)

    def adj_hibernation(self):
        if self.worker_status[0] == DISABLED:
            wk_st = self.worker_status[1]
            hibernation = wk_st + 1 if wk_st < MAXHIBERNATION else MAXHIBERNATION
            self.worker_status[1] = hibernation

    def send_heartbeat(self, counter):
        if not self.db_thread:
            logger.debug('thread building own DAL object')
            self.db_thread = DAL(
                self.db._uri, folder=self.db._adapter.folder)
            self.define_tables(self.db_thread, migrate=False)
        try:
            db = self.db_thread
            sw, st = db.scheduler_worker, db.scheduler_task
            now = self.now()
            # record heartbeat
            mybackedstatus = db(sw.worker_name == self.worker_name).select().first()
            if not mybackedstatus:
                sw.insert(status=ACTIVE, worker_name=self.worker_name,
                          first_heartbeat=now, last_heartbeat=now,
                          group_names=self.group_names)
                self.worker_status = [ACTIVE, 1]  # activating the process
                mybackedstatus = ACTIVE
            else:
                mybackedstatus = mybackedstatus.status
                if mybackedstatus == DISABLED:
                    # keep sleeping
                    self.worker_status[0] = DISABLED
                    if self.worker_status[1] == MAXHIBERNATION:
                        logger.debug('........recording heartbeat (%s)', self.worker_status[0])
                        db(sw.worker_name == self.worker_name).update(
                            last_heartbeat=now)
                elif mybackedstatus == TERMINATE:
                    self.worker_status[0] = TERMINATE
                    logger.debug("Waiting to terminate the current task")
                    self.give_up()
                    return
                elif mybackedstatus == KILL:
                    self.worker_status[0] = KILL
                    self.die()
                else:
                    if mybackedstatus == STOP_TASK:
                        logger.info('Asked to kill the current task')
                        self.terminate_process()
                    logger.debug('........recording heartbeat (%s)', self.worker_status[0])
                    db(sw.worker_name == self.worker_name).update(
                        last_heartbeat=now, status=ACTIVE)
                    self.worker_status[1] = 1  # re-activating the process
                    if self.worker_status[0] != RUNNING:
                        self.worker_status[0] = ACTIVE

            self.do_assign_tasks = False
            if counter % 5 == 0 or mybackedstatus == PICK:
                try:
                    # delete inactive workers
                    expiration = now - datetime.timedelta(seconds=self.heartbeat * 3)
                    departure = now - datetime.timedelta(
                        seconds=self.heartbeat * 3 * MAXHIBERNATION)
                    logger.debug(
                        ' freeing workers that have not sent heartbeat')
                    inactive_workers = db(
                        ((sw.last_heartbeat < expiration) & (sw.status == ACTIVE)) |
                        ((sw.last_heartbeat < departure) & (sw.status != ACTIVE))
                    )
                    db(st.assigned_worker_name.belongs(
                        inactive_workers._select(sw.worker_name)))(st.status == RUNNING)\
                        .update(assigned_worker_name='', status=QUEUED)
                    inactive_workers.delete()
                    try:
                        self.is_a_ticker = self.being_a_ticker()
                    except:
                        logger.error('Error coordinating TICKER')
                    if self.worker_status[0] == ACTIVE:
                        self.do_assign_tasks = True
                except:
                    logger.error('Error cleaning up')
            db.commit()
        except:
            logger.error('Error retrieving status')
            db.rollback()
        self.adj_hibernation()
        self.sleep()

    def being_a_ticker(self):
        db = self.db_thread
        sw = db.scheduler_worker
        all_active = db(
            (sw.worker_name != self.worker_name) & (sw.status == ACTIVE)
        ).select()
        ticker = all_active.find(lambda row: row.is_ticker is True).first()
        not_busy = self.worker_status[0] == ACTIVE
        if not ticker:
            #no other tickers are around
            if not_busy:
                #only if I'm not busy
                db(sw.worker_name == self.worker_name).update(is_ticker=True)
                db(sw.worker_name != self.worker_name).update(is_ticker=False)
                logger.info("TICKER: I'm a ticker")
            else:
                #I'm busy
                if len(all_active) >= 1:
                    #so I'll "downgrade" myself to a "poor worker"
                    db(sw.worker_name == self.worker_name).update(is_ticker=False)
                else:
                    not_busy = True
            db.commit()
            return not_busy
        else:
            logger.info(
                "%s is a ticker, I'm a poor worker" % ticker.worker_name)
            return False

    def assign_tasks(self, db):
        sw, st = db.scheduler_worker, db.scheduler_task
        now = self.now()
        all_workers = db(sw.status == ACTIVE).select()
        #build workers as a dict of groups
        wkgroups = {}
        for w in all_workers:
            group_names = w.group_names
            for gname in group_names:
                if gname not in wkgroups:
                    wkgroups[gname] = dict(
                        workers=[{'name': w.worker_name, 'c': 0}])
                else:
                    wkgroups[gname]['workers'].append(
                        {'name': w.worker_name, 'c': 0})
        #mark as expired the queued tasks that expired between "runs" (i.e.,
        #while the scheduler was turned off): they weren't expired then, but
        #they are now
        db(st.status.belongs(
            (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)

        all_available = db(
            (st.status.belongs((QUEUED, ASSIGNED))) &
            ((st.times_run < st.repeats) | (st.repeats == 0)) &
            (st.start_time <= now) &
            ((st.stop_time == None) | (st.stop_time > now)) &
            (st.next_run_time <= now) &
            (st.enabled == True)
        )
        limit = len(all_workers) * (50 / (len(wkgroups) or 1))
        #if there is a multitude of tasks, figure out a maximum number of
        #tasks per worker. This could be refined with some added intelligence
        #(like estimating how many tasks a worker will complete before the
        #ticker reassigns them), but the gain would be quite small.
        #50 is a sweet spot also for fast tasks, with sane heartbeat values.
        #NB: the ticker reassigns tasks every 5 cycles, so if a worker
        #completes its 50 tasks in less than heartbeat*5 seconds, it won't
        #pick up new tasks until heartbeat*5 seconds have passed.

        #If a worker is currently busy with a long task, all other tasks
        #assigned to it need to be reassigned "freely" to other workers that
        #may be free. This shuffles things up a bit, in order to maintain the
        #idea of semi-linear scalability.

        #let's freeze it up
        db.commit()
        x = 0
        for group in wkgroups.keys():
            tasks = all_available(st.group_name == group).select(
                limitby=(0, limit), orderby=st.next_run_time)
            #let's break up the queue evenly among workers
            for task in tasks:
                x += 1
                gname = task.group_name
                ws = wkgroups.get(gname)
                if ws:
                    counter = 0
                    myw = 0
                    for i, w in enumerate(ws['workers']):
                        if w['c'] < counter:
                            myw = i
                            counter = w['c']
                    d = dict(
                        status=ASSIGNED,
                        assigned_worker_name=wkgroups[gname]['workers'][myw]['name']
                    )
                    if not task.task_name:
                        d['task_name'] = task.function_name
                    task.update_record(**d)
                    wkgroups[gname]['workers'][myw]['c'] += 1

        db.commit()
        #I didn't report tasks but I'm working nonetheless!
        if x > 0:
            self.empty_runs = 0
        #I'll be greedy only if the number of tasks assigned equals the limit
        #(meaning there could be others ready to be assigned)
        self.greedy = x >= limit and True or False
        logger.info('TICKER: workers are %s', len(all_workers))
        logger.info('TICKER: tasks are %s', x)

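    # A hedged sketch of the group-based routing implemented above
    # (assumptions: the model defines the extra group, and your web2py
    # version supports the `-K appname:group` syntax for starting workers
    # on given groups):
    #
    #   scheduler = Scheduler(db, tasks, group_names=['main', 'highprio'])
    #   scheduler.queue_task(demo1, group_name='highprio')
    #
    # then start a worker that only picks 'highprio' tasks with:
    #   python web2py.py -K myapp:highprio
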
    def sleep(self):
        time.sleep(self.heartbeat * self.worker_status[1])
        # should only sleep until the next available task

    def set_worker_status(self, group_names=None, action=ACTIVE):
        if not group_names:
            group_names = self.group_names
        elif isinstance(group_names, str):
            group_names = [group_names]
        for group in group_names:
            self.db(
                self.db.scheduler_worker.group_names.contains(group)
            ).update(status=action)

    def disable(self, group_names=None):
        self.set_worker_status(group_names=group_names, action=DISABLED)

    def resume(self, group_names=None):
        self.set_worker_status(group_names=group_names, action=ACTIVE)

    def terminate(self, group_names=None):
        self.set_worker_status(group_names=group_names, action=TERMINATE)

    def kill(self, group_names=None):
        self.set_worker_status(group_names=group_names, action=KILL)

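    # A minimal sketch of the helpers above (assumption: called from app code
    # sharing the same db). Each flips the status of every worker whose
    # group_names contains the given group; workers react on their next
    # heartbeat:
    #
    #   scheduler.disable('main')    # workers sleep (growing hibernation)
    #   scheduler.resume('main')     # workers pick up tasks again
    #   scheduler.terminate('main')  # workers exit after the current task
    #   scheduler.kill('main')       # workers exit immediately
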
    def queue_task(self, function, pargs=[], pvars={}, **kwargs):
        """
        Queue a task. This takes care of handling the validation of all
        values.
        :param function: the function (anything callable with a __name__)
        :param pargs: "raw" args to be passed to the function. Automatically
            jsonified.
        :param pvars: "raw" kwargs to be passed to the function. Automatically
            jsonified.
        :param kwargs: all the scheduler_task columns. args and vars here
            should already be in json format; they will override pargs and
            pvars.

        Returns a dict just like a normal validate_and_insert, plus a uuid key
        holding the uuid of the queued task. If validation does not pass, both
        id and uuid will be None, and you'll get an "errors" dict holding the
        errors found.
        """
        if hasattr(function, '__name__'):
            function = function.__name__
        targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs)
        tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars)
        tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
        tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
        immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None
        rtn = self.db.scheduler_task.validate_and_insert(
            function_name=function,
            task_name=tname,
            args=targs,
            vars=tvars,
            uuid=tuuid,
            **kwargs)
        if not rtn.errors:
            rtn.uuid = tuuid
            if immediate:
                self.db(self.db.scheduler_worker.is_ticker == True).update(status=PICK)
        else:
            rtn.uuid = None
        return rtn

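    # A hedged usage sketch for queue_task (assumption: `demo1` is registered
    # in the tasks dict or importable from the app environment):
    #
    #   r = scheduler.queue_task(demo1, pargs=[1, 2], pvars={'a': 3},
    #                            period=120, repeats=5, timeout=30,
    #                            immediate=True)
    #   if r.errors:
    #       print 'validation failed:', r.errors
    #   else:
    #       print 'queued task', r.id, 'uuid', r.uuid
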
    def task_status(self, ref, output=False):
        """
        Shortcut for task status retrieval

        :param ref: can be
            - an integer --> lookup will be done by scheduler_task.id
            - a string --> lookup will be done by scheduler_task.uuid
            - a query --> lookup as you wish (as in db.scheduler_task.task_name == 'test1')
        :param output: fetch also the scheduler_run record

        Returns a single Row object, for the last queued task.
        If output == True, the last scheduler_run record is fetched too;
        the scheduler_run record is fetched via a left join, so it can
        have all fields == None

        """
        from gluon.dal import Query
        sr, st = self.db.scheduler_run, self.db.scheduler_task
        if isinstance(ref, int):
            q = st.id == ref
        elif isinstance(ref, str):
            q = st.uuid == ref
        elif isinstance(ref, Query):
            q = ref
        else:
            raise SyntaxError(
                "You can retrieve results only by id, uuid or Query")
        fields = [st.ALL]
        left = False
        orderby = ~st.id
        if output:
            fields = st.ALL, sr.ALL
            left = sr.on(sr.task_id == st.id)
            orderby = ~st.id | ~sr.id
        row = self.db(q).select(
            *fields,
            **dict(orderby=orderby,
                   left=left,
                   limitby=(0, 1))
        ).first()
        if row and output:
            row.result = row.scheduler_run.run_result and \
                loads(row.scheduler_run.run_result,
                      object_hook=_decode_dict) or None
        return row

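    # A minimal usage sketch for task_status (assumption: `r` came from an
    # earlier queue_task call):
    #
    #   row = scheduler.task_status(r.uuid, output=True)
    #   print row.scheduler_task.status
    #   print row.result    # decoded run_result of the last run, or None
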
    def stop_task(self, ref):
        """
        Experimental!!!
        Shortcut for task termination.
        If the task is RUNNING it will be terminated --> the execution will
        be recorded as FAILED.
        If the task is QUEUED, its stop_time will be set to "now", the
        enabled flag will be set to False, and the status to STOPPED.

        :param ref: can be
            - an integer --> lookup will be done by scheduler_task.id
            - a string --> lookup will be done by scheduler_task.uuid
        Returns:
            - 1 if the task was stopped (meaning an update has been done)
            - None if the task was not found, or was not RUNNING or QUEUED
        """
        st, sw = self.db.scheduler_task, self.db.scheduler_worker
        if isinstance(ref, int):
            q = st.id == ref
        elif isinstance(ref, str):
            q = st.uuid == ref
        else:
            raise SyntaxError(
                "You can retrieve results only by id or uuid")
        task = self.db(q).select(st.id, st.status, st.assigned_worker_name).first()
        rtn = None
        if not task:
            return rtn
        if task.status == 'RUNNING':
            rtn = self.db(sw.worker_name == task.assigned_worker_name).update(status=STOP_TASK)
        elif task.status == 'QUEUED':
            rtn = self.db(q).update(stop_time=self.now(), enabled=False, status=STOPPED)
        return rtn

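    # A minimal usage sketch for stop_task (assumption: `r.uuid` references a
    # previously queued task):
    #
    #   if scheduler.stop_task(r.uuid) is None:
    #       print 'task not found, or neither RUNNING nor QUEUED'
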
def main():
    """
    allows running a worker without "python web2py.py -K ...",
    by simply running this module with python
    """
    parser = optparse.OptionParser()
    parser.add_option(
        "-w", "--worker_name", dest="worker_name", default=None,
        help="start a worker with name")
    parser.add_option(
        "-b", "--heartbeat", dest="heartbeat", default=10,
        type='int', help="heartbeat time in seconds (default 10)")
    parser.add_option(
        "-L", "--logger_level", dest="logger_level",
        default=30,
        type='int',
        help="set debug output level (0-100, 0 means all, 100 means none; default is 30)")
    parser.add_option("-E", "--empty-runs",
                      dest="max_empty_runs",
                      type='int',
                      default=0,
                      help="max loops with no grabbed tasks permitted (0 for never check)")
    parser.add_option(
        "-g", "--group_names", dest="group_names",
        default='main',
        help="comma separated list of groups to be picked by the worker")
    parser.add_option(
        "-f", "--db_folder", dest="db_folder",
        default='/Users/mdipierro/web2py/applications/scheduler/databases',
        help="location of the dal database folder")
    parser.add_option(
        "-u", "--db_uri", dest="db_uri",
        default='sqlite://storage.sqlite',
        help="database URI string (web2py DAL syntax)")
    parser.add_option(
        "-t", "--tasks", dest="tasks", default=None,
        help="file containing task files, must define " +
        "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
    parser.add_option(
        "-U", "--utc-time", dest="utc_time", default=False,
        help="work with UTC timestamps"
    )
    (options, args) = parser.parse_args()
    if not options.tasks or not options.db_uri:
        print USAGE
    if options.tasks:
        path, filename = os.path.split(options.tasks)
        if filename.endswith('.py'):
            filename = filename[:-3]
        sys.path.append(path)
        print 'importing tasks...'
        tasks = __import__(filename, globals(), locals(), [], -1).tasks
        print 'tasks found: ' + ', '.join(tasks.keys())
    else:
        tasks = {}
    group_names = [x.strip() for x in options.group_names.split(',')]

    logging.getLogger().setLevel(options.logger_level)

    print 'groups for this worker: ' + ', '.join(group_names)
    print 'connecting to database in folder: ' + (options.db_folder or './')
    print 'using URI: ' + options.db_uri
    db = DAL(options.db_uri, folder=options.db_folder)
    print 'instantiating scheduler...'
    scheduler = Scheduler(db=db,
                          worker_name=options.worker_name,
                          tasks=tasks,
                          migrate=True,
                          group_names=group_names,
                          heartbeat=options.heartbeat,
                          max_empty_runs=options.max_empty_runs,
                          utc_time=options.utc_time)
    signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
    print 'starting main worker loop...'
    scheduler.loop()


if __name__ == '__main__':
    main()