Skip to content

Commit

Permalink
add parse_message method to Mojo::Transaction::WebSocket
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Jan 27, 2016
1 parent 2936ea6 commit e1720a6
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 61 deletions.
1 change: 1 addition & 0 deletions Changes
@@ -1,5 +1,6 @@

6.43 2016-01-27
- Added parse_message method to Mojo::Transaction::WebSocket.
- Improved a few examples to avoid timing attacks.

6.42 2016-01-24
Expand Down
112 changes: 51 additions & 61 deletions lib/Mojo/Transaction/WebSocket.pm
Expand Up @@ -68,18 +68,58 @@ sub kept_alive { shift->handshake->kept_alive }
# Delegate to the handshake transaction for the local interface address.
sub local_address {
  my $self = shift;
  return $self->handshake->local_address;
}
# Delegate to the handshake transaction for the local interface port.
sub local_port {
  my $self = shift;
  return $self->handshake->local_port;
}

# Construct the transaction through the parent class and subscribe the
# default message handler to the "frame" event.
sub new {
  my $class = shift;

  my $self = $class->SUPER::new(@_);

  # Every emitted frame is forwarded to the private message handler
  $self->on(
    frame => sub {
      my $ws = shift;
      $ws->_message(@_);
    }
  );

  return $self;
}

# DEPRECATED in Clinking Beer Mugs!
# Thin wrapper that warns about the deprecation and then forwards the buffer
# and this transaction's maximum WebSocket size to Mojo::WebSocket::parse_frame.
sub parse_frame {
  my ($self, $buffer) = @_;
  deprecated 'Mojo::Transaction::WebSocket::parse_frame is DEPRECATED';
  return Mojo::WebSocket::parse_frame($buffer, $self->max_websocket_size);
}

# Process one parsed WebSocket frame.
#
# $frame is an array reference of the form
# [$fin, $rsv1, $rsv2, $rsv3, $op, $payload].
#
# The frame is first emitted as a "frame" event. Ping frames are answered with
# a Pong, Pong frames are ignored and Close frames finish the connection. Data
# frames are appended to the pending message; once a frame with the FIN bit
# arrives the message is (optionally) inflated and emitted as "json" (if
# subscribed), "text" or "binary", and "message" (if subscribed) events.
sub parse_message {
  my ($self, $frame) = @_;

  $self->emit(frame => $frame);

  # Ping/Pong
  my $op = $frame->[4];
  return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
  return if $op == WS_PONG;

  # Close (the first two payload bytes are the status code, the rest is the
  # reason; the 4-argument substr strips the code before the reason is decoded)
  if ($op == WS_CLOSE) {
    return $self->finish unless length $frame->[5] >= 2;
    return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')),
      decode('UTF-8', $frame->[5]));
  }

  # Remember opcode and RSV1 of the first frame of a message. With
  # "permessage-deflate" only the first fragment carries RSV1, so looking at
  # the final frame would miss fragmented compressed messages.
  @$self{qw(op msg_rsv1)} = ($op, $frame->[1]) unless exists $self->{op};

  # Append chunk and check message size
  $self->{message} .= $frame->[5];
  my $max = $self->max_websocket_size;
  return $self->finish(1009) if length $self->{message} > $max;

  # No FIN bit (Continuation)
  return unless $frame->[0];

  # "permessage-deflate" extension (handshake and RSV1 of the first frame)
  my $msg  = delete $self->{message};
  my $rsv1 = delete $self->{msg_rsv1};
  if ($self->compressed && $rsv1) {
    my $inflate = $self->{inflate} ||= Compress::Raw::Zlib::Inflate->new(
      Bufsize     => $max,
      LimitOutput => 1,
      WindowBits  => -15
    );

    # The inflater consumes its input buffer, leftover input means the
    # output limit has been reached and the message is too big
    $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);
    return $self->finish(1009) if $msg ne '';
    $msg = $out;
  }

  $self->emit(json => j($msg)) if $self->has_subscribers('json');
  $op = delete $self->{op};
  $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
  $self->emit(message => $op == WS_TEXT ? decode('UTF-8', $msg) : $msg)
    if $self->has_subscribers('message');
}

# Return the value of sec_websocket_protocol from the response headers.
sub protocol {
  my $self = shift;
  return $self->res->headers->sec_websocket_protocol;
}

# Delegate to the handshake transaction for the remote interface address.
sub remote_address {
  my $self = shift;
  return $self->handshake->remote_address;
}
Expand Down Expand Up @@ -109,7 +149,7 @@ sub server_read {
my $max = $self->max_websocket_size;
while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $max)) {
$self->finish(1009) and last unless ref $frame;
$self->emit(frame => $frame);
$self->parse_message($frame);
}

$self->emit('resume');
Expand Down Expand Up @@ -143,52 +183,6 @@ sub with_protocols {
return $proto;
}

# Default handler for received frames.
#
# $frame is an array reference of the form
# [$fin, $rsv1, $rsv2, $rsv3, $op, $payload].
#
# Ping frames are answered with a Pong, Pong frames are ignored and Close
# frames finish the connection. Data frames are appended to the pending
# message; once a frame with the FIN bit arrives the message is (optionally)
# inflated and emitted as "json" (if subscribed), "text" or "binary", and
# "message" (if subscribed) events.
sub _message {
  my ($self, $frame) = @_;

  # Assume continuation
  my $op = $frame->[4] || WS_CONTINUATION;

  # Ping/Pong
  return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
  return if $op == WS_PONG;

  # Close (the first two payload bytes are the status code, the rest is the
  # reason; the 4-argument substr strips the code before the reason is decoded)
  if ($op == WS_CLOSE) {
    return $self->finish unless length $frame->[5] >= 2;
    return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')),
      decode('UTF-8', $frame->[5]));
  }

  # Remember opcode and RSV1 of the first frame of a message. With
  # "permessage-deflate" only the first fragment carries RSV1, so looking at
  # the final frame would miss fragmented compressed messages.
  @$self{qw(op msg_rsv1)} = ($op, $frame->[1]) unless exists $self->{op};

  # Append chunk and check message size
  $self->{message} .= $frame->[5];
  my $max = $self->max_websocket_size;
  return $self->finish(1009) if length $self->{message} > $max;

  # No FIN bit (Continuation)
  return unless $frame->[0];

  # "permessage-deflate" extension (handshake and RSV1 of the first frame)
  my $msg  = delete $self->{message};
  my $rsv1 = delete $self->{msg_rsv1};
  if ($self->compressed && $rsv1) {
    my $inflate = $self->{inflate} ||= Compress::Raw::Zlib::Inflate->new(
      Bufsize     => $max,
      LimitOutput => 1,
      WindowBits  => -15
    );

    # The inflater consumes its input buffer, leftover input means the
    # output limit has been reached and the message is too big
    $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);
    return $self->finish(1009) if $msg ne '';
    $msg = $out;
  }

  $self->emit(json => j($msg)) if $self->has_subscribers('json');
  $op = delete $self->{op};
  $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
  $self->emit(message => $op == WS_TEXT ? decode('UTF-8', $msg) : $msg)
    if $self->has_subscribers('message');
}

1;

=encoding utf8
Expand Down Expand Up @@ -270,7 +264,7 @@ Emitted when the WebSocket connection has been closed.
Emitted when a WebSocket frame has been received.
$ws->unsubscribe('frame')->on(frame => sub {
$ws->on(frame => sub {
my ($ws, $frame) = @_;
say "FIN: $frame->[0]";
say "RSV1: $frame->[1]";
Expand Down Expand Up @@ -441,15 +435,11 @@ Local interface address.
Local interface port.
=head2 new
=head2 parse_message
my $ws = Mojo::Transaction::WebSocket->new;
my $ws = Mojo::Transaction::WebSocket->new(compressed => 1);
my $ws = Mojo::Transaction::WebSocket->new({compressed => 1});
$ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
Construct a new L<Mojo::Transaction::WebSocket> object and subscribe to
L</"frame"> event with default message parser, which also handles C<Ping> and
C<Close> frames automatically.
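Parse a WebSocket frame. Emits the C<frame> event, answers C<Ping> and handles
C<Close> frames automatically, and assembles fragmented messages, emitting the
C<json>, C<text>, C<binary> and C<message> events once the final frame of a
message has been received.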
=head2 protocol
Expand Down
11 changes: 11 additions & 0 deletions t/mojo/websocket_frames.t
Expand Up @@ -233,6 +233,17 @@ ok !ref $frame, 'not a reference';
is parse_frame(\($dummy = "\x82\x05\x77\x6f\x72\x6b"), 262144), undef,
'incomplete frame';

# Fragmented message (the "text" event must only fire once the frame with the
# FIN bit has been parsed)
my $fragmented = Mojo::Transaction::WebSocket->new;
my $text;
$fragmented->on(text => sub { $text = pop });

# First fragment carries the opcode, FIN bit is not set
$fragmented->parse_message([0, 0, 0, 0, WS_TEXT, 'wo']);
is $text, undef, 'text event has not been emitted yet';

# Intermediate continuation fragment
$fragmented->parse_message([0, 0, 0, 0, WS_CONTINUATION, 'r']);
is $text, undef, 'text event has not been emitted yet';

# Final fragment with FIN bit completes the message
$fragmented->parse_message([1, 0, 0, 0, WS_CONTINUATION, 'ks!']);
is $text, 'works!', 'right payload';

# Compressed binary message
my $compressed = Mojo::Transaction::WebSocket->new({compressed => 1});
$frame = $compressed->build_message({binary => 'just works'});
Expand Down

0 comments on commit e1720a6

Please sign in to comment.