1
2
3
4 USAGE = """
5 ## Example
6
7 For any existing app
8
9 Create File: app/models/scheduler.py ======
10 from gluon.scheduler import Scheduler
11
12 def demo1(*args,**vars):
13 print 'you passed args=%s and vars=%s' % (args, vars)
14 return 'done!'
15
16 def demo2():
17 1/0
18
19 scheduler = Scheduler(db,dict(demo1=demo1,demo2=demo2))
20 ## run worker nodes with:
21
22 cd web2py
23 python web2py.py -K myapp
24 or
25 python gluon/scheduler.py -u sqlite://storage.sqlite \
26 -f applications/myapp/databases/ \
27 -t mytasks.py
28 (-h for info)
29 python scheduler.py -h
30
31 ## schedule jobs using
32 http://127.0.0.1:8000/myapp/appadmin/insert/db/scheduler_task
33
34 ## monitor scheduled jobs
35 http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_task.id>0
36
37 ## view completed jobs
38 http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_run.id>0
39
40 ## view workers
41 http://127.0.0.1:8000/myapp/appadmin/select/db?query=db.scheduler_worker.id>0
42
43 ## To install the scheduler as a permanent daemon on Linux (w/ Upstart), put
44 ## the following into /etc/init/web2py-scheduler.conf:
45 ## (This assumes your web2py instance is installed in <user>'s home directory,
46 ## running as <user>, with app <myapp>, on network interface eth0.)
47
48 description "web2py task scheduler"
49 start on (local-filesystems and net-device-up IFACE=eth0)
50 stop on shutdown
51 respawn limit 8 60 # Give up if restart occurs 8 times in 60 seconds.
52 exec sudo -u <user> python /home/<user>/web2py/web2py.py -K <myapp>
53 respawn
54
55 ## You can then start/stop/restart/check status of the daemon with:
56 sudo start web2py-scheduler
57 sudo stop web2py-scheduler
58 sudo restart web2py-scheduler
59 sudo status web2py-scheduler
60 """
61
62 import os
63 import time
64 import multiprocessing
65 import sys
66 import threading
67 import traceback
68 import signal
69 import socket
70 import datetime
71 import logging
72 import optparse
73 import types
74 import Queue
75
76 if 'WEB2PY_PATH' in os.environ:
77 sys.path.append(os.environ['WEB2PY_PATH'])
78 else:
79 os.environ['WEB2PY_PATH'] = os.getcwd()
80
81 if not os.environ['WEB2PY_PATH'] in sys.path:
82 sys.path.append(os.environ['WEB2PY_PATH'])
83
84 try:
85 from gluon.contrib.simplejson import loads, dumps
86 except:
87 from simplejson import loads, dumps
88
89 IDENTIFIER = "%s#%s" % (socket.gethostname(),os.getpid())
90
91 logger = logging.getLogger('web2py.scheduler.%s' % IDENTIFIER)
92
93 from gluon import DAL, Field, IS_NOT_EMPTY, IS_IN_SET, IS_NOT_IN_DB, IS_INT_IN_RANGE, IS_DATETIME
94 from gluon.utils import web2py_uuid
95 from gluon.storage import Storage
96
97
98 QUEUED = 'QUEUED'
99 ASSIGNED = 'ASSIGNED'
100 RUNNING = 'RUNNING'
101 COMPLETED = 'COMPLETED'
102 FAILED = 'FAILED'
103 TIMEOUT = 'TIMEOUT'
104 STOPPED = 'STOPPED'
105 ACTIVE = 'ACTIVE'
106 TERMINATE = 'TERMINATE'
107 DISABLED = 'DISABLED'
108 KILL = 'KILL'
109 PICK = 'PICK'
110 STOP_TASK = 'STOP_TASK'
111 EXPIRED = 'EXPIRED'
112 SECONDS = 1
113 HEARTBEAT = 3 * SECONDS
114 MAXHIBERNATION = 10
115 CLEAROUT = '!clear!'
116
117 CALLABLETYPES = (types.LambdaType, types.FunctionType,
118 types.BuiltinFunctionType,
119 types.MethodType, types.BuiltinMethodType)
120
121
class Task(object):
    """A unit of work popped from scheduler_task, ready to be executed.

    NOTE(review): the ``class`` header and ``__repr__`` def line were
    elided in the listing; structure reconstructed — verify against upstream.
    """

    def __init__(self, app, function, timeout, args='[]', vars='{}', **kwargs):
        """
        :param app: "app/controller/function" path of the web2py app context
        :param function: name of the task function to run
        :param timeout: max execution time in seconds
        :param args: JSON-encoded positional arguments
        :param vars: JSON-encoded keyword arguments
        :param kwargs: extra scheduler bookkeeping fields (task_id, uuid, ...)
        """
        logger.debug(' new task allocated: %s.%s', app, function)
        self.app = app
        self.function = function
        self.timeout = timeout
        self.args = args  # json
        self.vars = vars  # json
        # attach bookkeeping fields (run_id, run_again, next_run_time, ...)
        self.__dict__.update(kwargs)

    def __repr__(self):
        return '<Task: %s>' % self.function
