Skip to content

Commit

Permalink
added support for performing blocking and non-blocking requests at th…
Browse files Browse the repository at this point in the history
…e same time with Mojo::UserAgent
  • Loading branch information
kraih committed May 11, 2014
1 parent 401f98e commit 4a03caa
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 87 deletions.
5 changes: 4 additions & 1 deletion Changes
@@ -1,5 +1,8 @@

4.99 2014-05-11
4.99 2014-05-12
- Added support for performing blocking and non-blocking requests at the
same time with Mojo::UserAgent.
- Added nb_url method to Mojo::UserAgent::Server.
- Improved Mojo::IOLoop::Server and Mojo::Server::Daemon to be able to
listen on random ports.

Expand Down
4 changes: 1 addition & 3 deletions lib/Mojo.pm
Expand Up @@ -127,9 +127,7 @@ The logging layer of your application, defaults to a L<Mojo::Log> object.
$app = $app->ua(Mojo::UserAgent->new);
A full featured HTTP user agent for use in your applications, defaults to a
L<Mojo::UserAgent> object. Note that this user agent should not be used in
plugins, since non-blocking requests that are already in progress will
interfere with new blocking ones.
L<Mojo::UserAgent> object.
# Perform blocking request
say $app->ua->get('example.com')->res->body;
Expand Down
101 changes: 49 additions & 52 deletions lib/Mojo/UserAgent.pm
Expand Up @@ -56,24 +56,12 @@ sub start {
# Non-blocking
if ($cb) {
warn "-- Non-blocking request (@{[$tx->req->url->to_abs]})\n" if DEBUG;
unless ($self->{nb}) {
croak 'Blocking request in progress' if keys %{$self->{connections}};
warn "-- Switching to non-blocking mode\n" if DEBUG;
$self->server->ioloop(Mojo::IOLoop->singleton);
$self->_cleanup->{nb}++;
}
return $self->_start($tx, $cb);
return $self->_start(1, $tx, $cb);
}

# Blocking
warn "-- Blocking request (@{[$tx->req->url->to_abs]})\n" if DEBUG;
if ($self->{nb}) {
croak 'Non-blocking requests in progress' if keys %{$self->{connections}};
warn "-- Switching to blocking mode\n" if DEBUG;
$self->server->ioloop($self->ioloop);
delete $self->_cleanup->{nb};
}
$self->_start($tx => sub { shift->ioloop->stop; $tx = shift });
$self->_start(0, $tx => sub { shift->ioloop->stop; $tx = shift });
$self->ioloop->start;

return $tx;
Expand All @@ -87,23 +75,25 @@ sub websocket {

# Close every active connection and drop all queued keep-alive connections,
# then return the user agent itself.
#
# NOTE(review): this span appears to be a rendered diff hunk, so the earlier
# and later forms of some statements sit back to back (the two "return unless"
# lines, and the "{queue}" versus "{queue}{0}" / "{queue}{1}" removals).
# Confirm which line of each pair belongs to the committed file.
sub _cleanup {
my $self = shift;
# Bail out (returning nothing) when no loop is available
return unless my $loop = $self->_loop;
return unless my $loop = $self->_loop(0);

# Clean up active connections (by closing them)
# The second argument to _handle is its "close" flag
$self->_handle($_, 1) for keys %{$self->{connections} || {}};

# Clean up keep-alive connections
# Queue entries are [name, id] pairs, so element 1 is the connection id;
# "delete" empties the queue while it is being iterated
$loop->remove($_->[1]) for @{delete $self->{queue} || []};
$loop->remove($_->[1]) for @{delete $self->{queue}{0} || []};
# Queue 1 is removed from the Mojo::IOLoop singleton instead
$loop = Mojo::IOLoop->singleton;
$loop->remove($_->[1]) for @{delete $self->{queue}{1} || []};

return $self;
}

sub _connect {
my ($self, $proto, $host, $port, $handle, $cb) = @_;
my ($self, $nb, $proto, $host, $port, $handle, $cb) = @_;

weaken $self;
my $id;
return $id = $self->_loop->client(
return $id = $self->_loop($nb)->client(
address => $host,
handle => $handle,
local_address => $self->local_address,
Expand Down Expand Up @@ -132,12 +122,12 @@ sub _connect {
}

sub _connect_proxy {
my ($self, $old, $cb) = @_;
my ($self, $nb, $old, $cb) = @_;

# Start CONNECT request
return undef unless my $new = $self->transactor->proxy_connect($old);
return $self->_start(
$new => sub {
($nb, $new) => sub {
my ($self, $tx) = @_;

# CONNECT failed (connection needs to be kept alive)
Expand All @@ -149,17 +139,17 @@ sub _connect_proxy {
# Prevent proxy reassignment and start real transaction
$old->req->proxy(0);
my $id = $tx->connection;
return $self->_start($old->connection($id), $cb)
return $self->_start($nb, $old->connection($id), $cb)
unless $tx->req->url->protocol eq 'https';

# TLS upgrade
my $loop = $self->_loop;
my $loop = $self->_loop($nb);
my $handle = $loop->stream($id)->steal_handle;
my $c = delete $self->{connections}{$id};
$loop->remove($id);
weaken $self;
$id = $self->_connect($self->transactor->endpoint($old),
$handle, sub { $self->_start($old->connection($id), $cb) });
$id = $self->_connect($nb, $self->transactor->endpoint($old),
$handle, sub { $self->_start($nb, $old->connection($id), $cb) });
$self->{connections}{$id} = $c;
}
);
Expand All @@ -169,10 +159,12 @@ sub _connected {
my ($self, $id) = @_;

# Inactivity timeout
my $stream = $self->_loop->stream($id)->timeout($self->inactivity_timeout);
my $c = $self->{connections}{$id};
my $stream
= $self->_loop($c->{nb})->stream($id)->timeout($self->inactivity_timeout);

# Store connection information in transaction
my $tx = $self->{connections}{$id}{tx}->connection($id);
my $tx = $c->{tx}->connection($id);
my $handle = $stream->handle;
$tx->local_address($handle->sockhost)->local_port($handle->sockport);
$tx->remote_address($handle->peerhost)->remote_port($handle->peerport);
Expand All @@ -184,41 +176,41 @@ sub _connected {
}

# Find or create a connection for a transaction and return its id.
#
# Tries, in order: the id already stored in the transaction or a queued
# keep-alive connection for the same "proto:host:port" endpoint, a CONNECT
# request through a proxy, and finally a fresh connection to the peer.
# The callback and transaction are recorded under $self->{connections}{$id}.
#
# NOTE(review): this span appears to be a rendered diff hunk; statements that
# differ only by the extra $nb argument / "nb" key are the earlier and later
# forms of the same line. Confirm which of each pair is in the committed file.
sub _connection {
my ($self, $tx, $cb) = @_;
my ($self, $nb, $tx, $cb) = @_;

# Reuse connection
my $id = $tx->connection;
my ($proto, $host, $port) = $self->transactor->endpoint($tx);
$id ||= $self->_dequeue("$proto:$host:$port", 1);
$id ||= $self->_dequeue($nb, "$proto:$host:$port", 1);
# Only a plain (non-reference) id is reused here; a reference in $id falls
# through and is later handed to _connect in its handle position
if ($id && !ref $id) {
warn "-- Reusing connection ($proto:$host:$port)\n" if DEBUG;
$self->{connections}{$id} = {cb => $cb, tx => $tx};
$self->{connections}{$id} = {cb => $cb, nb => $nb, tx => $tx};
# Flag as kept alive only when the transaction carried no connection id
$tx->kept_alive(1) unless $tx->connection;
$self->_connected($id);
return $id;
}

# CONNECT request to proxy required
# _connect_proxy returns undef when no CONNECT transaction is needed
if (my $id = $self->_connect_proxy($tx, $cb)) { return $id }
if (my $id = $self->_connect_proxy($nb, $tx, $cb)) { return $id }

# Connect
warn "-- Connect ($proto:$host:$port)\n" if DEBUG;
# From here on the target is the peer rather than the endpoint
($proto, $host, $port) = $self->transactor->peer($tx);
# Weaken so the connect callback below does not hold a strong reference
weaken $self;
$id = $self->_connect(
($proto, $host, $port, $id) => sub { $self->_connected($id) });
($nb, $proto, $host, $port, $id) => sub { $self->_connected($id) });
$self->{connections}{$id} = {cb => $cb, tx => $tx};
$self->{connections}{$id} = {cb => $cb, nb => $nb, tx => $tx};

return $id;
}

sub _dequeue {
my ($self, $name, $test) = @_;
my ($self, $nb, $name, $test) = @_;

my $found;
my $loop = $self->_loop;
my $old = $self->{queue} || [];
my $new = $self->{queue} = [];
my $loop = $self->_loop($nb);
my $old = $self->{queue}{$nb} || [];
my $new = $self->{queue}{$nb} = [];
for my $queued (@$old) {
push @$new, $queued and next if $found || !grep { $_ eq $name } @$queued;

Expand All @@ -231,10 +223,10 @@ sub _dequeue {
}

sub _enqueue {
my ($self, $name, $id) = @_;
my ($self, $nb, $name, $id) = @_;

# Enforce connection limit
my $queue = $self->{queue} ||= [];
my $queue = $self->{queue}{$nb} ||= [];
my $max = $self->max_connections;
$self->_remove(shift(@$queue)->[1]) while @$queue > $max;
push @$queue, [$name, $id] if $max;
Expand All @@ -251,8 +243,8 @@ sub _handle {
my ($self, $id, $close) = @_;

# Remove request timeout
return unless my $loop = $self->_loop;
my $c = $self->{connections}{$id};
return unless my $loop = $self->_loop($c->{nb});
$loop->remove($c->{timeout}) if $c->{timeout};

# Finish WebSocket
Expand Down Expand Up @@ -283,7 +275,7 @@ sub _handle {
}
}

# Pick the event loop to use: the Mojo::IOLoop singleton or the user agent's
# own "ioloop".
# NOTE(review): the two definitions below appear to be the earlier and later
# forms of the same one-liner from a diff hunk; confirm which one is current.
# This form decides from the "nb" flag stored in the object itself.
sub _loop { $_[0]{nb} ? Mojo::IOLoop->singleton : $_[0]->ioloop }
# This form decides from the second argument passed by the caller.
sub _loop { $_[1] ? Mojo::IOLoop->singleton : $_[0]->ioloop }

sub _read {
my ($self, $id, $chunk) = @_;
Expand All @@ -303,43 +295,45 @@ sub _remove {
my ($self, $id, $close) = @_;

# Close connection
my $tx = (delete($self->{connections}{$id}) || {})->{tx};
my $c = delete $self->{connections}{$id} || {};
my $tx = $c->{tx};
if ($close || !$tx || !$tx->keep_alive || $tx->error) {
$self->_dequeue($id);
return $self->_loop->remove($id);
$self->_dequeue($_, $id) for (1, 0);
$self->_loop($_)->remove($id) for (1, 0);
return;
}

# Keep connection alive (CONNECT requests get upgraded)
$self->_enqueue(join(':', $self->transactor->endpoint($tx)), $id)
$self->_enqueue($c->{nb}, join(':', $self->transactor->endpoint($tx)), $id)
unless uc $tx->req->method eq 'CONNECT';
}

# Follow a redirect for a finished transaction, if there is one to follow.
#
# $c is the connection record (its "cb" entry is taken out and reused for the
# follow-up request) and $old is the transaction that was just completed.
# Returns undef when the transactor produces no redirect transaction or when
# the number of redirects already followed has reached max_redirects;
# otherwise returns whatever _start returns for the new transaction.
#
# NOTE(review): the two final "return" lines appear to be the earlier and
# later forms of the same statement from a diff hunk; confirm which is current.
sub _redirect {
my ($self, $c, $old) = @_;
return undef unless my $new = $self->transactor->redirect($old);
return undef unless @{$old->redirects} < $self->max_redirects;
return $self->_start($new, delete $c->{cb});
return $self->_start($c->{nb}, $new, delete $c->{cb});
}

sub _start {
my ($self, $tx, $cb) = @_;
my ($self, $nb, $tx, $cb) = @_;

# Application server
my $url = $tx->req->url;
unless ($url->is_abs) {
my $base = $self->server->url;
my $base = $nb ? $self->server->nb_url : $self->server->url;
$url->scheme($base->scheme)->authority($base->authority);
}

$self->proxy->inject($tx);
if (my $jar = $self->cookie_jar) { $jar->inject($tx) }

# Connect and add request timeout if necessary
my $id = $self->emit(start => $tx)->_connection($tx, $cb);
my $id = $self->emit(start => $tx)->_connection($nb, $tx, $cb);
if (my $timeout = $self->request_timeout) {
weaken $self;
$self->{connections}{$id}{timeout} = $self->_loop->timer(
$timeout => sub { $self->_error($id, 'Request timeout') });
$self->{connections}{$id}{timeout} = $self->_loop($nb)
->timer($timeout => sub { $self->_error($id, 'Request timeout') });
}

return $id;
Expand Down Expand Up @@ -367,7 +361,7 @@ sub _write {
my $chunk = $tx->client_write;
delete $c->{writing};
warn "-- Client >>> Server (@{[$tx->req->url->to_abs]})\n$chunk\n" if DEBUG;
my $stream = $self->_loop->stream($id)->write($chunk);
my $stream = $self->_loop($c->{nb})->stream($id)->write($chunk);
$self->_handle($id) if $tx->is_finished;

# Continue writing
Expand Down Expand Up @@ -646,9 +640,12 @@ L<Mojo::UserAgent::Server> object.
# Change log level
$ua->server->app->log->level('fatal');
# Port currently used for processing relative URLs
# Port currently used for processing relative URLs blocking
say $ua->server->url->port;
# Port currently used for processing relative URLs non-blocking
say $ua->server->nb_url->port;
=head2 transactor
my $t = $ua->transactor;
Expand Down
44 changes: 32 additions & 12 deletions lib/Mojo/UserAgent/Server.pm
Expand Up @@ -20,29 +20,40 @@ sub app {
return $self;
}

# Build the absolute URL of the application server used for non-blocking
# requests. The server is (re)started first when none has been started yet or
# when arguments (such as a protocol) were passed in.
sub nb_url {
  my ($self, @args) = @_;

  # Same guard as "no server yet, or arguments given"
  unless ($self->{server} && !@args) { $self->_restart(0, @args) }

  my $address = "$self->{proto}://localhost:$self->{nb_port}/";
  return Mojo::URL->new($address);
}

# Perform a full restart of the application server; the flag passed to
# _restart makes it forget the ports it was using before.
sub restart {
  my $self = shift;
  return $self->_restart(1);
}

# Build the absolute URL of the application server from the stored protocol
# and port, (re)starting the server first when necessary. Any arguments are
# forwarded to _restart, whose next parameter is the protocol.
#
# NOTE(review): the two "_restart" statements below appear to be the earlier
# and later forms of the same guard from a diff hunk. The first also restarts
# when the server's loop differs from this object's "ioloop"; the second only
# checks for a missing server or given arguments. Confirm which is current.
sub url {
my $self = shift;
$self->_restart(0, @_)
if !$self->{server} || $self->{server}->ioloop ne $self->ioloop || @_;
$self->_restart(0, @_) if !$self->{server} || @_;
return Mojo::URL->new("$self->{proto}://localhost:$self->{port}/");
}

# (Re)create the embedded application servers and record the ports they are
# listening on.
#
# $full  - when true, previously stored ports are forgotten first
# $proto - protocol to listen with, defaults to 'http' and is stored in
#          $self->{proto}
#
# Each server listens on 127.0.0.1, on the stored port if there is one; the
# port actually bound is read back from the acceptor's handle and stored.
#
# NOTE(review): this span appears to be a rendered diff hunk. The first
# multi-line daemon construction, the single-key "delete $self->{port}" and
# the first "my $port" look like the earlier form of the code that follows
# under "# Blocking"; confirm which statements are in the committed file.
sub _restart {
my ($self, $full, $proto) = @_;
# Hash slice delete: forget both stored ports on a full restart
delete @{$self}{qw(nb_port port)} if $full;

my $server = $self->{server} = Mojo::Server::Daemon->new(
app => $self->app,
ioloop => $self->ioloop,
silent => 1
);
# The daemon's reference to the application is weakened
weaken $server->{app};
delete $self->{port} if $full;
my $port = $self->{port} ? ":$self->{port}" : '';
$self->{proto} = $proto ||= 'http';

# Blocking
# This daemon is given this object's own "ioloop"
my $server = $self->{server}
= Mojo::Server::Daemon->new(ioloop => $self->ioloop, silent => 1);
weaken $server->app($self->app)->{app};
# Empty suffix when no port is stored yet
my $port = $self->{port} ? ":$self->{port}" : '';
$self->{port} = $server->listen(["$proto://127.0.0.1$port"])
->start->ioloop->acceptor($server->acceptors->[0])->handle->sockport;

# Non-blocking
# No "ioloop" is passed here, so the daemon uses its own default loop
$server = $self->{nb_server} = Mojo::Server::Daemon->new(silent => 1);
weaken $server->app($self->app)->{app};
$port = $self->{nb_port} ? ":$self->{nb_port}" : '';
$self->{nb_port} = $server->listen(["$proto://127.0.0.1$port"])
->start->ioloop->acceptor($server->acceptors->[0])->handle->sockport;
}

1;
Expand Down Expand Up @@ -95,6 +106,15 @@ global default.
# Change application behavior
$server->app->defaults(testing => 'oh yea!');
=head2 nb_url
my $url = $server->nb_url;
my $url = $server->nb_url('http');
my $url = $server->nb_url('https');
Get absolute L<Mojo::URL> object for performing non-blocking requests with
L</"app"> and switch protocol if necessary.
=head2 restart
$server->restart;
Expand All @@ -107,8 +127,8 @@ Restart server with new port.
my $url = $server->url('http');
my $url = $server->url('https');
Get absolute L<Mojo::URL> object for L</"app"> and switch protocol if
necessary.
Get absolute L<Mojo::URL> object for performing blocking requests with
L</"app"> and switch protocol if necessary.
=head1 SEE ALSO
Expand Down

0 comments on commit 4a03caa

Please sign in to comment.