Skip to content

Commit 1fdad21

Browse files
committedDec 28, 2014
master/client: queue pubsub
1 parent f033810 commit 1fdad21

File tree

6 files changed

+135
-93
lines changed

6 files changed

+135
-93
lines changed
 

‎artiq/management/pc_rpc.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import traceback
1818

1919
from artiq.management import pyon
20-
from artiq.management.network import AsyncioServer
20+
from artiq.management.tools import AsyncioServer
2121

2222

2323
class RemoteError(Exception):

‎artiq/management/scheduler.py

+8-20
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
11
import asyncio
22
from time import time
33

4+
from artiq.management.sync_struct import Notifier
45
from artiq.management.worker import Worker
56

67

78
class Scheduler:
89
def __init__(self, *args, **kwargs):
910
self.worker = Worker(*args, **kwargs)
1011
self.next_rid = 0
11-
self.currently_executing = None
12-
self.queued = []
12+
self.queued = Notifier([])
1313
self.queue_count = asyncio.Semaphore(0)
1414
self.periodic = dict()
1515
self.periodic_modified = asyncio.Event()
@@ -47,14 +47,6 @@ def cancel_once(self, rid):
4747
if qrid == rid)
4848
del self.queued[idx]
4949

50-
def get_schedule(self):
51-
if self.currently_executing is None:
52-
ce = None
53-
else:
54-
rid, run_params, timeout, t = self.currently_executing
55-
ce = rid, run_params, timeout, time() - t
56-
return ce, self.queued, self.periodic
57-
5850
def run_periodic(self, run_params, timeout, period):
5951
prid = self.new_prid()
6052
self.periodic[prid] = 0, run_params, timeout, period
@@ -64,13 +56,6 @@ def run_periodic(self, run_params, timeout, period):
6456
def cancel_periodic(self, prid):
6557
del self.periodic[prid]
6658

67-
@asyncio.coroutine
68-
def _run(self, rid, run_params, timeout):
69-
self.currently_executing = rid, run_params, timeout, time()
70-
result = yield from self.worker.run(run_params, timeout)
71-
self.currently_executing = None
72-
return result
73-
7459
@asyncio.coroutine
7560
def _run_periodic(self):
7661
while True:
@@ -93,8 +78,10 @@ def _run_periodic(self):
9378
self.periodic[min_prid] = now + period, run_params, timeout, period
9479

9580
rid = self.new_rid()
96-
result = yield from self._run(rid, run_params, timeout)
81+
self.queued.insert(0, (rid, run_params, timeout))
82+
result = yield from self.worker.run(run_params, timeout)
9783
print(prid, rid, result)
84+
del self.queued[0]
9885

9986
@asyncio.coroutine
10087
def _schedule(self):
@@ -111,6 +98,7 @@ def _schedule(self):
11198

11299
yield from self._run_periodic()
113100
if ev_queue in done:
114-
rid, run_params, timeout = self.queued.pop(0)
115-
result = yield from self._run(rid, run_params, timeout)
101+
rid, run_params, timeout = self.queued.backing_struct[0]
102+
result = yield from self.worker.run(run_params, timeout)
116103
print(rid, result)
104+
del self.queued[0]

‎artiq/management/sync_struct.py

+61-46
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,15 @@
11
import asyncio
22

33
from artiq.management import pyon
4-
from artiq.management.network import AsyncioServer
4+
from artiq.management.tools import AsyncioServer
55

66

77
_init_string = b"ARTIQ sync_struct\n"
88

99

1010
class Subscriber:
11-
def __init__(self, target_builder, error_cb, notify_cb=None):
11+
def __init__(self, target_builder, notify_cb=None):
1212
self.target_builder = target_builder
13-
self.error_cb = error_cb
1413
self.notify_cb = notify_cb
1514

1615
@asyncio.coroutine
@@ -19,7 +18,7 @@ def connect(self, host, port):
1918
yield from asyncio.open_connection(host, port)
2019
try:
2120
self._writer.write(_init_string)
22-
self._receive_task = asyncio.Task(self._receive_cr())
21+
self.receive_task = asyncio.Task(self._receive_cr())
2322
except:
2423
self._writer.close()
2524
del self._reader
@@ -29,9 +28,9 @@ def connect(self, host, port):
2928
@asyncio.coroutine
3029
def close(self):
3130
try:
32-
self._receive_task.cancel()
31+
self.receive_task.cancel()
3332
try:
34-
yield from asyncio.wait_for(self._receive_task, None)
33+
yield from asyncio.wait_for(self.receive_task, None)
3534
except asyncio.CancelledError:
3635
pass
3736
finally:
@@ -41,42 +40,74 @@ def close(self):
4140