134
135
class TaskReport(object):
    """Outcome of one task execution: a status plus result/output/traceback.

    NOTE(review): the ``class`` header and ``__repr__`` def line were
    elided in the listing; structure reconstructed — verify against upstream.
    """

    def __init__(self, status, result=None, output=None, tb=None):
        """
        :param status: COMPLETED, FAILED, TIMEOUT or STOPPED
        :param result: JSON-encoded return value of the task, if any
        :param output: captured stdout of the task, if any
        :param tb: formatted traceback when the task failed
        """
        logger.debug(' new task report: %s', status)
        # log the traceback when present, otherwise the result
        if tb:
            logger.debug(' traceback: %s', tb)
        else:
            logger.debug(' result: %s', result)
        self.status = status
        self.result = result
        self.output = output
        self.tb = tb

    def __repr__(self):
        return '<TaskReport: %s>' % self.status
150
151
153 """ test function """
154 for i in range(argv[0]):
155 print 'click', i
156 time.sleep(1)
157 return 'done'
158
159
160
161
162
163
def _decode_list(lst):
    """Recursively utf-8 encode every unicode item of a (possibly nested) list.

    Helper for json loading on Python 2, where simplejson returns unicode
    strings; returns a new list with byte strings instead.
    """
    newlist = []
    for i in lst:
        if isinstance(i, unicode):
            i = i.encode('utf-8')
        elif isinstance(i, list):
            # recurse into nested lists
            i = _decode_list(i)
        newlist.append(i)
    return newlist
173
174
def _decode_dict(dct):
    """Utf-8 encode every unicode key/value of a dict (used as json object_hook).

    Values that are lists are handled through _decode_list; nested dicts are
    already converted because object_hook is applied bottom-up by the parser.
    """
    newdict = {}
    for k, v in dct.iteritems():
        if isinstance(k, unicode):
            k = k.encode('utf-8')
        if isinstance(v, unicode):
            v = v.encode('utf-8')
        elif isinstance(v, list):
            v = _decode_list(v)
        newdict[k] = v
    return newdict
186
187
189 """ the background process """
190 logger.debug(' task started')
191
192 class LogOutput(object):
193 """Facility to log output at intervals"""
194 def __init__(self, out_queue):
195 self.out_queue = out_queue
196 self.stdout = sys.stdout
197 sys.stdout = self
198
199 def __del__(self):
200 sys.stdout = self.stdout
201
202 def flush(self):
203 pass
204
205 def write(self, data):
206 self.out_queue.put(data)
207
208 W2P_TASK = Storage({'id' : task.task_id, 'uuid' : task.uuid})
209 stdout = LogOutput(out)
210 try:
211 if task.app:
212 os.chdir(os.environ['WEB2PY_PATH'])
213 from gluon.shell import env, parse_path_info
214 from gluon import current
215 level = logging.getLogger().getEffectiveLevel()
216 logging.getLogger().setLevel(logging.WARN)
217
218
219 (a, c, f) = parse_path_info(task.app)
220 _env = env(a=a, c=c, import_models=True)
221 logging.getLogger().setLevel(level)
222 f = task.function
223 functions = current._scheduler.tasks
224 if not functions:
225
226 _function = _env.get(f)
227 else:
228 _function = functions.get(f)
229 if not isinstance(_function, CALLABLETYPES):
230 raise NameError(
231 "name '%s' not found in scheduler's environment" % f)
232
233 _env.update({'W2P_TASK' : W2P_TASK})
234 globals().update(_env)
235 args = loads(task.args)
236 vars = loads(task.vars, object_hook=_decode_dict)
237 result = dumps(_function(*args, **vars))
238 else:
239
240 result = eval(task.function)(
241 *loads(task.args, object_hook=_decode_dict),
242 **loads(task.vars, object_hook=_decode_dict))
243 queue.put(TaskReport(COMPLETED, result=result))
244 except BaseException, e:
245 tb = traceback.format_exc()
246 queue.put(TaskReport(FAILED, tb=tb))
247 del stdout
248
249
398
399
400 TASK_STATUS = (QUEUED, RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED, EXPIRED)
401 RUN_STATUS = (RUNNING, COMPLETED, FAILED, TIMEOUT, STOPPED)
402 WORKER_STATUS = (ACTIVE, PICK, DISABLED, TERMINATE, KILL, STOP_TASK)
403
404
406 """
407 validator that check whether field is valid json and validate its type
408 """
409
410 - def __init__(self, myclass=list, parse=False):
413
415 from gluon import current
416 try:
417 obj = loads(value)
418 except:
419 return (value, current.T('invalid json'))
420 else:
421 if isinstance(obj, self.myclass):
422 if self.parse:
423 return (obj, None)
424 else:
425 return (value, None)
426 else:
427 return (value, current.T('Not of type: %s') % self.myclass)
428
429
def __init__(self, db, tasks=None, migrate=True,
             worker_name=None, group_names=None, heartbeat=HEARTBEAT,
             max_empty_runs=0, discard_results=False, utc_time=False):
    """
    :param db: DAL connection holding the scheduler tables
    :param tasks: dict mapping task names to callables (None = app env lookup)
    :param migrate: passed to define_tables
    :param worker_name: unique worker id (default: host#pid)
    :param group_names: task groups this worker picks (default ['main'])
    :param heartbeat: seconds between heartbeats
    :param max_empty_runs: die after this many idle loops (0 = never)
    :param discard_results: do not keep scheduler_run records
    :param utc_time: use UTC instead of local time for all timestamps

    Fix: the original used the mutable default ``group_names=['main']``,
    shared across instances; replaced with a None sentinel (backward
    compatible — an explicit empty list is still honored).
    """
    MetaScheduler.__init__(self)

    self.db = db
    self.db_thread = None
    self.tasks = tasks
    self.group_names = ['main'] if group_names is None else group_names
    self.heartbeat = heartbeat
    self.worker_name = worker_name or IDENTIFIER

    # worker_status[0] is the state, worker_status[1] the sleep multiplier
    self.worker_status = [RUNNING, 1]
    self.max_empty_runs = max_empty_runs
    self.discard_results = discard_results
    self.is_a_ticker = False
    self.do_assign_tasks = False
    self.greedy = False
    self.utc_time = utc_time

    # expose this scheduler to executor() via gluon.current
    from gluon import current
    current._scheduler = self

    self.define_tables(db, migrate=migrate)
