Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 85b6a7ca241c
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 7a10cb8c32df
Choose a head ref
  • 2 commits
  • 5 files changed
  • 1 contributor

Commits on Oct 23, 2014

  1. Copy the full SHA
    2aac43c View commit details
  2. mc: use pc_rpc

    sbourdeauducq committed Oct 23, 2014
    Copy the full SHA
    7a10cb8 View commit details
Showing with 140 additions and 66 deletions.
  1. +88 −0 artiq/management/pc_rpc.py
  2. +18 −13 artiq/management/scheduler.py
  3. +2 −3 artiq/management/worker.py
  4. +10 −18 frontend/artiq
  5. +22 −32 frontend/artiqd
88 changes: 88 additions & 0 deletions artiq/management/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import socket
import json
import asyncio


class RemoteError(Exception):
    """Raised by ``Client.do_rpc`` when the server replies with an error.

    The exception message is the ``"message"`` field of the server's reply.
    """


class Client:
    """Blocking client for a line-delimited JSON RPC server.

    Any attribute that is not found on the instance is turned into a proxy
    function performing the remote call of the same name, so
    ``client.foo(1, x=2)`` is equivalent to
    ``client.do_rpc("foo", (1,), {"x": 2})``.

    The client may be used as a context manager; the socket is closed when
    the ``with`` block exits.
    """

    def __init__(self, host, port):
        """Open a TCP connection to ``(host, port)``."""
        self.socket = socket.create_connection((host, port))

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def close(self):
        """Close the underlying socket."""
        self.socket.close()

    def do_rpc(self, name, args, kwargs):
        """Call the remote method ``name`` and return its result.

        The request is sent as one line of JSON; the reply is read until a
        newline is seen or the peer closes the connection.

        Raises:
            RemoteError: the server replied with an ``"error"`` result.
            ValueError: the connection was closed before any reply arrived,
                the reply is not valid JSON, or its ``"result"`` field is
                neither ``"ok"`` nor ``"error"``.
        """
        request = {"action": "call", "name": name,
                   "args": args, "kwargs": kwargs}
        self.socket.sendall((json.dumps(request) + "\n").encode())

        # Accumulate raw bytes and decode only once at the end: decoding
        # each chunk separately fails when a multi-byte UTF-8 character is
        # split across two recv() calls.
        buf = bytearray()
        while b"\n" not in buf:
            chunk = self.socket.recv(4096)
            if not chunk:
                # Peer closed the connection.
                break
            buf += chunk
        if not buf:
            raise ValueError(
                "connection closed before any reply was received")

        reply = json.loads(buf.decode())
        result = reply["result"]
        if result == "ok":
            return reply["ret"]
        if result == "error":
            raise RemoteError(reply["message"])
        raise ValueError("unexpected result in reply: " + repr(result))

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails: treat the name
        # as a remote method.
        def proxy(*args, **kwargs):
            return self.do_rpc(name, args, kwargs)
        return proxy


class Server:
    """Asyncio server exposing the methods of ``target`` over JSON RPC.

    Each request is one line of JSON containing an ``"action"`` key. For
    the ``"call"`` action the keys ``"name"``, ``"args"`` and ``"kwargs"``
    select the method of ``target`` to invoke and its arguments. Every
    request is answered with one line of JSON: ``{"result": "ok",
    "ret": ...}`` on success, or ``{"result": "error", "message": ...}``.

    NOTE(review): the method is looked up with ``getattr`` using the name
    supplied by the client, so every attribute of ``target`` is reachable
    by connected clients - confirm this is acceptable for the targets used.
    """

    def __init__(self, target):
        self.target = target
        # Tasks currently serving a client connection.
        self.client_tasks = set()

    @asyncio.coroutine
    def start(self, host, port):
        """Start listening for client connections on ``(host, port)``."""
        self.server = yield from asyncio.start_server(self.handle_connection,
                                                      host, port)

    @asyncio.coroutine
    def stop(self):
        """Stop listening and terminate all client connections.

        Must only be called after ``start``.
        """
        # Stop accepting first so that no new client task can appear while
        # the existing ones are being cancelled.
        self.server.close()
        # Iterate over a snapshot: done callbacks remove tasks from the set.
        tasks = list(self.client_tasks)
        for task in tasks:
            task.cancel()
        if tasks:
            # Let the cancelled tasks run their cleanup (closing their
            # writers) before reporting the server as stopped.
            yield from asyncio.wait(tasks)
        yield from self.server.wait_closed()
        del self.server

    def client_done(self, task):
        """Done callback: forget a finished client task."""
        self.client_tasks.remove(task)

    def handle_connection(self, reader, writer):
        """Connection callback: serve the new client in its own task."""
        task = asyncio.Task(self.handle_connection_task(reader, writer))
        self.client_tasks.add(task)
        task.add_done_callback(self.client_done)

    def _process_call(self, obj):
        """Run the call described by request ``obj`` and build the reply."""
        try:
            # The lookup is inside the try block so that an unknown method
            # name is reported to the client instead of killing the task.
            method = getattr(self.target, obj["name"])
            ret = method(*obj["args"], **obj["kwargs"])
        except Exception as e:
            return {"result": "error",
                    "message": type(e).__name__ + ": " + str(e)}
        return {"result": "ok", "ret": ret}

    @asyncio.coroutine
    def handle_connection_task(self, reader, writer):
        """Serve requests from one client until it disconnects."""
        try:
            while True:
                line = yield from reader.readline()
                if not line:
                    # End of stream: the client disconnected.
                    break
                obj = json.loads(line.decode())
                action = obj["action"]
                if action == "call":
                    reply = self._process_call(obj)
                else:
                    # Always answer, otherwise the client blocks forever
                    # waiting for a reply line.
                    reply = {"result": "error",
                             "message": "unknown action: " + str(action)}
                try:
                    line = json.dumps(reply) + "\n"
                except (TypeError, ValueError) as e:
                    # The method returned something JSON cannot represent.
                    reply = {"result": "error",
                             "message": type(e).__name__ + ": " + str(e)}
                    line = json.dumps(reply) + "\n"
                writer.write(line.encode())
        finally:
            writer.close()