4241
@asyncio.coroutine
4342
def _receive_cr(self):
44-
try:
45-
target = None
46-
while True:
47-
line = yield from self._reader.readline()
48-
obj = pyon.decode(line.decode())
49-
action = obj["action"]
50-
51-
if action == "init":
52-
target = self.target_builder(obj["struct"])
53-
elif action == "append":
54-
target.append(obj["x"])
55-
elif action == "pop":
56-
target.pop(obj["i"])
57-
elif action == "delitem":
58-
target.__delitem__(obj["key"])
59-
if self.notify_cb is not None:
60-
self.notify_cb()
61-
except:
62-
self.error_cb()
63-
raise
43+
target = None
44+
while True:
45+
line = yield from self._reader.readline()
46+
if not line:
47+
return
48+
obj = pyon.decode(line.decode())
49+
action = obj["action"]
50+
51+
if action == "init":
52+
target = self.target_builder(obj["struct"])
53+
elif action == "append":
54+
target.append(obj["x"])
55+
elif action == "insert":
56+
target.insert(obj["i"], obj["x"])
57+
elif action == "pop":
58+
target.pop(obj["i"])
59+
elif action == "delitem":
60+
target.__delitem__(obj["key"])
61+
if self.notify_cb is not None:
62+
self.notify_cb()
63+
64+
65+
class Notifier:
66+
def __init__(self, backing_struct):
67+
self.backing_struct = backing_struct
68+
self.publisher = None
69+
70+
# Backing struct modification methods.
71+
# All modifications must go through them!
72+
73+
def append(self, x):
74+
self.backing_struct.append(x)
75+
if self.publisher is not None:
76+
self.publisher.publish({"action": "append", "x": x})
77+
78+
def insert(self, i, x):
79+
self.backing_struct.insert(i, x)
80+
if self.publisher is not None:
81+
self.publisher.publish({"action": "insert", "i": i, "x": x})
82+
83+
def pop(self, i=-1):
84+
r = self.backing_struct.pop(i)
85+
if self.publisher is not None:
86+
self.publisher.publish({"action": "pop", "i": i})
87+
return r
88+
89+
def __delitem__(self, key):
90+
self.backing_struct.__delitem__(key)
91+
if self.publisher is not None:
92+
self.publisher.publish({"action": "delitem", "key": key})
6493

6594

6695
class Publisher(AsyncioServer):
67-
def __init__(self, backing_struct):
96+
def __init__(self, notifier):
6897
AsyncioServer.__init__(self)
69-
self.backing_struct = backing_struct
98+
self.notifier = notifier
7099
self._recipients = set()
71100

101+
self.notifier.publisher = self
102+
72103
@asyncio.coroutine
73104
def _handle_connection_cr(self, reader, writer):
74105
try:
75106
line = yield from reader.readline()
76107
if line != _init_string:
77108
return
78109

79-
obj = {"action": "init", "struct": self.backing_struct}
110+
obj = {"action": "init", "struct": self.notifier.backing_struct}
80111
line = pyon.encode(obj) + "\n"
81112
writer.write(line.encode())
82113

@@ -96,24 +127,8 @@ def _handle_connection_cr(self, reader, writer):
96127
finally:
97128
writer.close()
98129

99-
def _publish(self, obj):
130+
def publish(self, obj):
100131
line = pyon.encode(obj) + "\n"
101132
line = line.encode()
102133
for recipient in self._recipients:
103134
recipient.put_nowait(line)
104-
105-
# Backing struct modification methods.
106-
# All modifications must go through them!
107-
108-
def append(self, x):
109-
self.backing_struct.append(x)
110-
self._publish({"action": "append", "x": x})
111-
112-
def pop(self, i=-1):
113-
r = self.backing_struct.pop(i)
114-
self._publish({"action": "pop", "i": i})
115-
return r
116-
117-
def __delitem__(self, key):
118-
self.backing_struct.__delitem__(key)
119-
self._publish({"action": "delitem", "key": key})

‎artiq/management/network.py ‎artiq/management/tools.py

+5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,12 @@
11
import asyncio
2+
import sys
23
from copy import copy
34

45

