Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks. Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: cda4a0765d74^
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 1b7020dff36a
Choose a head ref

Commits on Jan 26, 2016

  1. Copy the full SHA
    cda4a07 View commit details
  2. lda: test tweaks

    jordens committed Jan 26, 2016
    Copy the full SHA
    ccac852 View commit details
  3. Fix typo.

    whitequark committed Jan 26, 2016
    Copy the full SHA
    7f9a180 View commit details

Commits on Jan 27, 2016

  1. Copy the full SHA
    0acc86b View commit details
  2. Copy the full SHA
    5bead8b View commit details
  3. Copy the full SHA
    000794e View commit details
  4. Revert "try debugging weird unittest failure"

    This reverts commit 000794e.
    sbourdeauducq committed Jan 27, 2016
    Copy the full SHA
    9f9a23b View commit details
  5. Revert "Revert "test/pipe_ipc: temporarily skip test""

    This reverts commit 7a9864b.
    sbourdeauducq committed Jan 27, 2016
    Copy the full SHA
    83fd160 View commit details
  6. Copy the full SHA
    dce2aac View commit details
  7. Copy the full SHA
    79c0488 View commit details
  8. Copy the full SHA
    3d9fc7a View commit details
  9. Copy the full SHA
    b753306 View commit details
  10. pyon: handle \r

    sbourdeauducq committed Jan 27, 2016
    Copy the full SHA
    590354d View commit details
  11. Copy the full SHA
    170b438 View commit details
  12. Copy the full SHA
    dca44ef View commit details
  13. Copy the full SHA
    cfa4f79 View commit details
  14. Copy the full SHA
    a4fb8f3 View commit details
  15. Copy the full SHA
    5076c85 View commit details
  16. Copy the full SHA
    7aaeb63 View commit details
  17. Copy the full SHA
    27c12a5 View commit details
  18. Copy the full SHA
    ebb959c View commit details
  19. Copy the full SHA
    022a1ff View commit details
  20. Copy the full SHA
    f78eecb View commit details
  21. Merge branch 'master' into testbench-controllers

    * master:
      gui/experiments: float/bring into focus already open docks when opening experiments
      gui: reduce size of console dock
      protocols/logging,pc_rpc: do not print errors on Windows when clients disconnect
      gui: reduce size of schedule dock
      worker: Windows VMs are slow, increase send_timeout
      protocol/sync_struct: Windows also raises ConnectionAbortedError on disconnection
      gui: reduce size of log dock
      gui: reduce size of experiment dock
      protocols/logging/LogParser: handle Windows CRLF
      pyon: handle \r
      test/pipe_ipc: re-enable
      protocols/asyncio_server: minor cleanup
      protocols/pipe_ipc: Windows support
      Revert "Revert "test/pipe_ipc: temporarily skip test""
      Revert "try debugging weird unittest failure"
      try debugging weird unittest failure
      conda: restrict binutils-or1k-linux dependency to linux.
      transforms.iodelay_estimator: make diagnostics much more clear.
      Fix typo.
    jordens committed Jan 27, 2016
    Copy the full SHA
    99f7889 View commit details

Commits on Jan 28, 2016

  1. 12
    Copy the full SHA
    1b7020d View commit details
44 changes: 29 additions & 15 deletions artiq/compiler/transforms/iodelay_estimator.py
Original file line number Diff line number Diff line change
@@ -24,11 +24,11 @@ def __init__(self, engine, ref_period):
self.current_goto = None
self.current_return = None

def evaluate(self, node, abort):
def evaluate(self, node, abort, context):
if isinstance(node, asttyped.NumT):
return iodelay.Const(node.n)
elif isinstance(node, asttyped.CoerceT):
return self.evaluate(node.value, abort)
return self.evaluate(node.value, abort, context)
elif isinstance(node, asttyped.NameT):
if self.current_args is None:
note = diagnostic.Diagnostic("note",
@@ -48,8 +48,8 @@ def evaluate(self, node, abort):
]
abort(notes)
elif isinstance(node, asttyped.BinOpT):
lhs = self.evaluate(node.left, abort)
rhs = self.evaluate(node.right, abort)
lhs = self.evaluate(node.left, abort, context)
rhs = self.evaluate(node.right, abort, context)
if isinstance(node.op, ast.Add):
return lhs + rhs
elif isinstance(node.op, ast.Sub):
@@ -62,12 +62,14 @@ def evaluate(self, node, abort):
return lhs // rhs
else:
note = diagnostic.Diagnostic("note",
"this operator is not supported", {},
"this operator is not supported {context}",
{"context": context},
node.op.loc)
abort([note])
else:
note = diagnostic.Diagnostic("note",
"this expression is not supported", {},
"this expression is not supported {context}",
{"context": context},
node.loc)
abort([note])

