Skip to content
Permalink

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: m-labs/artiq
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: c91705c5d100
Choose a base ref
...
head repository: m-labs/artiq
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 6d11da388700
Choose a head ref
  • 2 commits
  • 1 file changed
  • 1 contributor

Commits on Feb 11, 2015

  1. Copy the full SHA
    a7405de View commit details
  2. Copy the full SHA
    6d11da3 View commit details
Showing with 112 additions and 0 deletions.
  1. +112 −0 artiq/protocols/pc_rpc.py
112 changes: 112 additions & 0 deletions artiq/protocols/pc_rpc.py
Original file line number Diff line number Diff line change
@@ -15,11 +15,17 @@
import socket
import asyncio
import traceback
import threading
import time
import logging

from artiq.protocols import pyon
from artiq.protocols.asyncio_server import AsyncioServer as _AsyncioServer


logger = logging.getLogger(__name__)


class RemoteError(Exception):
"""Raised when a RPC failed or raised an exception on the remote (server)
side.
@@ -235,6 +241,112 @@ def proxy(*args, **kwargs):
return proxy


class BestEffortClient:
def __init__(self, host, port, target_name,
firstcon_timeout=0.5, retry=5.0):
self.__host = host
self.__port = port
self.__target_name = target_name
self.__retry = retry

self.__conretry_terminate = False
self.__socket = None
try:
self.__coninit(firstcon_timeout)
except:
logger.warning("first connection attempt to %s:%d[%s] failed, "
"retrying in the background",
self.__host, self.__port, self.__target_name)
self.__start_conretry()
else:
self.__conretry_thread = None

def __coninit(self, timeout):
if timeout is None:
self.__socket = socket.create_connection(
(self.__host, self.__port))
else:
self.__socket = socket.create_connection(
(self.__host, self.__port), timeout)
self.__socket.sendall(_init_string)
server_identification = self.__recv()
if self.__target_name not in server_identification["targets"]:
raise IncompatibleServer
self.__socket.sendall((self.__target_name + "\n").encode())

def __start_conretry(self):
self.__conretry_thread = threading.Thread(target=self.__conretry)
self.__conretry_thread.start()

def __conretry(self):
while True:
try:
self.__coninit(None)
except:
if self.__conretry_terminate:
break
time.sleep(self.__retry)
else:
break
if not self.__conretry_terminate:
logger.warning("connection to %s:%d[%s] established in "
"the background",
self.__host, self.__port, self.__target_name)
if self.__conretry_terminate and self.__socket is not None:
self.__socket.close()
# must be after __socket.close() to avoid race condition
self.__conretry_thread = None

def close_rpc(self):
if self.__conretry_thread is None:
if self.__socket is not None:
self.__socket.close()
else:
# Let the thread complete I/O and then do the socket closing.
# Python fails to provide a way to cancel threads...
self.__conretry_terminate = True

def __send(self, obj):
line = pyon.encode(obj) + "\n"
self.__socket.sendall(line.encode())

def __recv(self):
buf = self.__socket.recv(4096).decode()
while "\n" not in buf:
more = self.__socket.recv(4096)
if not more:
break
buf += more.decode()
return pyon.decode(buf)

def __do_rpc(self, name, args, kwargs):
if self.__conretry_thread is not None:
return None

obj = {"action": "call", "name": name, "args": args, "kwargs": kwargs}
try:
self.__send(obj)
obj = self.__recv()
except:
logger.warning("connection failed while attempting "
"RPC to %s:%d[%s], re-establishing connection "
"in the background",
self.__host, self.__port, self.__target_name)
self.__start_conretry()
else:
if obj["status"] == "ok":
return obj["ret"]
elif obj["status"] == "failed":
raise RemoteError(obj["message"])
else:
raise ValueError

def __getattr__(self, name):
def proxy(*args, **kwargs):
return self.__do_rpc(name, args, kwargs)
return proxy


class Server(_AsyncioServer):
"""This class creates a TCP server that handles requests coming from
``Client`` objects.