Skip to content

Commit

Permalink
improved Mojo::Transaction::WebSocket to allow custom message parsers
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Apr 9, 2012
1 parent f595c18 commit 304118e
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 42 deletions.
3 changes: 2 additions & 1 deletion Changes
@@ -1,6 +1,7 @@
This file documents the revision history for Perl extension Mojolicious.

2.77 2012-04-08
2.77 2012-04-09
- Improved Mojo::Transaction::WebSocket to allow custom message parsers.
- Improved help command to be less strict.
- Improved documentation. (tempire, sri)
- Improved tests.
Expand Down
1 change: 0 additions & 1 deletion examples/connect-proxy.pl
Expand Up @@ -18,7 +18,6 @@
if (my $server = $c->{$client}->{connection}) {
return Mojo::IOLoop->stream($server)->write($chunk);
}
$c->{$client}->{client} //= '';
$c->{$client}->{client} .= $chunk;
if ($c->{$client}->{client} =~ /\x0d?\x0a\x0d?\x0a$/) {
my $buffer = $c->{$client}->{client};
Expand Down
5 changes: 1 addition & 4 deletions lib/Mojo/Content.pm
Expand Up @@ -207,10 +207,7 @@ sub write {
$self->{dynamic} = 1;

# Add chunk
if (defined $chunk) {
$self->{body_buffer} //= '';
$self->{body_buffer} .= $chunk;
}
if (defined $chunk) { $self->{body_buffer} .= $chunk }

# Delay
else { $self->{delay} = 1 }
Expand Down
79 changes: 45 additions & 34 deletions lib/Mojo/Transaction/WebSocket.pm
Expand Up @@ -26,6 +26,12 @@ has handshake => sub { Mojo::Transaction::HTTP->new };
has 'masked';
has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };

# Construct the transaction through the parent class and subscribe the
# default message parser (_message) to the "frame" event.
sub new {
  my $class = shift;
  my $self  = $class->SUPER::new(@_);

  # Forward every emitted frame to the default parser
  $self->on(
    frame => sub {
      my $ws = shift;
      $ws->_message(@_);
    }
  );

  return $self;
}

sub build_frame {
my ($self, $fin, $rsv1, $rsv2, $rsv3, $op, $payload) = @_;
warn "BUILDING FRAME\n" if DEBUG;
Expand Down Expand Up @@ -232,7 +238,6 @@ sub send {

# Prepare frame
$self->once(drain => $cb) if $cb;
$self->{write} //= '';
$self->{write} .= $self->build_frame(@$frame);
$self->{state} = 'write';

Expand Down Expand Up @@ -261,36 +266,9 @@ sub server_read {
my ($self, $chunk) = @_;

# Parse frames
$self->{read} //= '';
$self->{read} .= $chunk if defined $chunk;
$self->{message} //= '';
while (my $frame = $self->parse_frame(\$self->{read})) {
$self->emit(frame => $frame);
my $op = $frame->[4] || CONTINUATION;

# Ping/Pong
$self->send([1, 0, 0, 0, PONG, $frame->[5]]) and next if $op == PING;
next if $op == PONG;

# Close
$self->finish and next if $op == CLOSE;

# Append chunk and check message size
next unless $self->has_subscribers('message');
$self->{op} = $op unless exists $self->{op};
$self->{message} .= $frame->[5];
$self->finish and last
if length $self->{message} > $self->max_websocket_size;

# No FIN bit (Continuation)
next unless $frame->[0];

# Message
my $message = $self->{message};
$self->{message} = '';
$message = decode 'UTF-8', $message
if $message && delete $self->{op} == TEXT;
$self->emit(message => $message);
}

# Resume
Expand All @@ -301,21 +279,46 @@ sub server_write {
my $self = shift;

# Drain
$self->{write} //= '';
unless (length $self->{write}) {
unless (length($self->{write} // '')) {
$self->{state} = $self->{finished} ? 'finished' : 'read';
$self->emit('drain');
}

# Empty buffer
my $write = $self->{write};
$self->{write} = '';

return $write;
return delete $self->{write} // '';
}

# Build the handshake challenge: the last argument (or an empty string when
# it is false) is concatenated with GUID, hashed with sha1_bytes and then
# passed through b64_encode with an empty second argument.
sub _challenge {
  my $key = pop() || '';
  return b64_encode(sha1_bytes($key . GUID), '');
}

# Default "frame" event handler: answers control frames and reassembles data
# frames into complete messages, which are emitted as "message" events.
#
# $frame is an array reference laid out like the arguments of build_frame
# (fin, rsv1, rsv2, rsv3, op, payload): index 0 is the FIN bit, index 4 the
# opcode and index 5 the payload.
sub _message {
  my ($self, $frame) = @_;

  # Assume continuation
  my $op = $frame->[4] || CONTINUATION;

  # Ping/Pong
  return $self->send([1, 0, 0, 0, PONG, $frame->[5]]) if $op == PING;
  return if $op == PONG;

  # Close
  return $self->finish if $op == CLOSE;

  # Append chunk and check message size ("return" instead of "last", since
  # this code is a subroutine now and no longer the body of a loop)
  $self->{op} = $op unless exists $self->{op};
  $self->{message} .= $frame->[5];
  $self->finish and return
    if length $self->{message} > $self->max_websocket_size;

  # No FIN bit (Continuation)
  return unless $frame->[0];

  # Message (the opcode is always reset, even for empty messages, so it
  # cannot leak into the next message)
  my $message = delete $self->{message};
  my $type    = delete $self->{op};
  $message = decode 'UTF-8', $message if $message && $type == TEXT;
  $self->emit(message => $message);
}

sub _xor_mask {
my ($input, $mask) = @_;

Expand Down Expand Up @@ -372,6 +375,7 @@ Emitted once all data has been sent.
Emitted when a WebSocket frame has been received.
$ws->unsubscribe('frame');
$ws->on(frame => sub {
my ($ws, $frame) = @_;
say "Fin: $frame->[0]";
Expand Down Expand Up @@ -429,6 +433,13 @@ C<MOJO_MAX_WEBSOCKET_SIZE> environment variable or C<262144>.
L<Mojo::Transaction::WebSocket> inherits all methods from
L<Mojo::Transaction> and implements the following new ones.
=head2 C<new>
my $ws = Mojo::Transaction::WebSocket->new;
Construct a new L<Mojo::Transaction::WebSocket> object and subscribe to the
C<frame> event with the default message parser.
=head2 C<build_frame>
my $bytes = $ws->build_frame($fin, $rsv1, $rsv2, $rsv3, $op, $payload);
Expand Down
1 change: 0 additions & 1 deletion t/mojo/websocket_proxy.t
Expand Up @@ -71,7 +71,6 @@ Mojo::IOLoop->server(
if (my $server = $c->{$client}->{connection}) {
return Mojo::IOLoop->stream($server)->write($chunk);
}
$c->{$client}->{client} //= '';
$c->{$client}->{client} .= $chunk;
if ($c->{$client}->{client} =~ /\x0d?\x0a\x0d?\x0a$/) {
my $buffer = $c->{$client}->{client};
Expand Down
1 change: 0 additions & 1 deletion t/mojo/websocket_proxy_tls.t
Expand Up @@ -89,7 +89,6 @@ Mojo::IOLoop->server(
if (my $server = $c->{$client}->{connection}) {
return Mojo::IOLoop->stream($server)->write($chunk);
}
$c->{$client}->{client} //= '';
$c->{$client}->{client} .= $chunk;
if ($c->{$client}->{client} =~ /\x0d?\x0a\x0d?\x0a$/) {
my $buffer = $c->{$client}->{client};
Expand Down

0 comments on commit 304118e

Please sign in to comment.