Skip to content

Commit

Permalink
master/client: queue pubsub
Browse files Browse the repository at this point in the history
sbourdeauducq committed Dec 28, 2014
1 parent f033810 commit 1fdad21
Showing 6 changed files with 135 additions and 93 deletions.
2 changes: 1 addition & 1 deletion artiq/management/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -17,7 +17,7 @@
import traceback

from artiq.management import pyon
from artiq.management.network import AsyncioServer
from artiq.management.tools import AsyncioServer


class RemoteError(Exception):
28 changes: 8 additions & 20 deletions artiq/management/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import asyncio
from time import time

from artiq.management.sync_struct import Notifier
from artiq.management.worker import Worker


class Scheduler:
def __init__(self, *args, **kwargs):
self.worker = Worker(*args, **kwargs)
self.next_rid = 0
self.currently_executing = None
self.queued = []
self.queued = Notifier([])
self.queue_count = asyncio.Semaphore(0)
self.periodic = dict()
self.periodic_modified = asyncio.Event()
@@ -47,14 +47,6 @@ def cancel_once(self, rid):
if qrid == rid)
del self.queued[idx]

def get_schedule(self):
if self.currently_executing is None:
ce = None
else:
rid, run_params, timeout, t = self.currently_executing
ce = rid, run_params, timeout, time() - t
return ce, self.queued, self.periodic

def run_periodic(self, run_params, timeout, period):
prid = self.new_prid()
self.periodic[prid] = 0, run_params, timeout, period
@@ -64,13 +56,6 @@ def run_periodic(self, run_params, timeout, period):
def cancel_periodic(self, prid):
del self.periodic[prid]

@asyncio.coroutine
def _run(self, rid, run_params, timeout):
self.currently_executing = rid, run_params, timeout, time()
result = yield from self.worker.run(run_params, timeout)
self.currently_executing = None
return result

@asyncio.coroutine
def _run_periodic(self):
while True:
@@ -93,8 +78,10 @@ def _run_periodic(self):
self.periodic[min_prid] = now + period, run_params, timeout, period

rid = self.new_rid()
result = yield from self._run(rid, run_params, timeout)
self.queued.insert(0, (rid, run_params, timeout))
result = yield from self.worker.run(run_params, timeout)
print(prid, rid, result)
del self.queued[0]

@asyncio.coroutine
def _schedule(self):
@@ -111,6 +98,7 @@ def _schedule(self):

yield from self._run_periodic()
if ev_queue in done:
rid, run_params, timeout = self.queued.pop(0)
result = yield from self._run(rid, run_params, timeout)
rid, run_params, timeout = self.queued.backing_struct[0]
result = yield from self.worker.run(run_params, timeout)
print(rid, result)
del self.queued[0]
107 changes: 61 additions & 46 deletions artiq/management/sync_struct.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
import asyncio

from artiq.management import pyon
from artiq.management.network import AsyncioServer
from artiq.management.tools import AsyncioServer


_init_string = b"ARTIQ sync_struct\n"


class Subscriber:
def __init__(self, target_builder, error_cb, notify_cb=None):
def __init__(self, target_builder, notify_cb=None):
self.target_builder = target_builder
self.error_cb = error_cb
self.notify_cb = notify_cb

@asyncio.coroutine
@@ -19,7 +18,7 @@ def connect(self, host, port):
yield from asyncio.open_connection(host, port)
try:
self._writer.write(_init_string)
self._receive_task = asyncio.Task(self._receive_cr())
self.receive_task = asyncio.Task(self._receive_cr())
except:
self._writer.close()
del self._reader
@@ -29,9 +28,9 @@ def connect(self, host, port):
@asyncio.coroutine
def close(self):
try:
self._receive_task.cancel()
self.receive_task.cancel()
try:
yield from asyncio.wait_for(self._receive_task, None)
yield from asyncio.wait_for(self.receive_task, None)
except asyncio.CancelledError:
pass
finally:
@@ -41,42 +40,74 @@ def close(self):

@asyncio.coroutine
def _receive_cr(self):
try:
target = None
while True:
line = yield from self._reader.readline()
obj = pyon.decode(line.decode())
action = obj["action"]

if action == "init":
target = self.target_builder(obj["struct"])
elif action == "append":
target.append(obj["x"])
elif action == "pop":
target.pop(obj["i"])
elif action == "delitem":
target.__delitem__(obj["key"])
if self.notify_cb is not None:
self.notify_cb()
except:
self.error_cb()
raise
target = None
while True:
line = yield from self._reader.readline()
if not line:
return
obj = pyon.decode(line.decode())
action = obj["action"]

if action == "init":
target = self.target_builder(obj["struct"])
elif action == "append":
target.append(obj["x"])
elif action == "insert":
target.insert(obj["i"], obj["x"])
elif action == "pop":
target.pop(obj["i"])
elif action == "delitem":
target.__delitem__(obj["key"])
if self.notify_cb is not None:
self.notify_cb()