457
def now(self):
    """Return the current datetime, UTC or local per self.utc_time.

    NOTE(review): the def line was elided in the listing; signature
    reconstructed — verify against upstream.
    """
    return self.utc_time and datetime.datetime.utcnow() or datetime.datetime.now()
460
467
def define_tables(self, db, migrate):
    """Define the scheduler_task / scheduler_run / scheduler_worker tables.

    :param db: DAL connection to define the tables on
    :param migrate: passed through to define_table; commit only when migrating

    NOTE(review): the def line was elided in the listing; signature
    reconstructed from the call ``self.define_tables(db, migrate=migrate)``.
    """
    from gluon.dal import DEFAULT
    logger.debug('defining tables (migrate=%s)', migrate)
    now = self.now
    db.define_table(
        'scheduler_task',
        Field('application_name', requires=IS_NOT_EMPTY(),
              default=None, writable=False),
        Field('task_name', default=None),
        Field('group_name', default='main'),
        Field('status', requires=IS_IN_SET(TASK_STATUS),
              default=QUEUED, writable=False),
        # restrict to the known task names when a tasks dict was supplied
        Field('function_name',
              requires=IS_IN_SET(sorted(self.tasks.keys()))
              if self.tasks else DEFAULT),
        Field('uuid', requires=IS_NOT_IN_DB(db, 'scheduler_task.uuid'),
              unique=True, default=web2py_uuid),
        Field('args', 'text', default='[]', requires=TYPE(list)),
        Field('vars', 'text', default='{}', requires=TYPE(dict)),
        Field('enabled', 'boolean', default=True),
        Field('start_time', 'datetime', default=now,
              requires=IS_DATETIME()),
        Field('next_run_time', 'datetime', default=now),
        Field('stop_time', 'datetime'),
        Field('repeats', 'integer', default=1, comment="0=unlimited",
              requires=IS_INT_IN_RANGE(0, None)),
        Field('retry_failed', 'integer', default=0, comment="-1=unlimited",
              requires=IS_INT_IN_RANGE(-1, None)),
        Field('period', 'integer', default=60, comment='seconds',
              requires=IS_INT_IN_RANGE(0, None)),
        Field('timeout', 'integer', default=60, comment='seconds',
              requires=IS_INT_IN_RANGE(0, None)),
        Field('sync_output', 'integer', default=0,
              comment="update output every n sec: 0=never",
              requires=IS_INT_IN_RANGE(0, None)),
        Field('times_run', 'integer', default=0, writable=False),
        Field('times_failed', 'integer', default=0, writable=False),
        Field('last_run_time', 'datetime', writable=False, readable=False),
        Field('assigned_worker_name', default='', writable=False),
        on_define=self.set_requirements,
        migrate=migrate, format='%(task_name)s')

    db.define_table(
        'scheduler_run',
        Field('task_id', 'reference scheduler_task'),
        Field('status', requires=IS_IN_SET(RUN_STATUS)),
        Field('start_time', 'datetime'),
        Field('stop_time', 'datetime'),
        Field('run_output', 'text'),
        Field('run_result', 'text'),
        Field('traceback', 'text'),
        Field('worker_name', default=self.worker_name),
        migrate=migrate)

    db.define_table(
        'scheduler_worker',
        Field('worker_name', unique=True),
        Field('first_heartbeat', 'datetime'),
        Field('last_heartbeat', 'datetime'),
        Field('status', requires=IS_IN_SET(WORKER_STATUS)),
        Field('is_ticker', 'boolean', default=False, writable=False),
        Field('group_names', 'list:string', default=self.group_names),
        migrate=migrate)
    if migrate:
        db.commit()
