7
7
class Scheduler :
8
8
def __init__(self, *args, **kwargs):
    """Set up the scheduler state; all arguments are forwarded to Worker."""
    # Worker receives the constructor arguments unchanged.
    self.worker = Worker(*args, **kwargs)

    # Counter read and incremented by new_rid().
    self.next_rid = 0
    # No run is in progress at construction time.
    self.currently_executing = None

    # Pending one-shot runs; queue_count starts at zero permits.
    self.queued = list()
    self.queue_count = asyncio.Semaphore(0)

    # prid -> (next_run, run_params, timeout, period), as stored by
    # run_periodic(); periodic_modified is set whenever an entry is added.
    self.periodic = {}
    self.periodic_modified = asyncio.Event()
14
16
15
17
def new_rid(self):
    """Return the current run id and advance the counter by one."""
    rid = self.next_rid
    self.next_rid = rid + 1
    return rid
19
21
22
def new_prid(self):
    """Return the smallest non-negative integer not used as a key of self.periodic.

    The search is explicit so the chosen id does not depend on set
    iteration order, and no temporary sets are allocated.
    """
    prid = 0
    while prid in self.periodic:
        prid += 1
    return prid
26
+
20
27
@asyncio .coroutine
21
28
def start (self ):
22
29
self .task = asyncio .Task (self ._schedule ())
@@ -46,13 +53,16 @@ def get_schedule(self):
46
53
else :
47
54
rid , run_params , timeout , t = self .currently_executing
48
55
ce = rid , run_params , timeout , time () - t
49
- return ce , self .queued
56
+ return ce , self .queued , self . periodic
50
57
51
58
def run_periodic(self, run_params, timeout, period):
    """Register a periodic run and return the id it was stored under.

    The entry is stored in self.periodic as
    (next_run, run_params, timeout, period) with next_run set to 0, and
    periodic_modified is set afterwards.
    """
    new_id = self.new_prid()
    entry = (0, run_params, timeout, period)
    self.periodic[new_id] = entry
    self.periodic_modified.set()
    return new_id
53
63
54
64
def cancel_periodic(self, prid):
    """Remove the periodic entry stored under prid.

    Raises KeyError when no entry with that id exists.
    """
    self.periodic.pop(prid)
56
66
57
67
@asyncio .coroutine
58
68
def _run (self , rid , run_params , timeout ):
@@ -62,9 +72,45 @@ def _run(self, rid, run_params, timeout):
62
72
return result
63
73
64
74
@asyncio.coroutine
def _run_periodic(self):
    """Run every periodic entry that is currently due, one after another.

    Each pass picks the entry of self.periodic with the smallest next-run
    timestamp (first tuple field). If it is due, its next-run time is
    advanced to now + period, a fresh run id is drawn and the run is
    executed through self._run(); then the table is examined again.

    Returns:
        None when self.periodic is empty, otherwise the positive number of
        seconds until the earliest entry becomes due.
    """
    while True:
        if not self.periodic:
            return None

        # min() keeps the first key among equal timestamps, matching a
        # strict "<" comparison over the dict in iteration order.
        min_prid = min(self.periodic, key=lambda p: self.periodic[p][0])
        next_run, run_params, timeout, period = self.periodic[min_prid]

        now = time()
        delay = next_run - now
        if delay > 0:
            return delay

        # Reschedule before running so the entry already carries its new
        # due time while self._run() is in progress.
        self.periodic[min_prid] = now + period, run_params, timeout, period

        rid = self.new_rid()
        result = yield from self._run(rid, run_params, timeout)
        # Report the id of the entry that actually ran, not whichever key
        # happened to be visited last while searching.
        print(min_prid, rid, result)
98
+
99
@asyncio.coroutine
def _schedule(self):
    """Main loop: interleave queued runs with periodic runs.

    Waits until a queued run is available (queue_count), the periodic
    table is modified (periodic_modified), or the delay reported by
    _run_periodic() elapses. It then runs the due periodic entries and,
    if the queue semaphore was acquired, the oldest queued run.
    """
    next_periodic = yield from self._run_periodic()
    while True:
        ev_queue = asyncio.Task(self.queue_count.acquire())
        ev_periodic = asyncio.Task(self.periodic_modified.wait())
        done, pend = yield from asyncio.wait(
            [ev_queue, ev_periodic],
            timeout=next_periodic,
            return_when=asyncio.FIRST_COMPLETED)
        # Drop whichever waiter did not fire so it cannot consume a
        # semaphore permit later on.
        for t in pend:
            t.cancel()

        # An Event stays set until it is cleared. Without this reset,
        # wait() would complete immediately on every later iteration and
        # the loop would spin. Clearing here is safe because the periodic
        # table is re-read in full right below.
        self.periodic_modified.clear()

        next_periodic = yield from self._run_periodic()
        if ev_queue in done:
            rid, run_params, timeout = self.queued.pop(0)
            result = yield from self._run(rid, run_params, timeout)
            print(rid, result)
0 commit comments