Skip to content

Commit

Permalink
do not log timed out websocket connections
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Dec 10, 2011
1 parent 9e09641 commit 6ae1534
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
27 changes: 13 additions & 14 deletions lib/Mojo/Server/Daemon.pm
Expand Up @@ -103,13 +103,13 @@ sub _build_tx {
$tx->on(
upgrade => sub {
my ($tx, $ws) = @_;
$self->{connections}->{$id}->{websocket} = $ws->server_handshake;
$self->{connections}->{$id}->{ws} = $ws->server_handshake;
}
);
$tx->on(
request => sub {
my $tx = shift;
$self->emit(request => $self->{connections}->{$id}->{websocket} || $tx);
$self->emit(request => $self->{connections}->{$id}->{ws} || $tx);
$tx->on(resume => sub { $self->_write($id) });
}
);
Expand All @@ -131,7 +131,7 @@ sub _drop {

# Finish gracefully
my $c = $self->{connections}->{$id};
if (my $tx = $c->{websocket} || $c->{transaction}) { $tx->server_close }
if (my $tx = $c->{ws} || $c->{tx}) { $tx->server_close }

# Drop connection
delete $self->{connections}->{$id};
Expand All @@ -154,12 +154,12 @@ sub _finish {

# Finish transaction
my $c = $self->{connections}->{$id};
delete $c->{transaction};
delete $c->{tx};
$tx->server_close;

# WebSocket
my $s = 0;
if (my $ws = $c->{websocket}) {
if (my $ws = $c->{ws}) {

# Successful upgrade
if ($ws->res->code eq '101') {
Expand All @@ -174,7 +174,7 @@ sub _finish {

# Failed upgrade
else {
delete $c->{websocket};
delete $c->{ws};
$ws->server_close;
}
}
Expand All @@ -187,7 +187,7 @@ sub _finish {

# Leftovers
elsif (defined(my $leftovers = $tx->server_leftovers)) {
$tx = $c->{transaction} = $self->_build_tx($id, $c);
$tx = $c->{tx} = $self->_build_tx($id, $c);
$tx->server_read($leftovers);
}
}
Expand Down Expand Up @@ -234,9 +234,8 @@ sub _listen {
# Events
$stream->on(
timeout => sub {
my $c = $self->{connections}->{$id};
return unless $c->{transaction} || $c->{websocket};
$self->_error($id, 'Connection timeout.');
$self->_error($id, 'Connection timeout.')
if $self->{connections}->{$id}->{tx};
}
);
$stream->on(close => sub { $self->_close($id) });
Expand Down Expand Up @@ -272,8 +271,8 @@ sub _read {

# Make sure we have a transaction
my $c = $self->{connections}->{$id};
my $tx = $c->{transaction} || $c->{websocket};
$tx ||= $c->{transaction} = $self->_build_tx($id, $c);
my $tx = $c->{tx} || $c->{ws};
$tx ||= $c->{tx} = $self->_build_tx($id, $c);

# Parse chunk
$tx->server_read($chunk);
Expand All @@ -300,7 +299,7 @@ sub _write {

# Not writing
my $c = $self->{connections}->{$id};
return unless my $tx = $c->{transaction} || $c->{websocket};
return unless my $tx = $c->{tx} || $c->{ws};
return unless $tx->is_writing;

# Get chunk
Expand All @@ -322,7 +321,7 @@ sub _write {
}
else {
$self->_finish($id, $tx);
return unless $c->{transaction} || $c->{websocket};
return unless $c->{tx} || $c->{ws};
}
}
$stream->write('', $cb);
Expand Down
26 changes: 12 additions & 14 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -200,7 +200,7 @@ sub _connect {
$id ||= $self->_cache("$scheme:$host:$port");
if ($id && !ref $id) {
warn "KEEP ALIVE CONNECTION ($scheme:$host:$port)\n" if DEBUG;
$self->{connections}->{$id} = {cb => $cb, transaction => $tx};
$self->{connections}->{$id} = {cb => $cb, tx => $tx};
$tx->kept_alive(1);
$self->_connected($id);
return $id;
Expand Down Expand Up @@ -231,7 +231,7 @@ sub _connect {
$self->_connected($id);
}
);
$self->{connections}->{$id} = {cb => $cb, transaction => $tx};
$self->{connections}->{$id} = {cb => $cb, tx => $tx};

return $id;
}
Expand Down Expand Up @@ -295,7 +295,7 @@ sub _connected {
$loop->stream($id)->timeout($self->keep_alive_timeout);

# Store connection information in transaction
my $tx = $self->{connections}->{$id}->{transaction};
my $tx = $self->{connections}->{$id}->{tx};
$tx->connection($id);
my $handle = $loop->stream($id)->handle;
$tx->local_address($handle->sockhost);
Expand All @@ -313,7 +313,7 @@ sub _drop {
my ($self, $id, $close) = @_;

# Close connection
my $tx = (delete($self->{connections}->{$id}) || {})->{transaction};
my $tx = (delete($self->{connections}->{$id}) || {})->{tx};
unless (!$close && $tx && $tx->keep_alive && !$tx->error) {
$self->_cache($id);
return $self->_loop->drop($id);
Expand All @@ -327,9 +327,7 @@ sub _drop {

sub _error {
my ($self, $id, $error, $log) = @_;
if (my $tx = $self->{connections}->{$id}->{transaction}) {
$tx->res->error($error);
}
if (my $tx = $self->{connections}->{$id}->{tx}) { $tx->res->error($error) }
$self->log->error($error) if $log;
$self->_handle($id, $error);
}
Expand Down Expand Up @@ -368,7 +366,7 @@ sub _handle {

# Finish WebSocket
my $c = $self->{connections}->{$id};
my $old = $c->{transaction};
my $old = $c->{tx};
if ($old && $old->is_websocket) {
$self->{processing} -= 1;
delete $self->{connections}->{$id};
Expand Down Expand Up @@ -411,12 +409,12 @@ sub _read {

# Corrupted connection
return unless my $c = $self->{connections}->{$id};
return $self->_drop($id) unless my $tx = $c->{transaction};
return $self->_drop($id) unless my $tx = $c->{tx};

# Process incoming data
$tx->client_read($chunk);
if ($tx->is_finished) { $self->_handle($id) }
elsif ($c->{transaction}->is_writing) { $self->_write($id) }
if ($tx->is_finished) { $self->_handle($id) }
elsif ($c->{tx}->is_writing) { $self->_write($id) }
}

sub _redirect {
Expand Down Expand Up @@ -508,7 +506,7 @@ sub _upgrade {

# No upgrade request
my $c = $self->{connections}->{$id};
my $old = $c->{transaction};
my $old = $c->{tx};
return unless $old->req->headers->upgrade;

# Handshake failed
Expand All @@ -520,7 +518,7 @@ sub _upgrade {
$new->kept_alive($old->kept_alive);
$res->error('WebSocket challenge failed.') and return
unless $new->client_challenge;
$c->{transaction} = $new;
$c->{tx} = $new;
$self->_loop->stream($id)->timeout($self->websocket_timeout);
weaken $self;
$new->on(resume => sub { $self->_write($id) });
Expand All @@ -533,7 +531,7 @@ sub _write {

# Prepare outgoing data
return unless my $c = $self->{connections}->{$id};
return unless my $tx = $c->{transaction};
return unless my $tx = $c->{tx};
return unless $tx->is_writing;
return if $self->{writing}++;
my $chunk = $tx->client_write;
Expand Down

0 comments on commit 6ae1534

Please sign in to comment.