533
534 - def loop(self, worker_name=None):
535 signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
536 try:
537 self.start_heartbeats()
538 while True and self.have_heartbeat:
539 if self.worker_status[0] == DISABLED:
540 logger.debug('Someone stopped me, sleeping until better times come (%s)', self.worker_status[1])
541 self.sleep()
542 continue
543 logger.debug('looping...')
544 task = self.wrapped_pop_task()
545 if task:
546 self.empty_runs = 0
547 self.worker_status[0] = RUNNING
548 self.report_task(task, self.async(task))
549 self.worker_status[0] = ACTIVE
550 else:
551 self.empty_runs += 1
552 logger.debug('sleeping...')
553 if self.max_empty_runs != 0:
554 logger.debug('empty runs %s/%s',
555 self.empty_runs, self.max_empty_runs)
556 if self.empty_runs >= self.max_empty_runs:
557 logger.info(
558 'empty runs limit reached, killing myself')
559 self.die()
560 self.sleep()
561 except (KeyboardInterrupt, SystemExit):
562 logger.info('catched')
563 self.die()
564
580
595
def pop_task(self, db):
    """Grab the next ASSIGNED task for this worker and return it as a Task.

    Returns None when there is nothing to do, or when this worker is the
    ticker and spends the cycle (re)assigning tasks instead.

    Fix: ``while True and not self.discard_results`` simplified to
    ``while not self.discard_results`` (the ``True and`` was a no-op).
    NOTE(review): the def line was elided in the listing; signature
    reconstructed — verify against upstream.
    """
    now = self.now()
    st = self.db.scheduler_task
    if self.is_a_ticker and self.do_assign_tasks:
        # I'm a ticker and it's time to assign: do that instead of working
        self.wrapped_assign_tasks(db)
        return None
    # tasks previously assigned to me by the ticker
    grabbed = db(st.assigned_worker_name == self.worker_name)(
        st.status == ASSIGNED)

    task = grabbed.select(limitby=(0, 1), orderby=st.next_run_time).first()
    if task:
        task.update_record(status=RUNNING, last_run_time=now)
        # commit immediately so no other worker touches this task
        db.commit()
        logger.debug(' work to do %s', task.id)
    else:
        if self.greedy and self.is_a_ticker:
            # many tasks are waiting: keep assigning without pause
            logger.info('TICKER: greedy loop')
            self.wrapped_assign_tasks(db)
        else:
            logger.info('nothing to do')
        return None
    next_run_time = task.last_run_time + datetime.timedelta(
        seconds=task.period)
    times_run = task.times_run + 1
    if times_run < task.repeats or task.repeats == 0:
        # repeats == 0 means run forever
        run_again = True
    else:
        run_again = False
    run_id = 0
    # record the run unless results are discarded; retry on db contention
    while not self.discard_results:
        logger.debug(' new scheduler_run record')
        try:
            run_id = db.scheduler_run.insert(
                task_id=task.id,
                status=RUNNING,
                start_time=now,
                worker_name=self.worker_name)
            db.commit()
            break
        except:
            time.sleep(0.5)
            db.rollback()
    logger.info('new task %(id)s "%(task_name)s" %(application_name)s.%(function_name)s' % task)
    return Task(
        app=task.application_name,
        function=task.function_name,
        timeout=task.timeout,
        args=task.args,  # in json
        vars=task.vars,  # in json
        task_id=task.id,
        run_id=run_id,
        run_again=run_again,
        next_run_time=next_run_time,
        times_run=times_run,
        stop_time=task.stop_time,
        retry_failed=task.retry_failed,
        times_failed=task.times_failed,
        sync_output=task.sync_output,
        uuid=task.uuid)
