Skip to content

Commit

Permalink
Add support to set additional socket options
Browse files Browse the repository at this point in the history
from circuits.net.sockets import TCPServer
from socket import SOL_SOCKET, SO_REUSEPORT
s = TCPServer(('0.0.0.0', 8090), socket_options=[(SOL_SOCKET, SO_REUSEPORT, 1)])
assert s._sock.getsockopt(SOL_SOCKET, SO_REUSEPORT) == 1
  • Loading branch information
spaceone committed Jan 29, 2017
1 parent ebc2e4a commit b478492
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 48 deletions.
105 changes: 57 additions & 48 deletions circuits/net/sockets.py
Expand Up @@ -10,9 +10,10 @@
EMFILE, ENFILE, ENOBUFS, ENOMEM, ENOTCONN, EPERM, EPIPE, EWOULDBLOCK,
)
from socket import (
AF_INET, AF_INET6, IPPROTO_TCP, SO_BROADCAST, SO_REUSEADDR, SOCK_DGRAM,
SOCK_STREAM, SOL_SOCKET, TCP_NODELAY, error as SocketError, gaierror,
getaddrinfo, getfqdn, gethostbyname, gethostname, socket,
AF_INET, AF_INET6, AF_UNIX, IPPROTO_IP, IPPROTO_TCP, SO_BROADCAST,
SO_REUSEADDR, SOCK_DGRAM, SOCK_STREAM, SOL_SOCKET, TCP_NODELAY,
error as SocketError, gaierror, getaddrinfo, getfqdn, gethostbyname,
gethostname, socket,
)
from time import time

Expand Down Expand Up @@ -78,6 +79,11 @@ class Client(BaseComponent):

channel = "client"

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_protocol = IPPROTO_IP
socket_options = []

def __init__(self, bind=None, bufsize=BUFSIZE, channel=channel, **kwargs):
super(Client, self).__init__(channel=channel, **kwargs)

Expand Down Expand Up @@ -233,24 +239,29 @@ def __on_write(self, sock):
elif self._poller.isWriting(self._sock):
self._poller.removeWriter(self._sock)

def _create_socket(self):
sock = socket(self.socket_family, self.socket_type, self.socket_protocol)

for option in self.socket_options:
sock.setsockopt(*option)
sock.setblocking(False)
if self._bind is not None:
sock.bind(self._bind)
return sock


class TCPClient(Client):

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_protocol = IPPROTO_TCP
socket_options = [
(IPPROTO_TCP, TCP_NODELAY, 1),
]

def init(self, connect_timeout=5, *args, **kwargs):
self.connect_timeout = connect_timeout

def _create_socket(self):
sock = socket(self.socket_family, SOCK_STREAM, IPPROTO_TCP)
if self._bind is not None:
sock.bind(self._bind)

sock.setblocking(False)
sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)

return sock

@handler("connect") # noqa
def connect(self, host, port, secure=False, **kwargs):
# XXX: C901: This has a high McCacbe complexity score of 10.
Expand Down Expand Up @@ -322,16 +333,9 @@ def parse_bind_parameter(self, bind_parameter):

class UNIXClient(Client):

def _create_socket(self):
from socket import AF_UNIX

sock = socket(AF_UNIX, SOCK_STREAM)
if self._bind is not None:
sock.bind(self._bind)

sock.setblocking(False)

return sock
socket_family = AF_UNIX
socket_type = SOCK_STREAM
socket_options = []

@handler("ready")
def ready(self, component):
Expand Down Expand Up @@ -387,11 +391,13 @@ def on_error(sock, err):
class Server(BaseComponent):

channel = "server"
socket_protocol = IPPROTO_IP

def __init__(self, bind, secure=False, backlog=BACKLOG,
bufsize=BUFSIZE, channel=channel, **kwargs):
super(Server, self).__init__(channel=channel)

self.socket_options = self.socket_options[:] + kwargs.get('socket_options', [])
self._bind = self.parse_bind_parameter(bind)

self._backlog = backlog
Expand Down Expand Up @@ -647,18 +653,28 @@ def _on_write(self, sock):
elif self._poller.isWriting(sock):
self._poller.removeWriter(sock)

def _create_socket(self):
sock = socket(self.socket_family, self.socket_type, self.socket_protocol)

for option in self.socket_options:
sock.setsockopt(*option)
sock.setblocking(False)
if self._bind is not None:
sock.bind(self._bind)
return sock


class TCPServer(Server):

socket_family = AF_INET
socket_type = SOCK_STREAM
socket_options = [
(SOL_SOCKET, SO_REUSEADDR, 1),
(IPPROTO_TCP, TCP_NODELAY, 1),
]

def _create_socket(self):
sock = socket(self.socket_family, SOCK_STREAM)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.setsockopt(IPPROTO_TCP, TCP_NODELAY, 1)
sock.setblocking(False)
sock.bind(self._bind)
sock = super(TCPServer, self)._create_socket()
sock.listen(self._backlog)

return sock
Expand Down Expand Up @@ -706,17 +722,17 @@ def parse_bind_parameter(self, bind_parameter):

class UNIXServer(Server):

def _create_socket(self):
from socket import AF_UNIX
socket_family = AF_UNIX
socket_type = SOCK_STREAM
socket_options = [
(SOL_SOCKET, SO_REUSEADDR, 1),
]

def _create_socket(self):
if os.path.exists(self._bind):
os.unlink(self._bind)

sock = socket(AF_UNIX, SOCK_STREAM)
sock.bind(self._bind)

sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
sock.setblocking(False)
sock = super(UNIXServer, self)._create_socket()
sock.listen(self._backlog)

return sock
Expand All @@ -725,18 +741,11 @@ def _create_socket(self):
class UDPServer(Server):

socket_family = AF_INET

def _create_socket(self):
sock = socket(self.socket_family, SOCK_DGRAM)

sock.bind(self._bind)

sock.setsockopt(SOL_SOCKET, SO_BROADCAST, 1)
sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)

sock.setblocking(False)

return sock
socket_type = SOCK_DGRAM
socket_options = [
(SOL_SOCKET, SO_BROADCAST, 1),
(SOL_SOCKET, SO_REUSEADDR, 1)
]

def _close(self, sock):
self._poller.discard(sock)
Expand Down
13 changes: 13 additions & 0 deletions tests/net/test_socket_options.py
@@ -0,0 +1,13 @@
import pytest

from circuits.net.sockets import TCPServer

try:
from socket import SOL_SOCKET, SO_REUSEPORT
except ImportError:
pytestmark = pytest.mark.skip(reason='Missing SO_REUSEPORT')


def test_socket_options_server():
s = TCPServer(('0.0.0.0', 8090), socket_options=[(SOL_SOCKET, SO_REUSEPORT, 1)])
assert s._sock.getsockopt(SOL_SOCKET, SO_REUSEPORT) == 1

0 comments on commit b478492

Please sign in to comment.