Skip to content

Commit

Permalink
Add Mojo::WebSocket::parse_frame() and ::build_frame()
Browse files Browse the repository at this point in the history
  • Loading branch information
Jan Henning Thorsen committed Jan 9, 2016
1 parent 9e14e70 commit 3a4cc39
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 196 deletions.
4 changes: 3 additions & 1 deletion lib/Mojo/Channel/WebSocket.pm
@@ -1,12 +1,14 @@
package Mojo::Channel::WebSocket;
use Mojo::Base 'Mojo::Channel';

use Mojo::WebSocket 'parse_frame';

sub read {
my ($self, $chunk) = @_;

my $tx = $self->{tx};
$self->{read} .= $chunk // '';
while (my $frame = $tx->parse_frame(\$self->{read})) {
while (my $frame = parse_frame \$self->{read}, $tx->max_websocket_size) {
$tx->finish(1009) and last unless ref $frame;
$tx->emit(frame => $frame);
}
Expand Down
149 changes: 8 additions & 141 deletions lib/Mojo/Transaction/WebSocket.pm
Expand Up @@ -6,7 +6,8 @@ use Config;
use List::Util 'first';
use Mojo::JSON qw(encode_json j);
use Mojo::Transaction::HTTP;
use Mojo::Util qw(decode dumper encode trim xor_encode);
use Mojo::Util qw(decode encode trim);
use Mojo::WebSocket qw(build_frame);

use constant DEBUG => $ENV{MOJO_WEBSOCKET_DEBUG} || 0;

Expand All @@ -28,47 +29,6 @@ has [qw(compressed masked)];
has handshake => sub { Mojo::Transaction::HTTP->new };
has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };

# Serialize a single WebSocket frame and return it as a byte string.
#
#   my $bytes = $ws->build_frame($fin, $rsv1, $rsv2, $rsv3, $op, $payload);
#
# The payload is masked (and the mask bit set) whenever the "masked" attribute
# of the invocant is true.
sub build_frame {
  my ($self, $fin, $rsv1, $rsv2, $rsv3, $op, $payload) = @_;
  warn "-- Building frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

  my $masked = $self->masked;
  my $size   = length $payload;

  # First byte: opcode plus FIN bit, then the three reserved bits
  my $first = $op + ($fin ? 128 : 0);
  for my $flag ([$rsv1, 0b01000000], [$rsv2, 0b00100000], [$rsv3, 0b00010000])
  {
    $first |= $flag->[1] if $flag->[0];
  }
  my @header = (pack 'C', $first);

  # Extended payload (64-bit with 32-bit fallback)
  if ($size >= 65536) {
    warn "-- Extended 64-bit payload ($size)\n@{[dumper $payload]}" if DEBUG;
    push @header, pack('C', $masked ? (127 | 128) : 127),
      MODERN ? pack('Q>', $size) : pack('NN', 0, $size & 0xffffffff);
  }

  # Extended payload (16-bit)
  elsif ($size >= 126) {
    warn "-- Extended 16-bit payload ($size)\n@{[dumper $payload]}" if DEBUG;
    push @header, pack 'Cn', $masked ? (126 | 128) : 126, $size;
  }

  # Small payload (length fits into the second byte)
  else {
    warn "-- Small payload ($size)\n@{[dumper $payload]}" if DEBUG;
    push @header, pack 'C', $masked ? ($size | 128) : $size;
  }

  # Unmasked frames carry the payload verbatim
  return join('', @header) . $payload unless $masked;

  # Masked frames carry the 4-byte mask followed by the XORed payload
  my $mask = pack 'N', int(rand 9 x 7);
  return join '', @header, $mask, xor_encode($payload, $mask x 128);
}

sub build_message {
my ($self, $frame) = @_;

Expand All @@ -83,7 +43,7 @@ sub build_message {
else { $frame = [1, 0, 0, 0, BINARY, $frame->{binary}] }

# "permessage-deflate" extension
return $self->build_frame(@$frame) unless $self->compressed;
return build_frame $self->masked, @$frame unless $self->compressed;
my $deflate = $self->{deflate} ||= Compress::Raw::Zlib::Deflate->new(
AppendOutput => 1,
MemLevel => 8,
Expand All @@ -92,7 +52,7 @@ sub build_message {
$deflate->deflate($frame->[5], my $out);
$deflate->flush($out, Z_SYNC_FLUSH);
@$frame[1, 5] = (1, substr($out, 0, length($out) - 4));
return $self->build_frame(@$frame);
return build_frame $self->masked, @$frame;
}

sub connection { shift->handshake->connection }
Expand Down Expand Up @@ -123,62 +83,6 @@ sub new {
return $self;
}

# Parse one WebSocket frame from the start of the buffer.
#
#   my $frame = $ws->parse_frame(\$bytes);
#
# Returns undef while the frame is still incomplete (the buffer is left
# untouched), 1 if the announced payload length exceeds "max_websocket_size",
# or an array reference [$fin, $rsv1, $rsv2, $rsv3, $op, $payload] once the
# frame has been removed from the buffer.
sub parse_frame {
  my ($self, $buffer) = @_;

  # Head
  return undef unless length $$buffer >= 2;
  my ($first, $second) = unpack 'C*', substr($$buffer, 0, 2);

  # FIN
  my $fin = ($first & 0b10000000) == 0b10000000 ? 1 : 0;

  # RSV1-3
  my $rsv1 = ($first & 0b01000000) == 0b01000000 ? 1 : 0;
  my $rsv2 = ($first & 0b00100000) == 0b00100000 ? 1 : 0;
  my $rsv3 = ($first & 0b00010000) == 0b00010000 ? 1 : 0;

  # Opcode
  my $op = $first & 0b00001111;
  warn "-- Parsing frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

  # Small payload
  my ($hlen, $len) = (2, $second & 0b01111111);
  if ($len < 126) { warn "-- Small payload ($len)\n" if DEBUG }

  # Extended payload (16-bit)
  elsif ($len == 126) {
    return undef unless length $$buffer > 4;
    $hlen = 4;
    $len = unpack 'n', substr($$buffer, 2, 2);
    warn "-- Extended 16-bit payload ($len)\n" if DEBUG;
  }

  # Extended payload (64-bit with 32-bit fallback)
  elsif ($len == 127) {
    return undef unless length $$buffer > 10;
    $hlen = 10;
    my $ext = substr $$buffer, 2, 8;
    if (MODERN) { $len = unpack 'Q>', $ext }
    else {

      # Without quads only the low 32 bits can be represented, so a non-zero
      # high word is reported as "too big" instead of being silently dropped
      # (which would let an oversized frame pass as a small one)
      my ($high, $low) = unpack 'NN', $ext;
      return 1 if $high;
      $len = $low;
    }
    warn "-- Extended 64-bit payload ($len)\n" if DEBUG;
  }

  # Check message size
  return 1 if $len > $self->max_websocket_size;

  # Check if whole packet has arrived (a masked frame carries 4 extra bytes)
  $len += 4 if my $masked = $second & 0b10000000;
  return undef if length $$buffer < ($hlen + $len);
  substr $$buffer, 0, $hlen, '';

  # Payload (the first 4 bytes are the mask if the frame is masked)
  my $payload = $len ? substr($$buffer, 0, $len, '') : '';
  $payload = xor_encode($payload, substr($payload, 0, 4, '') x 128) if $masked;
  warn dumper $payload if DEBUG;

  return [$fin, $rsv1, $rsv2, $rsv3, $op, $payload];
}

sub protocol { shift->res->headers->sec_websocket_protocol }

sub remote_address { shift->handshake->remote_address }
Expand All @@ -192,8 +96,10 @@ sub send {
my ($self, $msg, $cb) = @_;

$self->once(drain => $cb) if $cb;
if (ref $msg eq 'ARRAY') { $self->{write} .= $self->build_frame(@$msg) }
else { $self->{write} .= $self->build_message($msg) }
if (ref $msg eq 'ARRAY') {
$self->{write} .= build_frame $self->masked, @$msg;
}
else { $self->{write} .= $self->build_message($msg) }
$self->{state} = 'write';

return $self->emit('resume');
Expand Down Expand Up @@ -452,30 +358,6 @@ C<MOJO_MAX_WEBSOCKET_SIZE> environment variable or C<262144> (256KB).
L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction>
and implements the following new ones.
=head2 build_frame
my $bytes = $ws->build_frame($fin, $rsv1, $rsv2, $rsv3, $op, $payload);
Build WebSocket frame.
# Binary frame with FIN bit and payload
say $ws->build_frame(1, 0, 0, 0, 2, 'Hello World!');
# Text frame with payload but without FIN bit
say $ws->build_frame(0, 0, 0, 0, 1, 'Hello ');
# Continuation frame with FIN bit and payload
say $ws->build_frame(1, 0, 0, 0, 0, 'World!');
# Close frame with FIN bit and without payload
say $ws->build_frame(1, 0, 0, 0, 8, '');
# Ping frame with FIN bit and payload
say $ws->build_frame(1, 0, 0, 0, 9, 'Test 123');
# Pong frame with FIN bit and payload
say $ws->build_frame(1, 0, 0, 0, 10, 'Test 123');
=head2 build_message
my $bytes = $ws->build_message({binary => $bytes});
Expand Down Expand Up @@ -539,21 +421,6 @@ Construct a new L<Mojo::Transaction::WebSocket> object and subscribe to
L</"frame"> event with default message parser, which also handles C<PING> and
C<CLOSE> frames automatically.
=head2 parse_frame
my $frame = $ws->parse_frame(\$bytes);
Parse WebSocket frame.
# Parse single frame and remove it from buffer
my $frame = $ws->parse_frame(\$buffer);
say "FIN: $frame->[0]";
say "RSV1: $frame->[1]";
say "RSV2: $frame->[2]";
say "RSV3: $frame->[3]";
say "Opcode: $frame->[4]";
say "Payload: $frame->[5]";
=head2 protocol
my $proto = $ws->protocol;
Expand Down
109 changes: 107 additions & 2 deletions lib/Mojo/WebSocket.pm
Expand Up @@ -2,13 +2,62 @@ package Mojo::WebSocket;
use Mojo::Base -strict;

use Exporter 'import';
use Mojo::Util qw(b64_encode sha1_bytes);
use Config;
use Mojo::Util qw(b64_encode sha1_bytes xor_encode);

our @EXPORT_OK = qw(challenge client_handshake server_handshake);
our @EXPORT_OK
= qw(build_frame challenge client_handshake parse_frame server_handshake);

# Unique value from RFC 6455
use constant GUID => '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

use constant DEBUG => $ENV{MOJO_WEBSOCKET_DEBUG} || 0;

# Perl with support for quads
use constant MODERN =>
(($Config{use64bitint} // '') eq 'define' || $Config{longsize} >= 8);

# Build a single WebSocket frame and return it as a byte string.
#
#   my $bytes = build_frame $masked, $fin, $rsv1, $rsv2, $rsv3, $op, $payload;
#
# A true $masked sets the mask bit in the length byte and XORs the payload
# with a freshly generated 4-byte mask that is prepended to it.
sub build_frame {
  my ($masked, $fin, $rsv1, $rsv2, $rsv3, $op, $payload) = @_;
  warn "-- Building frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

  # Head
  my $head = $op + ($fin ? 128 : 0);
  $head |= 0b01000000 if $rsv1;
  $head |= 0b00100000 if $rsv2;
  $head |= 0b00010000 if $rsv3;
  my $frame = pack 'C', $head;

  # "dumper" is not among the functions this package imports from Mojo::Util,
  # so the debug output calls it fully qualified

  # Small payload
  my $len = length $payload;
  if ($len < 126) {
    warn "-- Small payload ($len)\n@{[Mojo::Util::dumper($payload)]}" if DEBUG;
    $frame .= pack 'C', $masked ? ($len | 128) : $len;
  }

  # Extended payload (16-bit)
  elsif ($len < 65536) {
    warn "-- Extended 16-bit payload ($len)\n@{[Mojo::Util::dumper($payload)]}"
      if DEBUG;
    $frame .= pack 'Cn', $masked ? (126 | 128) : 126, $len;
  }

  # Extended payload (64-bit with 32-bit fallback)
  else {
    warn "-- Extended 64-bit payload ($len)\n@{[Mojo::Util::dumper($payload)]}"
      if DEBUG;
    $frame .= pack 'C', $masked ? (127 | 128) : 127;
    $frame .= MODERN ? pack('Q>', $len) : pack('NN', 0, $len & 0xffffffff);
  }

  # Mask payload
  if ($masked) {
    my $mask = pack 'N', int(rand 9 x 7);
    $payload = $mask . xor_encode($payload, $mask x 128);
  }

  return $frame . $payload;
}


sub challenge {
my $tx = shift;

Expand Down Expand Up @@ -36,6 +85,62 @@ sub client_handshake {
return $tx;
}

# Parse one WebSocket frame from the start of the buffer.
#
#   my $frame = parse_frame \$buffer, $max_websocket_size;
#
# Returns undef while the frame is still incomplete (the buffer is left
# untouched), 1 if the announced payload length exceeds $max_websocket_size,
# or an array reference [$fin, $rsv1, $rsv2, $rsv3, $op, $payload] once the
# frame has been removed from the buffer.
sub parse_frame {
  my ($buffer, $max_websocket_size) = @_;

  # Head
  return undef unless length $$buffer >= 2;
  my ($first, $second) = unpack 'C*', substr($$buffer, 0, 2);

  # FIN
  my $fin = ($first & 0b10000000) == 0b10000000 ? 1 : 0;

  # RSV1-3
  my $rsv1 = ($first & 0b01000000) == 0b01000000 ? 1 : 0;
  my $rsv2 = ($first & 0b00100000) == 0b00100000 ? 1 : 0;
  my $rsv3 = ($first & 0b00010000) == 0b00010000 ? 1 : 0;

  # Opcode
  my $op = $first & 0b00001111;
  warn "-- Parsing frame ($fin, $rsv1, $rsv2, $rsv3, $op)\n" if DEBUG;

  # Small payload
  my ($hlen, $len) = (2, $second & 0b01111111);
  if ($len < 126) { warn "-- Small payload ($len)\n" if DEBUG }

  # Extended payload (16-bit)
  elsif ($len == 126) {
    return undef unless length $$buffer > 4;
    $hlen = 4;
    $len = unpack 'n', substr($$buffer, 2, 2);
    warn "-- Extended 16-bit payload ($len)\n" if DEBUG;
  }

  # Extended payload (64-bit with 32-bit fallback)
  elsif ($len == 127) {
    return undef unless length $$buffer > 10;
    $hlen = 10;
    my $ext = substr $$buffer, 2, 8;
    if (MODERN) { $len = unpack 'Q>', $ext }
    else {

      # Without quads only the low 32 bits can be represented, so a non-zero
      # high word is reported as "too big" instead of being silently dropped
      # (which would let an oversized frame pass as a small one)
      my ($high, $low) = unpack 'NN', $ext;
      return 1 if $high;
      $len = $low;
    }
    warn "-- Extended 64-bit payload ($len)\n" if DEBUG;
  }

  # Check message size
  return 1 if $len > $max_websocket_size;

  # Check if whole packet has arrived (a masked frame carries 4 extra bytes)
  $len += 4 if my $masked = $second & 0b10000000;
  return undef if length $$buffer < ($hlen + $len);
  substr $$buffer, 0, $hlen, '';

  # Payload (the first 4 bytes are the mask if the frame is masked)
  my $payload = $len ? substr($$buffer, 0, $len, '') : '';
  $payload = xor_encode($payload, substr($payload, 0, 4, '') x 128) if $masked;

  # "dumper" is not among the functions this package imports from Mojo::Util,
  # so the debug output calls it fully qualified
  warn Mojo::Util::dumper($payload) if DEBUG;

  return [$fin, $rsv1, $rsv2, $rsv3, $op, $payload];
}

sub server_handshake {
my $tx = shift;

Expand Down

0 comments on commit 3a4cc39

Please sign in to comment.