class Notifier:
    """Wrap a mutable structure and report every modification to a publisher.

    The wrapped object is kept in ``backing_struct``.  Each mutating method
    first applies the change to ``backing_struct`` and then, if a publisher
    is attached, hands it a dict describing the change (``"action"`` plus the
    arguments of the call).  If the underlying operation raises, nothing is
    published.

    ``publisher`` starts as ``None``; ``Publisher.__init__`` attaches itself
    by assigning this attribute.  Any object with a ``publish(obj)`` method
    works.

    Read access (indexing and iteration) is delegated to ``backing_struct``
    and never produces a notification.
    """

    def __init__(self, backing_struct):
        self.backing_struct = backing_struct
        # No publisher attached yet: modifications are applied silently.
        self.publisher = None

    def _notify(self, mod):
        """Forward the modification description *mod* to the publisher, if any."""
        if self.publisher is not None:
            self.publisher.publish(mod)

    # Read-only access. These never notify.

    def __getitem__(self, key):
        """Return ``backing_struct[key]``."""
        return self.backing_struct[key]

    def __iter__(self):
        """Iterate over ``backing_struct``."""
        return iter(self.backing_struct)

    # Backing struct modification methods.
    # All modifications must go through them!

    def append(self, x):
        """Append *x* to the backing struct and publish an "append" action."""
        self.backing_struct.append(x)
        self._notify({"action": "append", "x": x})

    def insert(self, i, x):
        """Insert *x* at position *i* and publish an "insert" action."""
        self.backing_struct.insert(i, x)
        self._notify({"action": "insert", "i": i, "x": x})

    def pop(self, i=-1):
        """Remove and return the item at *i* (default: last), publishing "pop"."""
        r = self.backing_struct.pop(i)
        self._notify({"action": "pop", "i": i})
        return r

    def __delitem__(self, key):
        """Delete ``backing_struct[key]`` and publish a "delitem" action."""
        self.backing_struct.__delitem__(key)
        self._notify({"action": "delitem", "key": key})


class Publisher(AsyncioServer):
def __init__(self, backing_struct):
def __init__(self, notifier):
AsyncioServer.__init__(self)
self.backing_struct = backing_struct
self.notifier = notifier
self._recipients = set()

self.notifier.publisher = self

@asyncio.coroutine
def _handle_connection_cr(self, reader, writer):
try:
line = yield from reader.readline()
if line != _init_string:
return

obj = {"action": "init", "struct": self.backing_struct}
obj = {"action": "init", "struct": self.notifier.backing_struct}
line = pyon.encode(obj) + "\n"
writer.write(line.encode())

@@ -96,24 +127,8 @@ def _handle_connection_cr(self, reader, writer):
finally:
writer.close()

def _publish(self, obj):
def publish(self, obj):
line = pyon.encode(obj) + "\n"
line = line.encode()
for recipient in self._recipients:
recipient.put_nowait(line)

# Backing struct modification methods.
# All modifications must go through them!

def append(self, x):
self.backing_struct.append(x)
self._publish({"action": "append", "x": x})

def pop(self, i=-1):
r = self.backing_struct.pop(i)
self._publish({"action": "pop", "i": i})
return r

def __delitem__(self, key):
self.backing_struct.__delitem__(key)
self._publish({"action": "delitem", "key": key})
5 changes: 5 additions & 0 deletions artiq/management/network.py → artiq/management/tools.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import asyncio
import sys
from copy import copy


def clear_screen():
    """Write the ANSI "erase display" and "cursor home" sequences to stdout."""
    erase_display = "\x1b[2J"
    cursor_home = "\x1b[H"
    # Emit both sequences in a single write, exactly as one string.
    sys.stdout.write(erase_display + cursor_home)