662
def report_task(self, task, task_report):
    """Persist the outcome of a run and update the task's status/counters.

    :param task: the Task that was executed
    :param task_report: the TaskReport produced by the executor

    Retries forever on db errors (rollback + 0.5s sleep) until the commit
    succeeds.
    NOTE(review): the def line was elided in the listing; signature
    reconstructed — verify against upstream.
    """
    db = self.db
    now = self.now()
    while True:
        try:
            if not self.discard_results:
                if task_report.result != 'null' or task_report.tb:
                    # keep the run record only when there is something
                    # worth saving (a result or a traceback)
                    logger.debug(' recording task report in db (%s)',
                                 task_report.status)
                    db(db.scheduler_run.id == task.run_id).update(
                        status=task_report.status,
                        stop_time=now,
                        run_result=task_report.result,
                        run_output=task_report.output,
                        traceback=task_report.tb)
                else:
                    logger.debug(' deleting task report in db because of no result')
                    db(db.scheduler_run.id == task.run_id).delete()
            # expired = would-be next run falls after the stop_time
            is_expired = (task.stop_time
                          and task.next_run_time > task.stop_time
                          and True or False)
            status = (task.run_again and is_expired and EXPIRED
                      or task.run_again and not is_expired
                      and QUEUED or COMPLETED)
            if task_report.status == COMPLETED:
                d = dict(status=status,
                         next_run_time=task.next_run_time,
                         times_run=task.times_run,
                         times_failed=0
                         )
                db(db.scheduler_task.id == task.task_id)(
                    db.scheduler_task.status == RUNNING).update(**d)
            else:
                # failed/timed-out/stopped: maybe requeue per retry_failed
                st_mapping = {'FAILED': 'FAILED',
                              'TIMEOUT': 'TIMEOUT',
                              'STOPPED': 'QUEUED'}[task_report.status]
                status = (task.retry_failed
                          and task.times_failed < task.retry_failed
                          and QUEUED or task.retry_failed == -1
                          and QUEUED or st_mapping)
                db(
                    (db.scheduler_task.id == task.task_id) &
                    (db.scheduler_task.status == RUNNING)
                ).update(
                    times_failed=db.scheduler_task.times_failed + 1,
                    next_run_time=task.next_run_time,
                    status=status
                )
            db.commit()
            logger.info('task completed (%s)', task_report.status)
            break
        except:
            db.rollback()
            time.sleep(0.5)
721
def adj_hibernation(self):
    """When DISABLED, back off the sleep multiplier (capped at MAXHIBERNATION).

    NOTE(review): the def line was elided in the listing; signature
    reconstructed — verify against upstream.
    """
    if self.worker_status[0] == DISABLED:
        wk_st = self.worker_status[1]
        hibernation = wk_st + 1 if wk_st < MAXHIBERNATION else MAXHIBERNATION
        self.worker_status[1] = hibernation
727
def send_heartbeat(self, counter):
    """Heartbeat cycle: register/refresh this worker, obey remote commands,
    and every 5th beat clean dead workers and elect the ticker.

    :param counter: loop counter from the heartbeat thread (cleanup on %5==0)

    NOTE(review): the def line was elided in the listing; signature
    reconstructed — verify against upstream.
    """
    if not self.db_thread:
        # the heartbeat runs in its own thread: it needs its own connection
        logger.debug('thread building own DAL object')
        self.db_thread = DAL(
            self.db._uri, folder=self.db._adapter.folder)
        self.define_tables(self.db_thread, migrate=False)
    try:
        db = self.db_thread
        sw, st = db.scheduler_worker, db.scheduler_task
        now = self.now()
        # record my heartbeat
        mybackedstatus = db(sw.worker_name == self.worker_name).select().first()
        if not mybackedstatus:
            sw.insert(status=ACTIVE, worker_name=self.worker_name,
                      first_heartbeat=now, last_heartbeat=now,
                      group_names=self.group_names)
            self.worker_status = [ACTIVE, 1]
            mybackedstatus = ACTIVE
        else:
            mybackedstatus = mybackedstatus.status
            if mybackedstatus == DISABLED:
                # keep sleeping; only touch the db once fully hibernated
                self.worker_status[0] = DISABLED
                if self.worker_status[1] == MAXHIBERNATION:
                    logger.debug('........recording heartbeat (%s)', self.worker_status[0])
                    db(sw.worker_name == self.worker_name).update(
                        last_heartbeat=now)
            elif mybackedstatus == TERMINATE:
                self.worker_status[0] = TERMINATE
                logger.debug("Waiting to terminate the current task")
                self.give_up()
                return
            elif mybackedstatus == KILL:
                self.worker_status[0] = KILL
                self.die()
            else:
                if mybackedstatus == STOP_TASK:
                    # kill the running task but stay alive as a worker
                    logger.info('Asked to kill the current task')
                    self.terminate_process()
                logger.debug('........recording heartbeat (%s)', self.worker_status[0])
                db(sw.worker_name == self.worker_name).update(
                    last_heartbeat=now, status=ACTIVE)
                self.worker_status[1] = 1  # re-activated, no more hibernation
                if self.worker_status[0] != RUNNING:
                    self.worker_status[0] = ACTIVE

        self.do_assign_tasks = False
        if counter % 5 == 0 or mybackedstatus == PICK:
            try:
                # delete worker rows that missed too many heartbeats and
                # requeue the tasks they were running
                expiration = now - datetime.timedelta(seconds=self.heartbeat * 3)
                departure = now - datetime.timedelta(
                    seconds=self.heartbeat * 3 * MAXHIBERNATION)
                logger.debug(
                    ' freeing workers that have not sent heartbeat')
                inactive_workers = db(
                    ((sw.last_heartbeat < expiration) & (sw.status == ACTIVE)) |
                    ((sw.last_heartbeat < departure) & (sw.status != ACTIVE))
                )
                db(st.assigned_worker_name.belongs(
                    inactive_workers._select(sw.worker_name)))(st.status == RUNNING)\
                    .update(assigned_worker_name='', status=QUEUED)
                inactive_workers.delete()
                try:
                    self.is_a_ticker = self.being_a_ticker()
                except:
                    logger.error('Error coordinating TICKER')
                if self.worker_status[0] == ACTIVE:
                    self.do_assign_tasks = True
            except:
                logger.error('Error cleaning up')
        db.commit()
    except:
        logger.error('Error retrieving status')
        db.rollback()
    self.adj_hibernation()
    self.sleep()
