Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Add support to set additional socket options
from circuits.net.sockets import TCPServer
from socket import SOL_SOCKET, SO_REUSEPORT
s = TCPServer(('0.0.0.0', 8090), socket_options=[(SOL_SOCKET, SO_REUSEPORT, 1)])
assert s._sock.getsockopt(SOL_SOCKET, SO_REUSEPORT) == 1
  • Loading branch information
spaceone committed Jan 24, 2017
1 parent 25442fc commit e92d26b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 47 deletions.
102 changes: 55 additions & 47 deletions circuits/net/sockets.py
Expand Up @@ -10,8 +10,8 @@
EMFILE, ENFILE, ENOBUFS, ENOMEM, ENOTCONN, EPERM, EPIPE, EWOULDBLOCK,
)
from socket import (
AF_INET, AF_INET6, IPPROTO_TCP, SO_BROADCAST, SO_REUSEADDR, SOCK_DGRAM,
SOCK_STREAM, SOL_SOCKET, TCP_NODELAY, error as SocketError, gaierror,
AF_INET, AF_INET6, AF_UNIX, IPPROTO_IP, IPPROTO_TCP, SO_BROADCAST, SO_REUSEADDR,
SOCK_DGRAM, SOCK_STREAM, SOL_SOCKET, TCP_NODELAY, error as SocketError, gaierror,
getaddrinfo, getfqdn, gethostbyname, gethostname, socket,
)
from time import time
Expand Down Expand Up @@ -78,6 +78,11 @@ class Client(BaseComponent):

channel = "client"

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_protocol = IPPROTO_IP
socket_options = []

def __init__(self, bind=None, bufsize=BUFSIZE, channel=channel, **kwargs):
super(Client, self).__init__(channel=channel, **kwargs)

Expand Down Expand Up @@ -233,24 +238,29 @@ def __on_write(self, sock):
elif self._poller.isWriting(self._sock):
self._poller.removeWriter(self._sock)

def _create_socket(self):
sock = socket(self.socket_family, self.socket_type, self.socket_protocol)

for option in self.socket_options:
sock.setsockopt(*option)
sock.setblocking(False)
if self._bind is not None:
sock.bind(self._bind)
return sock


class TCPClient(Client):

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_protocol = IPPROTO_TCP
socket_options = [
(IPPROTO_TCP, TCP_NODELAY, 1),
]

def init(self, connect_timeout=5, *args, **kwargs):
self.connect_timeout = connect_timeout

def _create_socket(self):
sock = socket(self.socket_family, SOCK_STREAM, IPPROTO_TCP)
if self._bind is not None:
sock.bind(self._bind)

sock.setblocking(False)
sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)

return sock

@handler("connect") # noqa
def connect(self, host, port, secure=False, **kwargs):
# XXX: C901: This has a high McCacbe complexity score of 10.
Expand Down Expand Up @@ -322,16 +332,9 @@ def parse_bind_parameter(self, bind_parameter):

class UNIXClient(Client):

def _create_socket(self):
from socket import AF_UNIX

sock = socket(AF_UNIX, SOCK_STREAM)
if self._bind is not None:
sock.bind(self._bind)

sock.setblocking(False)

return sock
socket_family = AF_UNIX
socket_type = SOCK_STREAM
socket_options = []

@handler("ready")
def ready(self, component):
Expand Down Expand Up @@ -387,11 +390,13 @@ def on_error(sock, err):
class Server(BaseComponent):

channel = "server"
socket_protocol = IPPROTO_IP

def __init__(self, bind, secure=False, backlog=BACKLOG,
bufsize=BUFSIZE, channel=channel, **kwargs):
super(Server, self).__init__(channel=channel)

self.socket_options = self.socket_options[:] + kwargs.get('socket_options', [])
self._bind = self.parse_bind_parameter(bind)

self._backlog = backlog
Expand Down Expand Up @@ -647,18 +652,28 @@ def _on_write(self, sock):
elif self._poller.isWriting(sock):
self._poller.removeWriter(sock)

def _create_socket(self):
sock = socket(self.socket_family, self.socket_type, self.socket_protocol)

for option in self.socket_options:
sock.setsockopt(*option)
sock.setblocking(False)
if self._bind is not None:
sock.bind(self._bind)
return sock


class TCPServer(Server):

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_options = [
(SOL_SOCKET, SO_REUSEADDR, 1),
(IPPROTO_TCP, TCP_NODELAY, 1),
]

def _create_socket(self):
sock = socket(self.socket_family, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
sock.setblocking(False)
sock.bind(self._bind)
sock = super(TCPServer, self)._create_socket()
sock.listen(self._backlog)

return sock
Expand Down Expand Up @@ -706,17 +721,17 @@ def parse_bind_parameter(self, bind_parameter):

class UNIXServer(Server):

def _create_socket(self):
from socket import AF_UNIX
socket_family = AF_UNIX
socket_type = SOCK_STREAM
socket_options = [
(SOL_SOCKET, SO_REUSEADDR, 1),
]

def _create_socket(self):
if os.path.exists(self._bind):
os.unlink(self._bind)

sock = socket(AF_UNIX, SOCK_STREAM)
sock.bind(self._bind)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.setblocking(False)
sock = super(UNIXServer, self)._create_socket()
sock.listen(self._backlog)

return sock
Expand All @@ -725,18 +740,11 @@ def _create_socket(self):
class UDPServer(Server):

socket_family = AF_INET

def _create_socket(self):
sock = socket(self.socket_family, SOCK_DGRAM)

sock.bind(self._bind)

sock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.setblocking(False)

return sock
socket_type = SOCK_DGRAM
socket_options = [
(SOL_SOCKET, SO_BROADCAST, 1),
(SOL_SOCKET, SO_REUSEADDR, 1)
]

def _close(self, sock):
self._poller.discard(sock)
Expand Down
7 changes: 7 additions & 0 deletions tests/net/test_socket_options.py
@@ -0,0 +1,7 @@
from circuits.net.sockets import TCPServer
from socket import SOL_SOCKET, SO_REUSEPORT


def test_socket_options_server():
s = TCPServer(('0.0.0.0', 8090), socket_options=[(SOL_SOCKET, SO_REUSEPORT, 1)])
assert s._sock.getsockopt(SOL_SOCKET, SO_REUSEPORT) == 1

0 comments on commit e92d26b

Please sign in to comment.