class AsyncioServer:
"""Generic TCP server based on asyncio.
63 changes: 43 additions & 20 deletions frontend/artiq_client.py
Original file line number Diff line number Diff line change
@@ -2,10 +2,14 @@

import argparse
import time
import sys
import asyncio

from prettytable import PrettyTable

from artiq.management.pc_rpc import Client
from artiq.management.sync_struct import Subscriber
from artiq.management.tools import clear_screen


def _get_args():
@@ -14,7 +18,7 @@ def _get_args():
"-s", "--server", default="::1",
help="hostname or IP of the master to connect to")
parser.add_argument(
"--port", default=8888, type=int,
"--port", default=None, type=int,
help="TCP port to use to connect to the master")

subparsers = parser.add_subparsers(dest="action")
@@ -42,8 +46,8 @@ def _get_args():
parser_cancel.add_argument("rid", type=int,
help="run identifier (RID/PRID)")

parser_show = subparsers.add_parser("show",
help="show the experiment schedule")
parser_show = subparsers.add_parser("show-queue",
help="show the experiment queue")

return parser.parse_args()

@@ -70,30 +74,25 @@ def _action_cancel(remote, args):
remote.cancel_once(args.rid)


def _action_show(remote, args):
ce, queue, periodic = remote.get_schedule()
if ce is not None or queue:
def _show_queue(queue):
clear_screen()
if queue:
table = PrettyTable(["RID", "File", "Unit", "Function", "Timeout"])
if ce is not None:
rid, run_params, timeout, t = ce
print("Currently executing RID {} for {:.1f}s".format(rid, t))
row = [rid, run_params["file"]]
for x in run_params["unit"], run_params["function"], timeout:
row.append("-" if x is None else x)
table.add_row(row)
for rid, run_params, timeout in queue:
row = [rid, run_params["file"]]
for x in run_params["unit"], run_params["function"], timeout:
row.append("-" if x is None else x)
table.add_row(row)
print("Run queue:")
print(table)
else:
print("Queue is empty")


def _show_periodic(periodic):
clear_screen()
if periodic:
table = PrettyTable(["Next run", "PRID", "File", "Unit", "Function",
"Timeout", "Period"])
print("Periodic schedule:")
sp = sorted(periodic.items(), key=lambda x: x[1][0])
for prid, (next_run, run_params, timeout, period) in sp:
row = [time.strftime("%m/%d %H:%M:%S", time.localtime(next_run)),
@@ -107,13 +106,37 @@ def _action_show(remote, args):
print("No periodic schedule")


def main():
args = _get_args()
remote = Client(args.server, args.port, "master")
def _run_subscriber(host, port, subscriber):
loop = asyncio.get_event_loop()
try:
globals()["_action_" + args.action](remote, args)
loop.run_until_complete(subscriber.connect(host, port))
try:
loop.run_until_complete(asyncio.wait_for(subscriber.receive_task,
None))
print("Connection to master lost")
finally:
loop.run_until_complete(subscriber.close())
finally:
remote.close_rpc()
loop.close()


def main():
    """Entry point of the command-line client.

    For the ``show-queue`` action, subscribe to the master's notification
    port (8887 unless ``--port`` is given) and redraw the queue on every
    notification.  For any other action, open an RPC client to the
    ``schedule_control`` target (port 8888 unless ``--port`` is given), call
    the module-level ``_action_<action>`` function, and always close the RPC
    connection afterwards.
    """
    args = _get_args()

    if args.action != "show-queue":
        control_port = args.port if args.port is not None else 8888
        remote = Client(args.server, control_port, "schedule_control")
        try:
            # NOTE(review): if no sub-command was given, args.action may be
            # None and this concatenation would raise - confirm in _get_args.
            handler = globals()["_action_" + args.action]
            handler(remote, args)
        finally:
            remote.close_rpc()
        return

    # Local mirror of the master's queue. It is updated in place so that the
    # redraw callback always sees the same list object.
    queue = []

    def init_queue(x):
        queue[:] = x
        return queue

    def redraw():
        _show_queue(queue)

    notify_port = args.port if args.port is not None else 8887
    _run_subscriber(args.server, notify_port, Subscriber(init_queue, redraw))

if __name__ == "__main__":
main()
23 changes: 17 additions & 6 deletions frontend/artiq_master.py
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@
import argparse

from artiq.management.pc_rpc import Server
from artiq.management.sync_struct import Publisher
from artiq.management.scheduler import Scheduler


@@ -13,8 +14,11 @@ def _get_args():
"--bind", default="::1",
help="hostname or IP address to bind to")
parser.add_argument(
"--port", default=8888, type=int,
help="TCP port to listen to")
"--port-schedule-control", default=8888, type=int,
help="TCP port to listen to for schedule control")
parser.add_argument(
"--port-schedule-notify", default=8887, type=int,
help="TCP port to listen to for schedule notifications")
return parser.parse_args()


@@ -25,12 +29,19 @@ def main():
scheduler = Scheduler("ddb.pyon", "pdb.pyon")
loop.run_until_complete(scheduler.start())
try:
server = Server(scheduler, "master")
loop.run_until_complete(server.start(args.bind, args.port))
schedule_control = Server(scheduler, "schedule_control")
loop.run_until_complete(schedule_control.start(
args.bind, args.port_schedule_control))
try:
loop.run_forever()
schedule_notify = Publisher(scheduler.queued)
loop.run_until_complete(schedule_notify.start(
args.bind, args.port_schedule_notify))
try:
loop.run_forever()
finally:
loop.run_until_complete(schedule_notify.stop())
finally:
loop.run_until_complete(server.stop())
loop.run_until_complete(schedule_control.stop())
finally:
loop.run_until_complete(scheduler.stop())
finally:

0 comments on commit 1fdad21

Please sign in to comment.