805
def being_a_ticker(self):
    """Elect a single 'ticker' worker; return True if this worker is it.

    The ticker is the one worker allowed to assign tasks to the others.

    NOTE(review): the def line was elided in the listing; signature
    reconstructed — verify against upstream.
    """
    db = self.db_thread
    sw = db.scheduler_worker
    all_active = db(
        (sw.worker_name != self.worker_name) & (sw.status == ACTIVE)
    ).select()
    ticker = all_active.find(lambda row: row.is_ticker is True).first()
    not_busy = self.worker_status[0] == ACTIVE
    if not ticker:
        # no ticker around: claim the role if I'm free
        if not_busy:
            db(sw.worker_name == self.worker_name).update(is_ticker=True)
            db(sw.worker_name != self.worker_name).update(is_ticker=False)
            logger.info("TICKER: I'm a ticker")
        else:
            # I'm busy: only renounce if someone else can take over
            if len(all_active) >= 1:
                db(sw.worker_name == self.worker_name).update(is_ticker=False)
            else:
                not_busy = True
        db.commit()
        return not_busy
    else:
        logger.info(
            "%s is a ticker, I'm a poor worker" % ticker.worker_name)
        return False
834
def assign_tasks(self, db):
    """Ticker duty: expire stale tasks and spread QUEUED ones over workers.

    Tasks are distributed per group, to the least-loaded worker of each
    group, at most ``50 / number_of_groups`` tasks per worker per pass.

    NOTE(review): the def line was elided in the listing; signature
    reconstructed — verify against upstream.
    """
    sw, st = db.scheduler_worker, db.scheduler_task
    now = self.now()
    all_workers = db(sw.status == ACTIVE).select()
    # build {group_name: {'workers': [{'name':…, 'c': assigned_count}]}}
    wkgroups = {}
    for w in all_workers:
        group_names = w.group_names
        for gname in group_names:
            if gname not in wkgroups:
                wkgroups[gname] = dict(
                    workers=[{'name': w.worker_name, 'c': 0}])
            else:
                wkgroups[gname]['workers'].append(
                    {'name': w.worker_name, 'c': 0})

    # mark tasks whose stop_time has passed as EXPIRED
    db(st.status.belongs(
        (QUEUED, ASSIGNED)))(st.stop_time < now).update(status=EXPIRED)

    all_available = db(
        (st.status.belongs((QUEUED, ASSIGNED))) &
        ((st.times_run < st.repeats) | (st.repeats == 0)) &
        (st.start_time <= now) &
        ((st.stop_time == None) | (st.stop_time > now)) &
        (st.next_run_time <= now) &
        (st.enabled == True)
    )
    # cap how many tasks get assigned in one pass
    limit = len(all_workers) * (50 / (len(wkgroups) or 1))

    # commit the expirations before the assignment round
    db.commit()
    x = 0
    for group in wkgroups.keys():
        tasks = all_available(st.group_name == group).select(
            limitby=(0, limit), orderby=st.next_run_time)
        # assign each task to the least-loaded worker of its group
        for task in tasks:
            x += 1
            gname = task.group_name
            ws = wkgroups.get(gname)
            if ws:
                counter = 0
                myw = 0
                for i, w in enumerate(ws['workers']):
                    if w['c'] < counter:
                        myw = i
                    counter = w['c']
                d = dict(
                    status=ASSIGNED,
                    assigned_worker_name=wkgroups[gname]['workers'][myw]['name']
                )
                if not task.task_name:
                    d['task_name'] = task.function_name
                task.update_record(**d)
                wkgroups[gname]['workers'][myw]['c'] += 1

    db.commit()
    # if I assigned anything, I'm not idle
    if x > 0:
        self.empty_runs = 0
    # if the cap was hit there is likely more work: go greedy
    self.greedy = x >= limit and True or False
    logger.info('TICKER: workers are %s', len(all_workers))
    logger.info('TICKER: tasks are %s', x)
