-
Notifications
You must be signed in to change notification settings - Fork 201
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
monkey-patch asyncio.proactor_events to handle ConnectionAbortedError…
… on Windows. Closes #247
1 parent
d0cf589
commit 579168f
Showing
3 changed files
with
159 additions
and
108 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
import sys | ||
|
||
|
||
__all__ = [] | ||
|
||
|
||
if sys.version_info[:3] == (3, 5, 1): | ||
import asyncio | ||
import socket | ||
import collections | ||
import itertools | ||
import os | ||
|
||
# See https://github.com/m-labs/artiq/issues/253 | ||
@asyncio.coroutines.coroutine | ||
def create_server(self, protocol_factory, host=None, port=None, | ||
*, | ||
family=socket.AF_UNSPEC, | ||
flags=socket.AI_PASSIVE, | ||
sock=None, | ||
backlog=100, | ||
ssl=None, | ||
reuse_address=None, | ||
reuse_port=None): | ||
"""Create a TCP server. | ||
The host parameter can be a string, in that case the TCP server is bound | ||
to host and port. | ||
The host parameter can also be a sequence of strings and in that case | ||
the TCP server is bound to all hosts of the sequence. If a host | ||
appears multiple times (possibly indirectly e.g. when hostnames | ||
resolve to the same IP address), the server is only bound once to that | ||
host. | ||
Return a Server object which can be used to stop the service. | ||
This method is a coroutine. | ||
""" | ||
if isinstance(ssl, bool): | ||
raise TypeError('ssl argument must be an SSLContext or None') | ||
if host is not None or port is not None: | ||
if sock is not None: | ||
raise ValueError( | ||
'host/port and sock can not be specified at the same time') | ||
|
||
AF_INET6 = getattr(socket, 'AF_INET6', 0) | ||
if reuse_address is None: | ||
reuse_address = os.name == 'posix' and sys.platform != 'cygwin' | ||
sockets = [] | ||
if host == '': | ||
hosts = [None] | ||
elif (isinstance(host, str) or | ||
not isinstance(host, collections.Iterable)): | ||
hosts = [host] | ||
else: | ||
hosts = host | ||
|
||
fs = [self._create_server_getaddrinfo(host, port, family=family, | ||
flags=flags) | ||
for host in hosts] | ||
infos = yield from asyncio.tasks.gather(*fs, loop=self) | ||
infos = set(itertools.chain.from_iterable(infos)) | ||
|
||
completed = False | ||
try: | ||
for res in infos: | ||
af, socktype, proto, canonname, sa = res | ||
try: | ||
sock = socket.socket(af, socktype, proto) | ||
except socket.error: | ||
# Assume it's a bad family/type/protocol combination. | ||
if self._debug: | ||
asyncio.log.logger.warning('create_server() failed to create ' | ||
'socket.socket(%r, %r, %r)', | ||
af, socktype, proto, exc_info=True) | ||
continue | ||
sockets.append(sock) | ||
if reuse_address: | ||
sock.setsockopt( | ||
socket.SOL_SOCKET, socket.SO_REUSEADDR, True) | ||
if reuse_port: | ||
if not hasattr(socket, 'SO_REUSEPORT'): | ||
raise ValueError( | ||
'reuse_port not supported by socket module') | ||
else: | ||
sock.setsockopt( | ||
socket.SOL_SOCKET, socket.SO_REUSEPORT, True) | ||
# Disable IPv4/IPv6 dual stack support (enabled by | ||
# default on Linux) which makes a single socket | ||
# listen on both address families. | ||
if af == AF_INET6 and hasattr(socket, 'IPPROTO_IPV6'): | ||
sock.setsockopt(socket.IPPROTO_IPV6, | ||
socket.IPV6_V6ONLY, | ||
True) | ||
try: | ||
sock.bind(sa) | ||
except OSError as err: | ||
raise OSError(err.errno, 'error while attempting ' | ||
'to bind on address %r: %s' | ||
% (sa, err.strerror.lower())) | ||
completed = True | ||
finally: | ||
if not completed: | ||
for sock in sockets: | ||
sock.close() | ||
else: | ||
if sock is None: | ||
raise ValueError('Neither host/port nor sock were specified') | ||
sockets = [sock] | ||
|
||
server = asyncio.base_events.Server(self, sockets) | ||
for sock in sockets: | ||
sock.listen(backlog) | ||
sock.setblocking(False) | ||
self._start_serving(protocol_factory, sock, ssl, server) | ||
if self._debug: | ||
asyncio.log.logger.info("%r is serving", server) | ||
return server | ||
|
||
asyncio.base_events.BaseEventLoop.create_server = create_server | ||
|
||
|
||
# See https://github.com/m-labs/artiq/issues/247 | ||
def _loop_writing(self, f=None, data=None): | ||
try: | ||
assert f is self._write_fut | ||
self._write_fut = None | ||
self._pending_write = 0 | ||
if f: | ||
f.result() | ||
if data is None: | ||
data = self._buffer | ||
self._buffer = None | ||
if not data: | ||
if self._closing: | ||
self._loop.call_soon(self._call_connection_lost, None) | ||
if self._eof_written: | ||
self._sock.shutdown(socket.SHUT_WR) | ||
# Now that we've reduced the buffer size, tell the | ||
# protocol to resume writing if it was paused. Note that | ||
# we do this last since the callback is called immediately | ||
# and it may add more data to the buffer (even causing the | ||
# protocol to be paused again). | ||
self._maybe_resume_protocol() | ||
else: | ||
self._write_fut = self._loop._proactor.send(self._sock, data) | ||
if not self._write_fut.done(): | ||
assert self._pending_write == 0 | ||
self._pending_write = len(data) | ||
self._write_fut.add_done_callback(self._loop_writing) | ||
self._maybe_pause_protocol() | ||
else: | ||
self._write_fut.add_done_callback(self._loop_writing) | ||
except (ConnectionResetError, ConnectionAbortedError) as exc: | ||
self._force_close(exc) | ||
except OSError as exc: | ||
self._fatal_error(exc, 'Fatal write error on pipe transport') | ||
|
||
from asyncio import proactor_events | ||
proactor_events._ProactorBaseWritePipeTransport._loop_writing = _loop_writing |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters