Skip to content

Commit

Permalink
protocols: add fire-and-forget RPC
Browse files Browse the repository at this point in the history
sbourdeauducq committed Jun 15, 2015

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
1 parent a5b34be commit 76e034c
Showing 2 changed files with 58 additions and 1 deletion.
45 changes: 45 additions & 0 deletions artiq/protocols/fire_and_forget.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import threading
import logging


logger = logging.getLogger(__name__)


class FFProxy:
    """Proxies a target object and runs its methods in the background.

    All method calls to this object are forwarded to the target and executed
    in a background thread. Method calls return immediately (with ``None``;
    the return value of the target method is discarded). Exceptions from
    the target method are turned into warnings. At most one method from the
    target object may be executed in the background; if a new call is
    submitted while the previous one is still executing, a warning is logged
    and the new call is dropped.

    This feature is typically used to wrap slow and non-critical RPCs in
    experiments.

    :param target: object whose methods are to be called in the background.
    """
    def __init__(self, target):
        self.target = target
        # Thread running the most recently accepted call, or None if no call
        # has been submitted yet.
        self._thread = None

    def ff_join(self):
        """Waits until any background method finishes its execution.

        Returns immediately if no call has been submitted yet.
        """
        if self._thread is not None:
            self._thread.join()

    def __getattr__(self, k):
        """Returns a callable that runs ``target.<k>`` in a background thread.

        Only invoked for names not found through normal attribute lookup, so
        ``target``, ``_thread`` and ``ff_join`` are never forwarded.

        :param k: name of the target attribute to call.
        :raises AttributeError: if ``k`` is a special (``__dunder__``) name.
        """
        # Protocol probes such as hasattr(proxy, "__iter__") or the
        # __deepcopy__/__getstate__ lookups done by copy and pickle must see
        # a missing attribute rather than a forwarding function.
        if k.startswith("__") and k.endswith("__"):
            raise AttributeError(k)

        def run_in_thread(*args, **kwargs):
            # Drop the call if the previous one has not completed yet.
            if self._thread is not None and self._thread.is_alive():
                logger.warning("skipping fire-and-forget call to %r.%s as "
                               "previous call did not complete",
                               self.target, k)
                return

            def thread_body():
                # The attribute is resolved inside the thread, so a missing
                # method on the target is also reported as a warning.
                try:
                    getattr(self.target, k)(*args, **kwargs)
                except Exception:
                    logger.warning("fire-and-forget call to %r.%s raised an "
                                   "exception:", self.target, k, exc_info=True)

            self._thread = threading.Thread(target=thread_body)
            self._thread.start()
        return run_in_thread
14 changes: 13 additions & 1 deletion artiq/test/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@

import numpy as np

from artiq.protocols import pc_rpc
from artiq.protocols import pc_rpc, fire_and_forget


test_address = "::1"
@@ -84,6 +84,18 @@ def test_asyncio_echo(self):
self._run_server_and_test(self._loop_asyncio_echo)


class FireAndForgetCase(unittest.TestCase):
    """Checks that a call made through ``FFProxy`` reaches its target."""

    def _set_ok(self):
        # Called through the proxy; records that the forwarded call ran.
        self.ok = True

    def test_fire_and_forget(self):
        """Forwards ``_set_ok`` through a proxy and waits for it to finish."""
        self.ok = False
        proxy = fire_and_forget.FFProxy(self)
        proxy._set_ok()
        # The call runs in a background thread; join before checking the flag.
        proxy.ff_join()
        self.assertTrue(self.ok)


class Echo:
def __init__(self):
self.terminate_notify = asyncio.Semaphore(0)

0 comments on commit 76e034c

Please sign in to comment.