Navigation Menu

Skip to content

Commit

Permalink
Moved code from Mojo::Transaction::WebSocket into Mojo::WebSocket (new module)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Henning Thorsen committed Jan 10, 2016
1 parent 0e3af25 commit 9fd0137
Show file tree
Hide file tree
Showing 8 changed files with 326 additions and 277 deletions.
4 changes: 2 additions & 2 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -6,6 +6,7 @@ use Mojo::IOLoop;
use Mojo::Transaction::WebSocket;
use Mojo::URL;
use Mojo::Util 'term_escape';
use Mojo::WebSocket 'server_handshake';
use Scalar::Util 'weaken';

use constant DEBUG => $ENV{MOJO_DAEMON_DEBUG} || 0;
Expand Down Expand Up @@ -85,8 +86,7 @@ sub _build_tx {
# WebSocket
if ($tx->req->is_handshake) {
my $ws = Mojo::Transaction::WebSocket->new(handshake => $tx);
$ws->server_handshake;
$self->emit(request => $ws);
$self->emit(request => server_handshake $ws);
$tx->next($ws->handshake(undef));
}

Expand Down
214 changes: 15 additions & 199 deletions lib/Mojo/Transaction/WebSocket.pm
Expand Up @@ -6,17 +6,11 @@ use Config;
use List::Util 'first';
use Mojo::JSON qw(encode_json j);
use Mojo::Transaction::HTTP;
use Mojo::Util qw(b64_encode decode dumper encode sha1_bytes trim xor_encode);
use Mojo::Util qw(decode deprecated encode trim);
use Mojo::WebSocket;

use constant DEBUG => $ENV{MOJO_WEBSOCKET_DEBUG} || 0;

# Perl with support for quads
use constant MODERN =>
(($Config{use64bitint} // '') eq 'define' || $Config{longsize} >= 8);

# Unique value from RFC 6455
use constant GUID => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

# Opcodes
use constant {
CONTINUATION => 0x0,
Expand All @@ -32,44 +26,8 @@ has handshake => sub { Mojo::Transaction::HTTP->new };
has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };

sub build_frame {
my ($self, $fin, $rsv1, $rsv2, $rsv3, $op, $payload) = @_;
warn "-- Building frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

# Head
my $head = $op + ($fin ? 128 : 0);
$head |= 0b01000000 if $rsv1;
$head |= 0b00100000 if $rsv2;
$head |= 0b00010000 if $rsv3;
my $frame = pack 'C', $head;

# Small payload
my $len = length $payload;
my $masked = $self->masked;
if ($len < 126) {
warn "-- Small payload ($len)\n@{[dumper $payload]}" if DEBUG;
$frame .= pack 'C', $masked ? ($len | 128) : $len;
}

# Extended payload (16-bit)
elsif ($len < 65536) {
warn "-- Extended 16-bit payload ($len)\n@{[dumper $payload]}" if DEBUG;
$frame .= pack 'Cn', $masked ? (126 | 128) : 126, $len;
}

# Extended payload (64-bit with 32-bit fallback)
else {
warn "-- Extended 64-bit payload ($len)\n@{[dumper $payload]}" if DEBUG;
$frame .= pack 'C', $masked ? (127 | 128) : 127;
$frame .= MODERN ? pack('Q>', $len) : pack('NN', 0, $len & 0xffffffff);
}

# Mask payload
if ($masked) {
my $mask = pack 'N', int(rand 9 x 7);
$payload = $mask . xor_encode($payload, $mask x 128);
}

return $frame . $payload;
deprecated 'Mojo::Transaction::WebSocket::build_frame is DEPRECATED';
Mojo::WebSocket::build_frame(shift->masked, @_);
}

sub build_message {
Expand All @@ -86,7 +44,8 @@ sub build_message {
else { $frame = [1, 0, 0, 0, BINARY, $frame->{binary}] }

# "permessage-deflate" extension
return $self->build_frame(@$frame) unless $self->compressed;
return Mojo::WebSocket::build_frame($self->masked, @$frame)
unless $self->compressed;
my $deflate = $self->{deflate} ||= Compress::Raw::Zlib::Deflate->new(
AppendOutput => 1,
MemLevel => 8,
Expand All @@ -95,32 +54,7 @@ sub build_message {
$deflate->deflate($frame->[5], my $out);
$deflate->flush($out, Z_SYNC_FLUSH);
@$frame[1, 5] = (1, substr($out, 0, length($out) - 4));
return $self->build_frame(@$frame);
}

sub client_challenge {
my $self = shift;

# "permessage-deflate" extension
my $headers = $self->res->headers;
$self->compressed(1)
if ($headers->sec_websocket_extensions // '') =~ /permessage-deflate/;

return _challenge($self->req->headers->sec_websocket_key) eq
$headers->sec_websocket_accept && ++$self->{open};
}

sub client_handshake {
my $self = shift;

my $headers = $self->req->headers;
$headers->upgrade('websocket') unless $headers->upgrade;
$headers->connection('Upgrade') unless $headers->connection;
$headers->sec_websocket_version(13) unless $headers->sec_websocket_version;

# Generate 16 byte WebSocket challenge
my $challenge = b64_encode sprintf('%16u', int(rand 9 x 16)), '';
$headers->sec_websocket_key($challenge) unless $headers->sec_websocket_key;
return Mojo::WebSocket::build_frame($self->masked, @$frame);
}

sub client_read { shift->server_read(@_) }
Expand Down Expand Up @@ -155,59 +89,8 @@ sub new {
}

sub parse_frame {
my ($self, $buffer) = @_;

# Head
return undef unless length $$buffer >= 2;
my ($first, $second) = unpack 'C*', substr($$buffer, 0, 2);

# FIN
my $fin = ($first & 0b10000000) == 0b10000000 ? 1 : 0;

# RSV1-3
my $rsv1 = ($first & 0b01000000) == 0b01000000 ? 1 : 0;
my $rsv2 = ($first & 0b00100000) == 0b00100000 ? 1 : 0;
my $rsv3 = ($first & 0b00010000) == 0b00010000 ? 1 : 0;

# Opcode
my $op = $first & 0b00001111;
warn "-- Parsing frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

# Small payload
my ($hlen, $len) = (2, $second & 0b01111111);
if ($len < 126) { warn "-- Small payload ($len)\n" if DEBUG }

# Extended payload (16-bit)
elsif ($len == 126) {
return undef unless length $$buffer > 4;
$hlen = 4;
$len = unpack 'n', substr($$buffer, 2, 2);
warn "-- Extended 16-bit payload ($len)\n" if DEBUG;
}

# Extended payload (64-bit with 32-bit fallback)
elsif ($len == 127) {
return undef unless length $$buffer > 10;
$hlen = 10;
my $ext = substr $$buffer, 2, 8;
$len = MODERN ? unpack('Q>', $ext) : unpack('N', substr($ext, 4, 4));
warn "-- Extended 64-bit payload ($len)\n" if DEBUG;
}

# Check message size
return 1 if $len > $self->max_websocket_size;

# Check if whole packet has arrived
$len += 4 if my $masked = $second & 0b10000000;
return undef if length $$buffer < ($hlen + $len);
substr $$buffer, 0, $hlen, '';

# Payload
my $payload = $len ? substr($$buffer, 0, $len, '') : '';
$payload = xor_encode($payload, substr($payload, 0, 4, '') x 128) if $masked;
warn dumper $payload if DEBUG;

return [$fin, $rsv1, $rsv2, $rsv3, $op, $payload];
deprecated 'Mojo::Transaction::WebSocket::parse_frame is DEPRECATED';
Mojo::WebSocket::parse_frame($_[1], $_[0]->max_websocket_size);
}

sub protocol { shift->res->headers->sec_websocket_protocol }
Expand All @@ -222,8 +105,10 @@ sub resume { $_[0]->handshake->resume and return $_[0] }
sub send {
my ($self, $msg, $cb) = @_;
$self->once(drain => $cb) if $cb;
if (ref $msg eq 'ARRAY') { $self->{write} .= $self->build_frame(@$msg) }
else { $self->{write} .= $self->build_message($msg) }
if (ref $msg eq 'ARRAY') {
$self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg);
}
else { $self->{write} .= $self->build_message($msg) }
return $self->SUPER::resume;
}

Expand All @@ -233,21 +118,14 @@ sub server_close {
return $self->emit(finish => $self->{close} ? (@{$self->{close}}) : 1006);
}

sub server_handshake {
my $self = shift;
my $res_headers = $self->res->headers;
$res_headers->upgrade('websocket')->connection('Upgrade');
$res_headers->sec_websocket_accept(
_challenge($self->req->headers->sec_websocket_key));
}

sub server_open { shift->{open}++ }

sub server_read {
my ($self, $chunk) = @_;

$self->{read} .= $chunk // '';
while (my $frame = $self->parse_frame(\$self->{read})) {
my $max = $self->max_websocket_size;
while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $max)) {
$self->finish(1009) and last unless ref $frame;
$self->emit(frame => $frame);
}
Expand Down Expand Up @@ -287,8 +165,6 @@ sub with_protocols {
return $proto;
}

sub _challenge { b64_encode(sha1_bytes(($_[0] || '') . GUID), '') }

sub _message {
my ($self, $frame) = @_;

Expand Down Expand Up @@ -513,30 +389,6 @@ C<MOJO_MAX_WEBSOCKET_SIZE> environment variable or C<262144> (256KB).
L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction>
and implements the following new ones.
=head2 build_frame
my $bytes = $ws->build_frame($fin, $rsv1, $rsv2, $rsv3, $op, $payload);
Build WebSocket frame.
# Binary frame with FIN bit and payload
say $ws->build_frame(1, 0, 0, 0, 2, 'Hello World!');
# Text frame with payload but without FIN bit
say $ws->build_frame(0, 0, 0, 0, 1, 'Hello ');
# Continuation frame with FIN bit and payload
say $ws->build_frame(1, 0, 0, 0, 0, 'World!');
# Close frame with FIN bit and without payload
say $ws->build_frame(1, 0, 0, 0, 8, '');
# Ping frame with FIN bit and payload
say $ws->build_frame(1, 0, 0, 0, 9, 'Test 123');
# Pong frame with FIN bit and payload
say $ws->build_frame(1, 0, 0, 0, 10, 'Test 123');
=head2 build_message
my $bytes = $ws->build_message({binary => $bytes});
Expand All @@ -546,20 +398,6 @@ Build WebSocket frame.
Build WebSocket message.
=head2 client_challenge
my $bool = $ws->client_challenge;
Check WebSocket handshake challenge client-side, used to implement user agents
such as L<Mojo::UserAgent>.
=head2 client_handshake
$ws->client_handshake;
Perform WebSocket handshake client-side, used to implement user agents such as
L<Mojo::UserAgent>.
=head2 client_read
$ws->client_read($data);
Expand Down Expand Up @@ -627,21 +465,6 @@ Construct a new L<Mojo::Transaction::WebSocket> object and subscribe to
L</"frame"> event with default message parser, which also handles C<PING> and
C<CLOSE> frames automatically.
=head2 parse_frame
my $frame = $ws->parse_frame(\$bytes);
Parse WebSocket frame.
# Parse single frame and remove it from buffer
my $frame = $ws->parse_frame(\$buffer);
say "FIN: $frame->[0]";
say "RSV1: $frame->[1]";
say "RSV2: $frame->[2]";
say "RSV3: $frame->[3]";
say "Opcode: $frame->[4]";
say "Payload: $frame->[5]";
=head2 protocol
my $proto = $ws->protocol;
Expand Down Expand Up @@ -700,13 +523,6 @@ will be invoked once all data has been written.
Transaction closed server-side, used to implement web servers such as
L<Mojo::Server::Daemon>.
=head2 server_handshake
$ws->server_handshake;
Perform WebSocket handshake server-side, used to implement web servers such as
L<Mojo::Server::Daemon>.
=head2 server_open
$ws->server_open;
Expand Down
7 changes: 3 additions & 4 deletions lib/Mojo/UserAgent/Transactor.pm
Expand Up @@ -12,6 +12,7 @@ use Mojo::Transaction::HTTP;
use Mojo::Transaction::WebSocket;
use Mojo::URL;
use Mojo::Util qw(encode url_escape);
use Mojo::WebSocket qw(challenge client_handshake);

has generators => sub { {form => \&_form, json => \&_json} };
has name => 'Mojolicious (Perl)';
Expand Down Expand Up @@ -128,7 +129,7 @@ sub upgrade {
my $code = $tx->res->code // 0;
return undef unless $tx->req->is_handshake && $code == 101;
my $ws = Mojo::Transaction::WebSocket->new(handshake => $tx, masked => 1);
return $ws->client_challenge ? $ws : undef;
return challenge($ws) ? $ws : undef;
}

sub websocket {
Expand All @@ -144,9 +145,7 @@ sub websocket {
$url->scheme($proto eq 'wss' ? 'https' : 'http') if $proto;

# Handshake
Mojo::Transaction::WebSocket->new(handshake => $tx)->client_handshake;

return $tx;
return client_handshake $tx;
}

sub _form {
Expand Down

0 comments on commit 9fd0137

Please sign in to comment.