6+
def clear_screen():
7+
sys.stdout.write("\x1b[2J\x1b[H")
8+
9+
510
class AsyncioServer:
611
"""Generic TCP server based on asyncio.
712

‎frontend/artiq_client.py

+43-20
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,14 @@
22

33
import argparse
44
import time
5+
import sys
6+
import asyncio
57

68
from prettytable import PrettyTable
79

810
from artiq.management.pc_rpc import Client
11+
from artiq.management.sync_struct import Subscriber
12+
from artiq.management.tools import clear_screen
913

1014

1115
def _get_args():
@@ -14,7 +18,7 @@ def _get_args():
1418
"-s", "--server", default="::1",
1519
help="hostname or IP of the master to connect to")
1620
parser.add_argument(
17-
"--port", default=8888, type=int,
21+
"--port", default=None, type=int,
1822
help="TCP port to use to connect to the master")
1923

2024
subparsers = parser.add_subparsers(dest="action")
@@ -42,8 +46,8 @@ def _get_args():
4246
parser_cancel.add_argument("rid", type=int,
4347
help="run identifier (RID/PRID)")
4448

45-
parser_show = subparsers.add_parser("show",
46-
help="show the experiment schedule")
49+
parser_show = subparsers.add_parser("show-queue",
50+
help="show the experiment queue")
4751

4852
return parser.parse_args()
4953

@@ -70,30 +74,25 @@ def _action_cancel(remote, args):
7074
remote.cancel_once(args.rid)
7175

7276

73-
def _action_show(remote, args):
74-
ce, queue, periodic = remote.get_schedule()
75-
if ce is not None or queue:
77+
def _show_queue(queue):
78+
clear_screen()
79+
if queue:
7680
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
77-
if ce is not None:
78-
rid, run_params, timeout, t = ce
79-
print("Currently executing RID {} for {:.1f}s".format(rid, t))
80-
row = [rid, run_params["file"]]
81-
for x in run_params["unit"], run_params["function"], timeout:
82-
row.append("-" if x is None else x)
83-
table.add_row(row)
8481
for rid, run_params, timeout in queue:
8582
row = [rid, run_params["file"]]
8683
for x in run_params["unit"], run_params["function"], timeout:
8784
row.append("-" if x is None else x)
8885
table.add_row(row)
89-
print("Run queue:")
9086
print(table)
9187
else:
9288
print("Queue is empty")
89+
90+
91+
def _show_periodic(periodic):
92+
clear_screen()
9393
if periodic:
9494
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
9595
"Timeout", "Period"])
96-
print("Periodic schedule:")
9796
sp = sorted(periodic.items(), key=lambda x: x[1][0])
9897
for prid, (next_run, run_params, timeout, period) in sp:
9998
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
@@ -107,13 +106,37 @@ def _action_show(remote, args):
107106
print("No periodic schedule")
108107

109108

110-
def main():
111-
args = _get_args()
112-
remote = Client(args.server, args.port, "master")
109+
def _run_subscriber(host, port, subscriber):
110+
loop = asyncio.get_event_loop()
113111
try:
114-
globals()["_action_" + args.action](remote, args)
112+
loop.run_until_complete(subscriber.connect(host, port))
113+
try:
114+
loop.run_until_complete(asyncio.wait_for(subscriber.receive_task,
115+
None))
116+
print("Connection to master lost")
117+
finally:
118+
loop.run_until_complete(subscriber.close())
115119
finally:
116-
remote.close_rpc()
120+
loop.close()
121+
122+
123+
def main():
124+
args = _get_args()
125+
if args.action == "show-queue":
126+
queue = []
127+
def init_queue(x):
128+
queue[:] = x
129+
return queue
130+
subscriber = Subscriber(init_queue, lambda: _show_queue(queue))
131+
port = 8887 if args.port is None else args.port
132+
_run_subscriber(args.server, port, subscriber)
133+
else:
134+
port = 8888 if args.port is None else args.port
135+
remote = Client(args.server, port, "schedule_control")
136+
try:
137+
globals()["_action_" + args.action](remote, args)
138+
finally:
139+
remote.close_rpc()
117140

118141
if __name__ == "__main__":
119142
main()

‎frontend/artiq_master.py

+17-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import argparse
55

66
from artiq.management.pc_rpc import Server
7+
from artiq.management.sync_struct import Publisher
78
from artiq.management.scheduler import Scheduler
89

910

@@ -13,8 +14,11 @@ def _get_args():
1314
"--bind", default="::1",
1415
help="hostname or IP address to bind to")
1516
parser.add_argument(
16-
"--port", default=8888, type=int,
17-
help="TCP port to listen to")
17+
"--port-schedule-control", default=8888, type=int,
18+
help="TCP port to listen to for schedule control")
19+
parser.add_argument(
20+
"--port-schedule-notify", default=8887, type=int,
21+
help="TCP port to listen to for schedule notifications")
1822
return parser.parse_args()
1923

2024

@@ -25,12 +29,19 @@ def main():
2529
scheduler = Scheduler("ddb.pyon", "pdb.pyon")
2630
loop.run_until_complete(scheduler.start())
2731
try:
28-
server = Server(scheduler, "master")
29-
loop.run_until_complete(server.start(args.bind, args.port))
32+
schedule_control = Server(scheduler, "schedule_control")
33+
loop.run_until_complete(schedule_control.start(
34+
args.bind, args.port_schedule_control))
3035
try:
31-
loop.run_forever()
36+
schedule_notify = Publisher(scheduler.queued)
37+
loop.run_until_complete(schedule_notify.start(
38+
args.bind, args.port_schedule_notify))
39+
try:
40+
loop.run_forever()
41+
finally:
42+
loop.run_until_complete(schedule_notify.stop())
3243
finally:
33-
loop.run_until_complete(server.stop())
44+
loop.run_until_complete(schedule_control.stop())
3445
finally:
3546
loop.run_until_complete(scheduler.stop())
3647
finally:

0 commit comments

Comments
 (0)
Please sign in to comment.