31 changes: 18 additions & 13 deletions artiq/management/scheduler.py
Original file line number Diff line number Diff line change
@@ -4,23 +4,28 @@


class Scheduler:
def __init__(self, loop):
self.loop = loop
def __init__(self):
self.worker = Worker()
self.queue = asyncio.Queue()

def __enter__(self):
self.worker = Worker(self.loop)
return self
@asyncio.coroutine
def start(self):
self.task = asyncio.Task(self._schedule())
yield from self.worker.create_process()

def __exit__(self, type, value, traceback):
self.loop.run_until_complete(self.worker.end_process())
del self.worker
@asyncio.coroutine
def stop(self):
self.task.cancel()
yield from asyncio.wait([self.task])
del self.task
yield from self.worker.end_process()

def add_run_once(self, item, timeout):
yield from self.queue.put((item, timeout))
def run_once(self, run_params, timeout):
self.queue.put_nowait((run_params, timeout))

def task(self):
@asyncio.coroutine
def _schedule(self):
while True:
item, timeout = yield from self.queue.get()
result = yield from self.worker.run(item, timeout)
run_params, timeout = yield from self.queue.get()
result = yield from self.worker.run(run_params, timeout)
print(result)
5 changes: 2 additions & 3 deletions artiq/management/worker.py
Original file line number Diff line number Diff line change
@@ -10,12 +10,11 @@ class WorkerFailed(Exception):


class Worker:
def __init__(self, loop, send_timeout=0.5, start_reply_timeout=1.0,
def __init__(self, send_timeout=0.5, start_reply_timeout=1.0,
term_timeout=1.0):
self.send_timeout = send_timeout
self.start_reply_timeout = start_reply_timeout
self.term_timeout = term_timeout
loop.run_until_complete(self.create_process())

@asyncio.coroutine
def create_process(self):
@@ -30,7 +29,7 @@ def _send(self, obj, timeout):
self.process.stdin.write("\n".encode())
try:
fut = self.process.stdin.drain()
if fut is not (): # FIXME: why does Python return this?
if fut is not (): # FIXME: why does Python return this?
yield from asyncio.wait_for(fut, timeout=timeout)
except asyncio.TimeoutError:
raise WorkerFailed("Timeout sending data from worker")
28 changes: 10 additions & 18 deletions frontend/artiq
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env python3

import argparse
import socket
import json

from artiq.management.pc_rpc import Client


def _get_args():
@@ -18,28 +18,20 @@ def _get_args():
return parser.parse_args()


def _send_obj(sock, obj):
    """Send ``obj`` over ``sock`` as a single line of JSON."""
    payload = "{}\n".format(json.dumps(obj))
    sock.sendall(payload.encode())


def main():
args = _get_args()

with socket.create_connection(("::1", 8888)) as sock:
remote = Client("::1", 8888)
try:
for path, name, timeout in args.run_once:
obj = {
"action": "run_once",
"run_params": {
remote.run_once(
{
"path": path,
"name": name
},
"timeout": timeout
}
_send_obj(sock, obj)
}, int(timeout))
if args.quit_master:
obj = {"action": "quit"}
_send_obj(sock, obj)
remote.quit()
finally:
remote.close()

if __name__ == "__main__":
main()
54 changes: 22 additions & 32 deletions frontend/artiqd
Original file line number Diff line number Diff line change
@@ -1,52 +1,42 @@
#!/usr/bin/env python3

import asyncio
import json

from artiq.management.pc_rpc import Server
from artiq.management.scheduler import Scheduler


class Server:
def __init__(self, loop, scheduler):
class Master:
def __init__(self, scheduler):
self.scheduler = scheduler
self.terminate_notify = asyncio.Semaphore(0)
loop.run_until_complete(
asyncio.start_server(self.handle_connection, "::1", 8888))

@asyncio.coroutine
def handle_connection(self, reader, writer):
while True:
line = yield from reader.readline()
if not line:
writer.close()
return
obj = json.loads(line.decode())
action = obj["action"]
if action == "run_once":
yield from self.scheduler.add_run_once(obj["run_params"],
float(obj["timeout"]))
elif action == "quit":
self.terminate_notify.release()
else:
print("warning: unknown action " + action)

@asyncio.coroutine
def task(self, scheduler_task):
def wait_quit(self):
yield from self.terminate_notify.acquire()
scheduler_task.cancel()

def quit(self):
self.terminate_notify.release()

def run_once(self, run_params, timeout):
self.scheduler.run_once(run_params, timeout)


def main():
loop = asyncio.get_event_loop()
try:
with Scheduler(loop) as scheduler:
server = Server(loop, scheduler)
scheduler_task = asyncio.Task(scheduler.task())
server_task = asyncio.Task(server.task(scheduler_task))
loop.run_until_complete(asyncio.wait([
scheduler_task,
server_task
]))
scheduler = Scheduler()
loop.run_until_complete(scheduler.start())
try:
master = Master(scheduler)
server = Server(master)
loop.run_until_complete(server.start("::1", 8888))
try:
loop.run_until_complete(master.wait_quit())
finally:
loop.run_until_complete(server.stop())
finally:
loop.run_until_complete(scheduler.stop())
finally:
loop.close()