Skip to content

Commit

Permalink
Removed Search::Elasticsearch
Browse files Browse the repository at this point in the history
Created an internall scroller for the client.

We only use this one feature (scrolled searches) and we have it
as a dependency to function as an API to an API we could just
use directly.

The motivation is the breaking changes in latest 5.x versions of
Search::Elasticsearch and the fact it's incompatiable with the
current client code.
  • Loading branch information
mickeyn committed Dec 20, 2016
1 parent 9dcb523 commit 19cc83f
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 30 deletions.
2 changes: 0 additions & 2 deletions cpanfile
Expand Up @@ -8,14 +8,12 @@ requires "Moo::Role" => "0";
requires "Net::SSLeay" => "1.49";
requires "Ref::Util" => "0";
requires "Safe::Isa" => "0";
requires "Search::Elasticsearch" => "== 2.03"; # pinned
requires "URI::Escape";
requires "perl" => "5.010";
requires "strict" => "0";
requires "warnings" => "0";

on 'test' => sub {
requires "Search::Elasticsearch::Scroll" => "0";
requires "Test::Fatal" => "0";
requires "Test::More" => "0";
requires "Test::Requires" => "0";
Expand Down
32 changes: 11 additions & 21 deletions lib/MetaCPAN/Client/Request.pm
Expand Up @@ -6,10 +6,11 @@ package MetaCPAN::Client::Request;
use Moo;
use Carp;
use JSON::MaybeXS qw<decode_json encode_json>;
use Search::Elasticsearch;
use HTTP::Tiny;
use Ref::Util qw< is_arrayref is_hashref >;

use MetaCPAN::Client::Scroll;

has _clientinfo => (
is => 'ro',
isa => sub {
Expand Down Expand Up @@ -72,7 +73,7 @@ sub _build_ua {
my $self = shift;

# This level of indirection is so that if a user has not specified a custom UA
# MetaCPAN::Client and ElasticSearch will have their own UA's
# MetaCPAN::Client will have its own UA's
#
# But if the user **has** specified a custom UA, that UA is used for both.
if ( $self->_has_user_ua ) {
Expand Down Expand Up @@ -126,23 +127,12 @@ sub ssearch {
my $args = shift;
my $params = shift;

my $es = Search::Elasticsearch->new(
nodes => [ $self->base_url ],
cxn_pool => 'Static::NoPing',
send_get_body_as => 'POST',
( $self->_has_user_ua ? ( handle => $self->_user_ua ) : () )
);

my $body = $self->_build_body($args, $params);

my $scroller = $es->scroll_helper(
( search_type => 'scan' ) x !$self->_is_agg,
scroll => '5m',
index => 'cpan',
type => $type,
size => 1000,
body => $body,
%{ $params },
my $scroller = MetaCPAN::Client::Scroll->new(
ua => $self->ua,
size => 500,
base_url => $self->base_url,
type => $type,
body => $self->_build_body($args, $params),
);

return $scroller;
Expand Down Expand Up @@ -347,5 +337,5 @@ Fetches a path from MetaCPAN (post or get), and returns the decoded result.
=head2 ssearch
Calls an Elastic Search query (using L<Search::Elasticsearch> and returns an
L<Search::Elasticsearch::Scroll> scroller object.
Calls an Elastic Search query and returns an L<MetaCPAN::Client::Scroll>
scroller object.
6 changes: 3 additions & 3 deletions lib/MetaCPAN/Client/ResultSet.pm
Expand Up @@ -22,8 +22,8 @@ has scroller => (
is => 'ro',
isa => sub {
use Safe::Isa;
$_[0]->$_isa('Search::Elasticsearch::Scroll')
or croak 'scroller must be an Search::Elasticsearch::Scroll object';
$_[0]->$_isa('MetaCPAN::Client::Scroll')
or croak 'scroller must be an MetaCPAN::Client::Scroll object';
},
predicate => 'has_scroller',
);
Expand Down Expand Up @@ -91,7 +91,7 @@ provides easy access to the scroller and aggregations.
=head2 scroller
An L<Search::Elasticsearch::Scroll> object.
An L<MetaCPAN::Client::Scroll> object.
=head2 items
Expand Down
167 changes: 167 additions & 0 deletions lib/MetaCPAN/Client/Scroll.pm
@@ -0,0 +1,167 @@
package MetaCPAN::Client::Scroll;

use Moo;
use Carp;
use Ref::Util qw< is_arrayref is_hashref is_ref >;
use JSON::MaybeXS qw< decode_json encode_json >;

has ua => (
is => 'ro',
required => 1,
);

has size => (
is => 'ro',
isa => sub {
is_ref($_[0]) and croak 'size must be a scalar';
},
);

has time => (
is => 'ro',
isa => sub {
is_ref($_[0]) and croak 'time must be a scalar';
},
);

has base_url => (
is => 'ro',
isa => sub {
is_ref($_[0]) and croak 'base_url must be a scalar';
},
required => 1,
);

has type => (
is => 'ro',
isa => sub {
is_ref($_[0]) and croak 'type must be a scalar';
},
required => 1,
);

has body => (
is => 'ro',
isa => sub {
is_hashref($_[0]) or croak 'body must be a hashref';
},
required => 1,
);

has id => (
is => 'ro',
isa => sub {
is_ref($_[0]) and croak 'id must be a scalar';
},
required => 1,
);

has total => (
is => 'ro',
isa => sub {
is_ref($_[0]) or $_[0] =~ /[^0-9]/
and croak 'total must be a number';
},
required => 1,
);

has _read => (
is => 'ro',
isa => sub {
is_ref($_[0]) or $_[0] =~ /[^0-9]/
and croak 'total must be a number';
},
default => sub { 0 },
);

has buffer => (
is => 'ro',
isa => sub {
is_arrayref($_[0]) or croak 'buffer must be an array ref';
},
default => sub { [] },
);

has aggregations => (
is => 'ro',
isa => sub {
is_hashref($_[0]) or croak 'aggregations must be an hash ref';
},
default => sub { +{} },
);

sub BUILDARGS {
my ( $class, %args ) = @_;
$args{time} //= '5m';
$args{size} //= '100';

my ( $ua, $base_url, $type, $body, $time, $size ) =
@args{qw< ua base_url type body time size >};

# fetch
my $res = $ua->post(
sprintf( '%s/%s/_search?scroll=%s&size=%s', $base_url, $type, $time, $size ),
{ content => encode_json $body }
);

croak "failed to create a scrolled search"
unless $res->{status} == 200;

my $content = decode_json $res->{content};

# set id
my $id = $content->{_scroll_id};
$args{id} = $id;

# set total
$args{total} = $content->{hits}{total};

# set buffer
push @{ $args{buffer} } => @{ $content->{hits}{hits} };

# fecth aggregations
if ( $content->{aggregations} and is_hashref( $content->{aggregations} ) ) {
$args{aggregations} = $content->{aggregations};
}

return \%args;
}

sub next {
my $self = shift;
return if $self->_read >= $self->total;

$self->_fetch_next unless @{ $self->buffer };

return shift @{ $self->buffer };
}

sub _fetch_next {
my $self = shift;

my $res = $self->ua->post(
sprintf( '%s/_search/scroll?scroll=%s&size=%s', $self->base_url, $self->time, $self->size ),
{ content => $self->id }
);

croak "failed to fetch next scolled batch"
unless $res->{status} == 200;

my $content = decode_json $res->{content};

push @{ $self->buffer } => @{ $content->{hits}{hits} };
}

sub DEMOLISH {
my $self = shift;

my $res = $self->ua->delete(
sprintf( '%s/_search/scroll?scroll=%s', $self->base_url, $self->time ),
{ content => $self->id }
);

warn "failed to delete scroller"
unless $res->{status} == 200;
}

1;
3 changes: 1 addition & 2 deletions t/api/favorite.t
Expand Up @@ -15,6 +15,5 @@ foreach my $option ( { author => 'XSAWYERX' }, { dist => 'MetaCPAN-Client' } ) {
isa_ok( $rs, 'MetaCPAN::Client::ResultSet' );
can_ok( $rs, qw<type scroller> );
is( $rs->type, 'favorite', 'Correct resultset type' );
isa_ok( $rs->scroller, 'Search::Elasticsearch::Scroll' );
isa_ok( $rs->scroller, 'MetaCPAN::Client::Scroll' );
}

4 changes: 2 additions & 2 deletions t/resultset.t
Expand Up @@ -9,7 +9,7 @@ use MetaCPAN::Client::ResultSet;

{
package MetaCPAN::Client::Test::ScrollerZ;
use base 'Search::Elasticsearch::Scroll'; # < 5.10 FTW (except, no)
use base 'MetaCPAN::Client::Scroll'; # < 5.10 FTW (except, no)
sub total {0}
}

Expand All @@ -26,7 +26,7 @@ like(

my $rs = MetaCPAN::Client::ResultSet->new(
type => 'author',
scroller => bless {}, 'Search::Elasticsearch::Scroll',
scroller => bless {}, 'MetaCPAN::Client::Scroll',
);

isa_ok( $rs, 'MetaCPAN::Client::ResultSet' );
Expand Down

0 comments on commit 19cc83f

Please sign in to comment.