911
def sleep(self):
    """Sleep one heartbeat, scaled by the hibernation multiplier.

    NOTE(review): the def line was elided in the listing; signature
    reconstructed — verify against upstream.
    """
    time.sleep(self.heartbeat * self.worker_status[1])
914
915
def set_worker_status(self, group_names=None, action=ACTIVE):
    """Set the status of every worker serving the given group(s).

    :param group_names: a group name, a list of names, or None for my groups
    :param action: the WORKER_STATUS value to set

    NOTE(review): the def line was elided in the listing; signature
    reconstructed from the visible callers — verify against upstream.
    """
    if not group_names:
        group_names = self.group_names
    elif isinstance(group_names, str):
        group_names = [group_names]
    for group in group_names:
        self.db(
            self.db.scheduler_worker.group_names.contains(group)
        ).update(status=action)
925
def disable(self, group_names=None):
    """Pause the workers of the given groups (they keep heartbeating)."""
    self.set_worker_status(action=DISABLED, group_names=group_names)

def resume(self, group_names=None):
    """Reactivate previously disabled workers."""
    self.set_worker_status(action=ACTIVE, group_names=group_names)

def terminate(self, group_names=None):
    """Ask workers to exit after finishing their current task.

    NOTE(review): this def line was fully elided in the listing;
    reconstructed — verify against upstream.
    """
    self.set_worker_status(action=TERMINATE, group_names=group_names)

def kill(self, group_names=None):
    """Ask workers to exit immediately."""
    self.set_worker_status(action=KILL, group_names=group_names)
def queue_task(self, function, pargs=None, pvars=None, **kwargs):
    """
    Queue tasks. This takes care of handling the validation of all
    values.
    :param function: the function (anything callable with a __name__)
    :param pargs: "raw" args to be passed to the function. Automatically
        jsonified.
    :param pvars: "raw" kwargs to be passed to the function. Automatically
        jsonified
    :param kwargs: all the scheduler_task columns. args and vars here should be
        in json format already, they will override pargs and pvars

    returns a dict just as a normal validate_and_insert, plus a uuid key holding
    the uuid of the queued task. If validation is not passed, both id and uuid
    will be None, and you'll get an "error" dict holding the errors found.

    Fix: the original used mutable defaults ``pargs=[], pvars={}``;
    replaced with None sentinels (behavior unchanged for all callers).
    """
    if pargs is None:
        pargs = []
    if pvars is None:
        pvars = {}
    if hasattr(function, '__name__'):
        function = function.__name__
    # explicit json args/vars/uuid/task_name override the derived ones
    targs = 'args' in kwargs and kwargs.pop('args') or dumps(pargs)
    tvars = 'vars' in kwargs and kwargs.pop('vars') or dumps(pvars)
    tuuid = 'uuid' in kwargs and kwargs.pop('uuid') or web2py_uuid()
    tname = 'task_name' in kwargs and kwargs.pop('task_name') or function
    immediate = 'immediate' in kwargs and kwargs.pop('immediate') or None
    rtn = self.db.scheduler_task.validate_and_insert(
        function_name=function,
        task_name=tname,
        args=targs,
        vars=tvars,
        uuid=tuuid,
        **kwargs)
    if not rtn.errors:
        rtn.uuid = tuuid
        if immediate:
            # poke the ticker so it assigns this task right away
            self.db(self.db.scheduler_worker.is_ticker == True).update(status=PICK)
    else:
        rtn.uuid = None
    return rtn