@@ -143,14 +145,14 @@ def visit_FunctionDefT(self, node):
def visit_LambdaT(self, node):
self.visit_function(node.args, node.body, node.type.find(), node.loc)

def get_iterable_length(self, node):
def get_iterable_length(self, node, context):
def abort(notes):
self.abort("for statement cannot be interleaved because "
"trip count is indeterminate",
"iteration count is indeterminate",
node.loc, notes)

def evaluate(node):
return self.evaluate(node, abort)
return self.evaluate(node, abort, context)

if isinstance(node, asttyped.CallT) and types.is_builtin(node.func.type, "range"):
range_min, range_max, range_step = iodelay.Const(0), None, iodelay.Const(1)
@@ -177,10 +179,11 @@ def visit_ForT(self, node):
self.current_delay = old_delay
else:
if self.current_goto is not None:
self.abort("loop trip count is indeterminate because of control flow",
self.abort("loop iteration count is indeterminate because of control flow",
self.current_goto.loc)

node.trip_count = self.get_iterable_length(node.iter).fold()
context = "in an iterable used in a for loop that is being interleaved"
node.trip_count = self.get_iterable_length(node.iter, context).fold()
node.trip_interval = self.current_delay.fold()
self.current_delay = old_delay + node.trip_interval * node.trip_count
self.current_goto = old_goto
@@ -231,6 +234,10 @@ def visit_With(self, node):
# Interleave failures inside `with` statements are hard failures,
# since there's no chance that the code will never actually execute
# inside a `with` statement after all.
note = diagnostic.Diagnostic("note",
"while interleaving this 'with parallel:' statement", {},
node.loc)
error.cause.notes += [note]
self.engine.process(error.cause)

flow_stmt = None
@@ -258,15 +265,17 @@ def visit_With(self, node):
def visit_CallT(self, node):
typ = node.func.type.find()
def abort(notes):
self.abort("this call cannot be interleaved because "
self.abort("call cannot be interleaved because "
"an argument cannot be statically evaluated",
node.loc, notes)

if types.is_builtin(typ, "delay"):
value = self.evaluate(node.args[0], abort=abort)
value = self.evaluate(node.args[0], abort=abort,
context="as an argument for delay()")
call_delay = iodelay.SToMU(value, ref_period=self.ref_period)
elif types.is_builtin(typ, "delay_mu"):
value = self.evaluate(node.args[0], abort=abort)
value = self.evaluate(node.args[0], abort=abort,
context="as an argument for delay_mu()")
call_delay = value
elif not types.is_builtin(typ):
if types.is_function(typ):
@@ -297,7 +306,12 @@ def abort(notes):
args[arg_name] = arg_node

free_vars = delay.duration.free_vars()
node.arg_exprs = { arg: self.evaluate(args[arg], abort=abort) for arg in free_vars }
node.arg_exprs = {
arg: self.evaluate(args[arg], abort=abort,
context="in the expression for argument '{}' "
"that affects I/O delay".format(arg))
for arg in free_vars
}
call_delay = delay.duration.fold(node.arg_exprs)
else:
assert False
2 changes: 1 addition & 1 deletion artiq/coredevice/core.py
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
def _render_diagnostic(diagnostic, colored):
    """Render *diagnostic* to a single string.

    Occurrences of the ARTIQ installation directory (``artiq_dir``) in the
    rendered lines are replaced by ``<artiq>`` to keep messages short.

    :param diagnostic: the diagnostic object; its ``render`` method yields
        the lines of the message.
    :param colored: forwarded to ``render`` to request colored output.
    :returns: the rendered lines joined with newlines.
    """
    def shorten_path(line):
        return line.replace(artiq_dir, "<artiq>")

    # `colored` must be passed by keyword. Passing it positionally binds it
    # to render()'s first parameter, which presumably is not `colored`
    # (the change in this diff replaces the positional call) -- confirm
    # against the diagnostic library's render() signature.
    lines = [shorten_path(line) for line in diagnostic.render(colored=colored)]
    return "\n".join(lines)

class _DiagnosticEngine(diagnostic.Engine):
255 changes: 255 additions & 0 deletions artiq/devices/ctlmgr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import asyncio
import logging
import subprocess
import shlex
import socket

from artiq.protocols.sync_struct import Subscriber
from artiq.protocols.pc_rpc import AsyncioClient
from artiq.protocols.logging import parse_log_message, log_with_name
from artiq.tools import Condition, TaskObject


# Module-level logger. Controller start/exit/ping/termination messages and
# log-forwarding diagnostics from this module are emitted through it.
logger = logging.getLogger(__name__)


class Controller:
    """Supervise one controller process described by a device-database entry.

    Constructing an instance immediately schedules :meth:`launcher` as a
    task. The launcher does four things:

    - starts the process from ``command``;
    - forwards its stdout/stderr to the logging system;
    - pings it over RPC every ``ping_timer`` seconds;
    - restarts it after ``retry_timer_cur`` seconds whenever it exits,
      fails to start or stops answering pings.

    The restart delay grows by ``retry_timer_backoff`` after each restart
    and is reset by a successful ping.
    """

    def __init__(self, name, ddb_entry):
        self.name = name
        self.command = ddb_entry["command"]
        # Restart delay (seconds) and its multiplicative backoff factor.
        self.retry_timer = ddb_entry.get("retry_timer", 5)
        self.retry_timer_backoff = ddb_entry.get("retry_timer_backoff", 1.1)

        self.host = ddb_entry["host"]
        self.port = ddb_entry["port"]
        # Ping period, ping timeout and termination timeout (seconds).
        self.ping_timer = ddb_entry.get("ping_timer", 30)
        self.ping_timeout = ddb_entry.get("ping_timeout", 30)
        self.term_timeout = ddb_entry.get("term_timeout", 30)

        self.retry_timer_cur = self.retry_timer
        # Notified by ControllerManager.retry_now() to skip the restart delay.
        self.retry_now = Condition()
        self.process = None
        self.launch_task = asyncio.Task(self.launcher())

    async def end(self):
        """Stop supervising: cancel the launcher task and wait for it.

        The launcher terminates the process while handling its cancellation.
        If the task is cancelled before it ever started running, awaiting it
        raises CancelledError. That case is absorbed here, so the caller
        (e.g. the queue-processing task in Controllers) is not killed by it.
        """
        self.launch_task.cancel()
        try:
            await asyncio.wait_for(self.launch_task, None)
        except asyncio.CancelledError:
            # Only absorb the cancellation that belongs to launch_task; a
            # cancellation of our own caller must still propagate.
            if not self.launch_task.cancelled():
                raise

    async def call(self, method, *args, **kwargs):
        """Call *method* on the controller over a fresh RPC connection.

        The first target advertised by the server is selected. The
        connection is closed whether or not the call succeeds.
        """
        remote = AsyncioClient()
        await remote.connect_rpc(self.host, self.port, None)
        try:
            targets, _ = remote.get_rpc_id()
            remote.select_rpc_target(targets[0])
            r = await getattr(remote, method)(*args, **kwargs)
        finally:
            remote.close_rpc()
        return r

    async def _ping(self):
        """Ping the controller; return the remote result or False on failure.

        A truthy result resets the restart delay to ``retry_timer``.
        Cancellation is propagated rather than reported as a failed ping.
        Swallowing it would let the launcher keep looping after end() had
        cancelled it.
        """
        try:
            ok = await asyncio.wait_for(self.call("ping"), self.ping_timeout)
        except asyncio.CancelledError:
            raise
        except Exception:
            return False
        if ok:
            self.retry_timer_cur = self.retry_timer
        return ok

    async def _wait_and_ping(self):
        """Wait for the process to exit, pinging it every ``ping_timer`` s.

        Returns when the process exits on its own, or after terminating it
        because a ping failed.
        """
        while True:
            try:
                await asyncio.wait_for(self.process.wait(), self.ping_timer)
            except asyncio.TimeoutError:
                logger.debug("pinging controller %s", self.name)
                ok = await self._ping()
                if not ok:
                    logger.warning("Controller %s ping failed", self.name)
                    await self._terminate()
                    return
            else:
                break

    async def forward_logs(self, stream):
        """Re-emit each line read from *stream* as a log record.

        Each line is parsed with parse_log_message and logged with
        ``source`` set to ``controller(<name>)``. Forwarding stops at EOF or
        on the first error.
        """
        source = "controller({})".format(self.name)
        while True:
            try:
                entry = await stream.readline()
                if not entry:
                    break
                # Decode leniently: stopping on a bad byte would leave the
                # pipe unread and could eventually block the child on write.
                # Strip the line terminator (LF or CRLF) instead of blindly
                # dropping the last character, which may be real text on an
                # unterminated final line.
                line = entry.decode(errors="replace").rstrip("\r\n")
                level, name, message = parse_log_message(line)
                log_with_name(name, level, message, extra={"source": source})
            except Exception:
                logger.debug("exception in log forwarding", exc_info=True)
                break
        logger.debug("stopped log forwarding of stream %s of %s",
                     stream, self.name)

    async def launcher(self):
        """Run the controller, restarting it with backoff until cancelled.

        Cancellation (from end()) terminates the running process.
        """
        try:
            while True:
                logger.info("Starting controller %s with command: %s",
                            self.name, self.command)
                try:
                    self.process = await asyncio.create_subprocess_exec(
                        *shlex.split(self.command),
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)
                except (OSError, ValueError):
                    # OSError covers a missing executable (FileNotFoundError)
                    # as well as e.g. PermissionError. ValueError comes from
                    # shlex.split on malformed quoting. Letting either escape
                    # would end this task, and the controller would never be
                    # retried.
                    logger.warning("Controller %s failed to start", self.name)
                else:
                    asyncio.ensure_future(self.forward_logs(
                        self.process.stdout))
                    asyncio.ensure_future(self.forward_logs(
                        self.process.stderr))
                    await self._wait_and_ping()
                    logger.warning("Controller %s exited", self.name)
                logger.warning("Restarting in %.1f seconds",
                               self.retry_timer_cur)
                try:
                    # retry_now lets an operator skip the remaining delay.
                    await asyncio.wait_for(self.retry_now.wait(),
                                           self.retry_timer_cur)
                except asyncio.TimeoutError:
                    pass
                self.retry_timer_cur *= self.retry_timer_backoff
        except asyncio.CancelledError:
            await self._terminate()

    async def _terminate(self):
        """Stop the process if it is still running.

        The process is first asked to stop via the ``terminate`` RPC. If
        that fails, or the process does not exit within ``term_timeout``,
        it is killed.
        """
        logger.info("Terminating controller %s", self.name)
        if self.process is not None and self.process.returncode is None:
            try:
                await asyncio.wait_for(self.call("terminate"),
                                       self.term_timeout)
            except BaseException:
                # Deliberately catch-all, including cancellation: this runs
                # as cleanup and must reach the kill below.
                logger.warning("Controller %s did not respond to terminate "
                               "command, killing", self.name)
                try:
                    self.process.kill()
                except ProcessLookupError:
                    pass
            try:
                await asyncio.wait_for(self.process.wait(),
                                       self.term_timeout)
            except BaseException:
                # Same rationale as above: never leave the child running.
                logger.warning("Controller %s failed to exit, killing",
                               self.name)
                try:
                    self.process.kill()
                except ProcessLookupError:
                    pass
                await self.process.wait()
        logger.debug("Controller %s terminated", self.name)


def get_ip_addresses(host):
    """Return the set of IP address strings that *host* resolves to.

    Any resolution failure (unknown host, malformed name, ...) yields an
    empty set instead of an exception. The bare ``except`` it replaces also
    swallowed KeyboardInterrupt/SystemExit; those now propagate.
    """
    try:
        addrinfo = socket.getaddrinfo(host, None)
    except Exception:
        return set()
    # Each entry is (family, type, proto, canonname, sockaddr); sockaddr[0]
    # is the address string for both IPv4 and IPv6.
    return {info[4][0] for info in addrinfo}


class Controllers:
    """Dict-like sync target that keeps running Controller objects in sync.

    ``__setitem__``/``__delitem__`` only enqueue "set"/"del" actions. One
    background task applies them in order, so an old controller is fully
    stopped before its replacement starts.
    """

    def __init__(self):
        # Local address; only controllers whose host resolves to it are run.
        self.host_filter = None
        # Names with a pending or applied "set" that has not been deleted.
        self.active_or_queued = set()
        self.queue = asyncio.Queue()
        self.active = dict()
        self.process_task = asyncio.Task(self._process())

    async def _process(self):
        """Apply queued actions one at a time, forever."""
        while True:
            action, param = await self.queue.get()
            try:
                if action == "set":
                    k, ddb_entry = param
                    if k in self.active:
                        await self.active[k].end()
                    self.active[k] = Controller(k, ddb_entry)
                elif action == "del":
                    await self.active[param].end()
                    del self.active[param]
                else:
                    raise ValueError
            finally:
                # Always balance queue.get(), even if handling failed.
                self.queue.task_done()

    def __setitem__(self, k, v):
        """Start (or restart) controller *k* if entry *v* targets this host.

        If *k* was previously scheduled here but the new entry should no
        longer run on this host, the old controller is stopped.
        """
        if (isinstance(v, dict) and v.get("type") == "controller" and
                self.host_filter in get_ip_addresses(v["host"])):
            # Work on a copy so the caller's entry is not modified, and is
            # not re-formatted if the same dict is set again.
            v = dict(v)
            v["command"] = v["command"].format(name=k,
                                               bind=self.host_filter,
                                               port=v["port"])
            self.queue.put_nowait(("set", (k, v)))
            self.active_or_queued.add(k)
        elif k in self.active_or_queued:
            # Entry replaced by something that must not run here any more.
            del self[k]

    def __delitem__(self, k):
        """Stop controller *k* if it is running or scheduled; else no-op."""
        if k in self.active_or_queued:
            self.queue.put_nowait(("del", k))
            self.active_or_queued.remove(k)

    def delete_all(self):
        """Schedule removal of every running or scheduled controller."""
        for name in set(self.active_or_queued):
            del self[name]

    async def shutdown(self):
        """Stop processing the queue and end all active controllers."""
        self.process_task.cancel()
        # Snapshot: the dict must not be iterated live across awaits.
        for c in list(self.active.values()):
            await c.end()


class ControllerDB:
    """Bridge between the "devices" Subscriber and a Controllers instance.

    One Controllers object is created up front and reused. Every time the
    subscriber (re)receives the initial device database, that object is
    emptied, refilled and handed back as the sync target.
    """

    def __init__(self):
        self.current_controllers = Controllers()

    def set_host_filter(self, host_filter):
        """Record the local address used to pick which controllers run."""
        self.current_controllers.host_filter = host_filter

    def sync_struct_init(self, init):
        """Load *init* into the controllers and return them as the target."""
        target = self.current_controllers
        if target is not None:
            target.delete_all()
        for device_name, entry in init.items():
            target[device_name] = entry
        return target


class ControllerManager(TaskObject):
    """Run the controllers that the master's device database assigns here.

    Subscribes to the master's "devices" notifier. It reconnects
    ``retry_master`` seconds after the connection fails or is lost. All
    running controllers are shut down when the task ends.
    """

    def __init__(self, server, port, retry_master):
        self.server = server
        self.port = port
        self.retry_master = retry_master
        self.controller_db = ControllerDB()

    async def _do(self):
        try:
            subscriber = Subscriber("devices",
                                    self.controller_db.sync_struct_init)

            def set_host_filter():
                # Use the local address of the socket connected to the master
                # as this host's identity when selecting controllers.
                # Presumably invoked by Subscriber.connect once connected --
                # confirm against sync_struct.
                s = subscriber.writer.get_extra_info("socket")
                localhost = s.getsockname()[0]
                self.controller_db.set_host_filter(localhost)

            while True:
                try:
                    await subscriber.connect(self.server, self.port,
                                             set_host_filter)
                    try:
                        await asyncio.wait_for(subscriber.receive_task, None)
                    finally:
                        await subscriber.close()
                except OSError as e:
                    # OSError includes ConnectionError and all its subclasses
                    # (refused/reset/aborted), plus failures such as
                    # "network unreachable" or name-resolution errors. These
                    # previously escaped and ended the manager instead of
                    # being retried.
                    logger.warning("Connection to master failed (%s: %s)",
                                   e.__class__.__name__, str(e))
                else:
                    logger.warning("Connection to master lost")
                logger.warning("Retrying in %.1f seconds", self.retry_master)
                await asyncio.sleep(self.retry_master)
        except asyncio.CancelledError:
            pass
        finally:
            await self.controller_db.current_controllers.shutdown()

    def retry_now(self, k):
        """If a controller is disabled and pending retry, perform that retry
        now.

        Raises KeyError if *k* is not an active controller.
        """
        self.controller_db.current_controllers.active[k].retry_now.notify()
Loading