Skip to content

Commit

Permalink
fixed a few unsubscribe and error event bugs in Mojo::EventEmitter
Browse files Browse the repository at this point in the history
  • Loading branch information
kraih committed Oct 29, 2013
1 parent 5cf86ba commit 652596e
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 24 deletions.
1 change: 1 addition & 0 deletions Changes
@@ -1,5 +1,6 @@

4.53 2013-10-30
+- Fixed a few unsubscribe and error event bugs in Mojo::EventEmitter.

4.52 2013-10-29
- Improved Mojo::EventEmitter to allow unhandled error events to be fatal.
Expand Down
14 changes: 4 additions & 10 deletions lib/Mojo/EventEmitter.pm
Expand Up @@ -27,14 +27,8 @@ sub emit_safe {
warn "-- Emit $name in @{[blessed $self]} safely (@{[scalar @$s]})\n"
if DEBUG;
for my $cb (@$s) {
-unless (eval { $self->$cb(@_); 1 }) {
-
-# Error event failed
-if ($name eq 'error') { die "@{[blessed $self]}: $@" }
-
-# Normal event failed
-else { $self->emit_safe(error => qq{Event "$name" failed: $@}) }
-}
+$self->emit(error => qq{Event "$name" failed: $@})
+unless eval { $self->$cb(@_); 1 };
}
}
else {
Expand Down Expand Up @@ -74,7 +68,7 @@ sub unsubscribe {
my ($self, $name, $cb) = @_;

# One
-if ($cb) {
+if ($cb && @{$self->{events}{$name}} > 1) {
$self->{events}{$name} = [grep { $cb ne $_ } @{$self->{events}{$name}}];
}

Expand Down Expand Up @@ -128,7 +122,7 @@ L<Mojo::EventEmitter> can emit the following events.
...
});
-Emitted safely for event errors, fatal if unhandled.
+Emitted for event errors, fatal if unhandled.
$e->on(error => sub {
my ($e, $err) = @_;
Expand Down
17 changes: 8 additions & 9 deletions lib/Mojo/IOLoop/Client.pm
Expand Up @@ -55,12 +55,12 @@ sub _connect {
$options{LocalAddr} = $args->{local_address} if $args->{local_address};
$options{PeerAddr} =~ s/[\[\]]//g if $options{PeerAddr};
my $class = IPV6 ? 'IO::Socket::IP' : 'IO::Socket::INET';
-return $self->emit_safe(error => "Couldn't connect: $@")
+return $self->emit(error => "Couldn't connect: $@")
unless $self->{handle} = $handle = $class->new(%options);

# Timeout
$self->{timer} = $reactor->timer($args->{timeout} || 10,
-sub { $self->emit_safe(error => 'Connect timeout') });
+sub { $self->emit(error => 'Connect timeout') });
}
$handle->blocking(0);

Expand Down Expand Up @@ -88,18 +88,17 @@ sub _try {

# Retry or handle exceptions
my $handle = $self->{handle};
-return $! == EINPROGRESS ? undef : $self->emit_safe(error => $!)
+return $! == EINPROGRESS ? undef : $self->emit(error => $!)
if IPV6 && !$handle->connect;
-return $self->emit_safe(error => $! = $handle->sockopt(SO_ERROR))
+return $self->emit(error => $! = $handle->sockopt(SO_ERROR))
if !IPV6 && !$handle->connected;

# Disable Nagle's algorithm
setsockopt $handle, IPPROTO_TCP, TCP_NODELAY, 1;

return $self->_cleanup->emit_safe(connect => $handle)
if !$args->{tls} || $handle->isa('IO::Socket::SSL');
-return $self->emit_safe(
-error => 'IO::Socket::SSL 1.75 required for TLS support')
+return $self->emit(error => 'IO::Socket::SSL 1.75 required for TLS support')
unless TLS;

# Upgrade
Expand All @@ -108,7 +107,7 @@ sub _try {
SSL_ca_file => $args->{tls_ca}
&& -T $args->{tls_ca} ? $args->{tls_ca} : undef,
SSL_cert_file => $args->{tls_cert},
-SSL_error_trap => sub { $self->_cleanup->emit_safe(error => $_[1]) },
+SSL_error_trap => sub { $self->_cleanup->emit(error => $_[1]) },
SSL_hostname => $args->{address},
SSL_key_file => $args->{tls_key},
SSL_startHandshake => 0,
Expand All @@ -118,7 +117,7 @@ sub _try {
);
my $reactor = $self->reactor;
$reactor->remove($handle);
-return $self->emit_safe(error => 'TLS upgrade failed')
+return $self->emit(error => 'TLS upgrade failed')
unless $handle = IO::Socket::SSL->start_SSL($handle, %options);
$reactor->io($handle => sub { $self->_tls })->watch($handle, 0, 1);
}
Expand Down Expand Up @@ -175,7 +174,7 @@ Emitted safely once the connection is established.
...
});
-Emitted safely if an error occurs on the connection.
+Emitted if an error occurs on the connection.
=head1 ATTRIBUTES
Expand Down
4 changes: 2 additions & 2 deletions lib/Mojo/IOLoop/Stream.pm
Expand Up @@ -107,7 +107,7 @@ sub _error {
return $self->close if $! == ECONNRESET || $! == EPIPE;

# Error
-$self->emit_safe(error => $!)->close;
+$self->emit(error => $!)->close;
}

sub _read {
Expand Down Expand Up @@ -204,7 +204,7 @@ Emitted safely once all data has been written.
...
});
-Emitted safely if an error occurs on the stream.
+Emitted if an error occurs on the stream.
=head2 read
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Reactor.pm
Expand Up @@ -77,7 +77,7 @@ the following new ones.
...
});
-Emitted safely for exceptions caught in callbacks.
+Emitted for exceptions caught in callbacks.
$reactor->on(error => sub {
my ($reactor, $err) = @_;
Expand Down
2 changes: 1 addition & 1 deletion lib/Mojo/Reactor/Poll.pm
Expand Up @@ -106,7 +106,7 @@ sub _poll { shift->{poll} ||= IO::Poll->new }

sub _sandbox {
my ($self, $desc, $cb) = (shift, shift, shift);
-eval { $self->$cb(@_); 1 } or $self->emit_safe(error => "$desc failed: $@");
+eval { $self->$cb(@_); 1 } or $self->emit(error => "$desc failed: $@");
}

sub _timer {
Expand Down
6 changes: 5 additions & 1 deletion t/mojo/eventemitter.t
Expand Up @@ -23,8 +23,12 @@ like $@, qr/^Mojo::EventEmitter: works/, 'right error';

# Exception in error event
$e->once(error => sub { die "$_[1]entional" });
+eval { $e->emit(error => 'int') };
+like $@, qr/^intentional/, 'right error';
+$e->once(error => sub { die "$_[1]entional" });
eval { $e->emit_safe(error => 'int') };
-like $@, qr/^Mojo::EventEmitter: intentional/, 'right error';
+like $@, qr/^Mojo::EventEmitter: Event "error" failed: intentional/,
+'right error';

# Error fallback
my ($echo, $err);
Expand Down

0 comments on commit 652596e

Please sign in to comment.