975
977 """
978 Shortcut for task status retrieval
979
980 :param ref: can be
981 - integer --> lookup will be done by scheduler_task.id
982 - string --> lookup will be done by scheduler_task.uuid
983 - query --> lookup as you wish (as in db.scheduler_task.task_name == 'test1')
984 :param output: fetch also the scheduler_run record
985
986 Returns a single Row object, for the last queued task
987 If output == True, returns also the last scheduler_run record
988 scheduler_run record is fetched by a left join, so it can
989 have all fields == None
990
991 """
992 from gluon.dal import Query
993 sr, st = self.db.scheduler_run, self.db.scheduler_task
994 if isinstance(ref, int):
995 q = st.id == ref
996 elif isinstance(ref, str):
997 q = st.uuid == ref
998 elif isinstance(ref, Query):
999 q = ref
1000 else:
1001 raise SyntaxError(
1002 "You can retrieve results only by id, uuid or Query")
1003 fields = [st.ALL]
1004 left = False
1005 orderby = ~st.id
1006 if output:
1007 fields = st.ALL, sr.ALL
1008 left = sr.on(sr.task_id == st.id)
1009 orderby = ~st.id | ~sr.id
1010 row = self.db(q).select(
1011 *fields,
1012 **dict(orderby=orderby,
1013 left=left,
1014 limitby=(0, 1))
1015 ).first()
1016 if row and output:
1017 row.result = row.scheduler_run.run_result and \
1018 loads(row.scheduler_run.run_result,
1019 object_hook=_decode_dict) or None
1020 return row
1021
1023 """
1024 Experimental!!!
1025 Shortcut for task termination.
1026 If the task is RUNNING it will terminate it --> execution will be set as FAILED
1027 If the task is QUEUED, its stop_time will be set as to "now",
1028 the enabled flag will be set to False, status to STOPPED
1029
1030 :param ref: can be
1031 - integer --> lookup will be done by scheduler_task.id
1032 - string --> lookup will be done by scheduler_task.uuid
1033 Returns:
1034 - 1 if task was stopped (meaning an update has been done)
1035 - None if task was not found, or if task was not RUNNING or QUEUED
1036 """
1037 from gluon.dal import Query
1038 st, sw = self.db.scheduler_task, self.db.scheduler_worker
1039 if isinstance(ref, int):
1040 q = st.id == ref
1041 elif isinstance(ref, str):
1042 q = st.uuid == ref
1043 else:
1044 raise SyntaxError(
1045 "You can retrieve results only by id or uuid")
1046 task = self.db(q).select(st.id, st.status, st.assigned_worker_name).first()
1047 rtn = None
1048 if not task:
1049 return rtn
1050 if task.status == 'RUNNING':
1051 rtn = self.db(sw.worker_name == task.assigned_worker_name).update(status=STOP_TASK)
1052 elif task.status == 'QUEUED':
1053 rtn = self.db(q).update(stop_time=self.now(), enabled=False, status=STOPPED)
1054 return rtn
1055
1056
1058 """
1059 allows to run worker without python web2py.py .... by simply python this.py
1060 """
1061 parser = optparse.OptionParser()
1062 parser.add_option(
1063 "-w", "--worker_name", dest="worker_name", default=None,
1064 help="start a worker with name")
1065 parser.add_option(
1066 "-b", "--heartbeat", dest="heartbeat", default=10,
1067 type='int', help="heartbeat time in seconds (default 10)")
1068 parser.add_option(
1069 "-L", "--logger_level", dest="logger_level",
1070 default=30,
1071 type='int',
1072 help="set debug output level (0-100, 0 means all, 100 means none;default is 30)")
1073 parser.add_option("-E", "--empty-runs",
1074 dest="max_empty_runs",
1075 type='int',
1076 default=0,
1077 help="max loops with no grabbed tasks permitted (0 for never check)")
1078 parser.add_option(
1079 "-g", "--group_names", dest="group_names",
1080 default='main',
1081 help="comma separated list of groups to be picked by the worker")
1082 parser.add_option(
1083 "-f", "--db_folder", dest="db_folder",
1084 default='/Users/mdipierro/web2py/applications/scheduler/databases',
1085 help="location of the dal database folder")
1086 parser.add_option(
1087 "-u", "--db_uri", dest="db_uri",
1088 default='sqlite://storage.sqlite',
1089 help="database URI string (web2py DAL syntax)")
1090 parser.add_option(
1091 "-t", "--tasks", dest="tasks", default=None,
1092 help="file containing task files, must define" +
1093 "tasks = {'task_name':(lambda: 'output')} or similar set of tasks")
1094 parser.add_option(
1095 "-U", "--utc-time", dest="utc_time", default=False,
1096 help="work with UTC timestamps"
1097 )
1098 (options, args) = parser.parse_args()
1099 if not options.tasks or not options.db_uri:
1100 print USAGE
1101 if options.tasks:
1102 path, filename = os.path.split(options.tasks)
1103 if filename.endswith('.py'):
1104 filename = filename[:-3]
1105 sys.path.append(path)
1106 print 'importing tasks...'
1107 tasks = __import__(filename, globals(), locals(), [], -1).tasks
1108 print 'tasks found: ' + ', '.join(tasks.keys())
1109 else:
1110 tasks = {}
1111 group_names = [x.strip() for x in options.group_names.split(',')]
1112
1113 logging.getLogger().setLevel(options.logger_level)
1114
1115 print 'groups for this worker: ' + ', '.join(group_names)
1116 print 'connecting to database in folder: ' + options.db_folder or './'
1117 print 'using URI: ' + options.db_uri
1118 db = DAL(options.db_uri, folder=options.db_folder)
1119 print 'instantiating scheduler...'
1120 scheduler = Scheduler(db=db,
1121 worker_name=options.worker_name,
1122 tasks=tasks,
1123 migrate=True,
1124 group_names=group_names,
1125 heartbeat=options.heartbeat,
1126 max_empty_runs=options.max_empty_runs,
1127 utc_time=options.utc_time)
1128 signal.signal(signal.SIGTERM, lambda signum, stack_frame: sys.exit(1))
1129 print 'starting main worker loop...'
1130 scheduler.loop()
1131
1132 if __name__ == '__main__':
1133 main()
1134