Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Removed Search::Elasticsearch
Created an internal scroller for the client.

We only use this one feature (scrolled searches), and we carry it
as a dependency that merely acts as an API on top of an API we could
just use directly.

The motivation is the breaking changes in the latest 5.x versions of
Search::Elasticsearch and the fact that it is incompatible with the
current client code.
  • Loading branch information
mickeyn committed Dec 20, 2016
1 parent 9dcb523 commit 19cc83f
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 30 deletions.
2 changes: 0 additions & 2 deletions cpanfile
Expand Up @@ -8,14 +8,12 @@ requires "Moo::Role" => "0";
requires "Net::SSLeay" => "1.49";
requires "Ref::Util" => "0";
requires "Safe::Isa" => "0";
requires "Search::Elasticsearch" => "== 2.03"; # pinned
requires "URI::Escape";
requires "perl" => "5.010";
requires "strict" => "0";
requires "warnings" => "0";

on 'test' => sub {
requires "Search::Elasticsearch::Scroll" => "0";
requires "Test::Fatal" => "0";
requires "Test::More" => "0";
requires "Test::Requires" => "0";
Expand Down
32 changes: 11 additions & 21 deletions lib/MetaCPAN/Client/Request.pm
Expand Up @@ -6,10 +6,11 @@ package MetaCPAN::Client::Request;
use Moo;
use Carp;
use JSON::MaybeXS qw<decode_json encode_json>;
use Search::Elasticsearch;
use HTTP::Tiny;
use Ref::Util qw< is_arrayref is_hashref >;

use MetaCPAN::Client::Scroll;

has _clientinfo => (
is => 'ro',
isa => sub {
Expand Down Expand Up @@ -72,7 +73,7 @@ sub _build_ua {
my $self = shift;

# This level of indirection is so that if a user has not specified a custom UA
# MetaCPAN::Client and ElasticSearch will have their own UA's
# MetaCPAN::Client will have its own UA's
#
# But if the user **has** specified a custom UA, that UA is used for both.
if ( $self->_has_user_ua ) {
Expand Down Expand Up @@ -126,23 +127,12 @@ sub ssearch {
my $args = shift;
my $params = shift;

my $es = Search::Elasticsearch->new(
nodes => [ $self->base_url ],
cxn_pool => 'Static::NoPing',
send_get_body_as => 'POST',
( $self->_has_user_ua ? ( handle => $self->_user_ua ) : () )
);

my $body = $self->_build_body($args, $params);

my $scroller = $es->scroll_helper(
( search_type => 'scan' ) x !$self->_is_agg,
scroll => '5m',
index => 'cpan',
type => $type,
size => 1000,
body => $body,
%{ $params },
my $scroller = MetaCPAN::Client::Scroll->new(
ua => $self->ua,
size => 500,
base_url => $self->base_url,
type => $type,
body => $self->_build_body($args, $params),
);

return $scroller;
Expand Down Expand Up @@ -347,5 +337,5 @@ Fetches a path from MetaCPAN (post or get), and returns the decoded result.
=head2 ssearch
Calls an Elastic Search query (using L<Search::Elasticsearch> and returns an
L<Search::Elasticsearch::Scroll> scroller object.
Calls an Elastic Search query and returns an L<MetaCPAN::Client::Scroll>
scroller object.
6 changes: 3 additions & 3 deletions lib/MetaCPAN/Client/ResultSet.pm
Expand Up @@ -22,8 +22,8 @@ has scroller => (
is => 'ro',
isa => sub {
use Safe::Isa;
$_[0]->$_isa('Search::Elasticsearch::Scroll')
or croak 'scroller must be an Search::Elasticsearch::Scroll object';
$_[0]->$_isa('MetaCPAN::Client::Scroll')
or croak 'scroller must be an MetaCPAN::Client::Scroll object';
},
predicate => 'has_scroller',
);
Expand Down Expand Up @@ -91,7 +91,7 @@ provides easy access to the scroller and aggregations.
=head2 scroller
An L<Search::Elasticsearch::Scroll> object.
An L<MetaCPAN::Client::Scroll> object.
=head2 items
Expand Down
167 changes: 167 additions & 0 deletions lib/MetaCPAN/Client/Scroll.pm
@@ -0,0 +1,167 @@
package MetaCPAN::Client::Scroll;

use Moo;
use Carp;
use Ref::Util qw< is_arrayref is_hashref is_ref >;
use JSON::MaybeXS qw< decode_json encode_json >;

# User agent used for every HTTP request made by this class.  Its post()
# and delete() methods are called with a URL and an options hashref.
has ua => (
    is       => 'ro',
    required => 1,
);

# Value sent as the size= URL parameter (BUILDARGS defaults it to '100').
has size => (
    is  => 'ro',
    isa => sub {
        is_ref( $_[0] ) and croak 'size must be a scalar';
    },
);

# Value sent as the scroll= URL parameter (BUILDARGS defaults it to '5m').
has time => (
    is  => 'ro',
    isa => sub {
        is_ref( $_[0] ) and croak 'time must be a scalar';
    },
);

# Prefix used to build the search and scroll URLs.
has base_url => (
    is  => 'ro',
    isa => sub {
        is_ref( $_[0] ) and croak 'base_url must be a scalar';
    },
    required => 1,
);

# Path segment placed between base_url and _search in the initial request.
has type => (
    is  => 'ro',
    isa => sub {
        is_ref( $_[0] ) and croak 'type must be a scalar';
    },
    required => 1,
);

# Data structure that is JSON-encoded and sent as the initial request content.
has body => (
    is  => 'ro',
    isa => sub {
        is_hashref( $_[0] ) or croak 'body must be a hashref';
    },
    required => 1,
);

# Scroll id; filled in by BUILDARGS from the _scroll_id of the first response.
has id => (
    is  => 'ro',
    isa => sub {
        is_ref( $_[0] ) and croak 'id must be a scalar';
    },
    required => 1,
);

# Total hit count; filled in by BUILDARGS from hits.total of the first
# response.
has total => (
    is  => 'ro',
    isa => sub {
        # The parentheses matter: "and" binds tighter than "or", so without
        # them a reference would short-circuit past the croak and be accepted.
        ( is_ref( $_[0] ) or $_[0] =~ /[^0-9]/ )
            and croak 'total must be a number';
    },
    required => 1,
);

# Counter compared against total by next(); starts at 0.
has _read => (
    is  => 'ro',
    isa => sub {
        # Same precedence fix as for total, and the message now names the
        # attribute actually being validated.
        ( is_ref( $_[0] ) or $_[0] =~ /[^0-9]/ )
            and croak '_read must be a number';
    },
    default => sub { 0 },
);

# Hits fetched from the server that have not been handed out by next() yet.
has buffer => (
    is  => 'ro',
    isa => sub {
        is_arrayref( $_[0] ) or croak 'buffer must be an array ref';
    },
    default => sub { [] },
);

# Aggregations returned with the first response (empty hashref if none).
has aggregations => (
    is  => 'ro',
    isa => sub {
        is_hashref( $_[0] ) or croak 'aggregations must be an hash ref';
    },
    default => sub { +{} },
);

# Moo constructor hook.  Besides normalising the arguments it performs the
# initial scrolled-search request, so the object starts out with its scroll
# id, total hit count, first batch of hits and (if present) aggregations.
#
# Arguments (key/value list, or a single hashref):
#   ua       - user agent; its post() method is called here       (required)
#   base_url - prefix of the search URL                            (required)
#   type     - path segment placed between base_url and _search    (required)
#   body     - data structure JSON-encoded as the request content  (required)
#   time     - value of the scroll= URL parameter, default '5m'
#   size     - value of the size= URL parameter, default '100'
#
# Returns a hashref of constructor arguments.
# Croaks when a required argument is missing or the response status is not 200.
sub BUILDARGS {
    my ( $class, @params ) = @_;

    # Accept both ->new( key => value, ... ) and ->new( { key => value } ).
    my %args = ( @params == 1 && is_hashref( $params[0] ) )
        ? %{ $params[0] }
        : @params;

    # This hook runs before the attribute "required" checks, so verify the
    # arguments used below here instead of failing on an undefined value.
    my @missing = grep { !defined $args{$_} } qw< ua base_url type body >;
    croak 'Missing required arguments: ' . join( ', ', @missing )
        if @missing;

    $args{time} //= '5m';
    $args{size} //= '100';

    my ( $ua, $base_url, $type, $body, $time, $size ) =
        @args{qw< ua base_url type body time size >};

    # fetch the first batch
    my $res = $ua->post(
        sprintf( '%s/%s/_search?scroll=%s&size=%s', $base_url, $type, $time, $size ),
        { content => encode_json($body) }
    );

    croak "failed to create a scrolled search"
        unless $res->{status} == 200;

    my $content = decode_json( $res->{content} );

    # set id and total
    $args{id}    = $content->{_scroll_id};
    $args{total} = $content->{hits}{total};

    # set buffer: build a fresh array so a caller-supplied one is not modified,
    # and tolerate a response that carries no hits at all
    my @buffer = @{ $args{buffer} || [] };
    push @buffer, @{ $content->{hits}{hits} || [] };
    $args{buffer} = \@buffer;

    # fetch aggregations
    if ( $content->{aggregations} and is_hashref( $content->{aggregations} ) ) {
        $args{aggregations} = $content->{aggregations};
    }

    return \%args;
}

# Return the next hit of the scrolled search, or undef once all hits have
# been handed out.
#
# Hits are served from the local buffer; when the buffer is empty another
# batch is requested via _fetch_next.
sub next {
    my $self = shift;

    # All hits reported by the server have already been handed out.
    return if $self->_read >= $self->total;

    my $buffer = $self->buffer;
    $self->_fetch_next unless @{$buffer};

    my $hit = shift @{$buffer};

    # Count what was handed out so the guard above can stop further requests
    # once the scroll is exhausted.  The _read accessor is read-only, hence
    # the direct write to the object's slot.
    $self->{_read}++ if defined $hit;

    return $hit;
}

# Request the next batch of hits for this scroll and append it to the buffer.
#
# POSTs the current scroll id to "<base_url>/_search/scroll" with the scroll=
# and size= parameters taken from the object.  Croaks unless the response
# status is 200.  Returns the value of the final push (the new buffer length).
sub _fetch_next {
    my $self = shift;

    my $res = $self->ua->post(
        sprintf( '%s/_search/scroll?scroll=%s&size=%s', $self->base_url, $self->time, $self->size ),
        { content => $self->id }
    );

    croak "failed to fetch next scrolled batch"
        unless $res->{status} == 200;

    my $content = decode_json( $res->{content} );

    # Every scroll response carries a _scroll_id and only the most recently
    # received one should be used for the following request.  The id accessor
    # is read-only, hence the direct write to the object's slot.
    $self->{id} = $content->{_scroll_id}
        if defined $content->{_scroll_id};

    # A response without hits simply adds nothing to the buffer.
    my $hits = $content->{hits}{hits} || [];

    return push @{ $self->buffer }, @{$hits};
}

# Moo destructor hook: ask the server to drop the scroll by sending a DELETE
# request carrying the scroll id.
#
# This is best-effort cleanup.  A failure is reported with a warning and
# never raised, because exceptions thrown during object destruction cannot
# be handled by the caller.
sub DEMOLISH {
    my $self = shift;

    # An object blessed by hand (or one whose construction did not finish)
    # has no user agent or scroll id, so there is nothing to clean up.
    my $ua = $self->ua;
    my $id = $self->id;
    return unless defined $ua and defined $id;

    # Keep any exception from the user agent inside the destructor and leave
    # the caller's $@ untouched.
    local $@;
    my $res = eval {
        $ua->delete(
            sprintf( '%s/_search/scroll?scroll=%s', $self->base_url, $self->time ),
            { content => $id }
        );
    };

    warn "failed to delete scroller"
        unless is_hashref($res) and ( $res->{status} // 0 ) == 200;

    return;
}

1;
3 changes: 1 addition & 2 deletions t/api/favorite.t
Expand Up @@ -15,6 +15,5 @@ foreach my $option ( { author => 'XSAWYERX' }, { dist => 'MetaCPAN-Client' } ) {
isa_ok( $rs, 'MetaCPAN::Client::ResultSet' );
can_ok( $rs, qw<type scroller> );
is( $rs->type, 'favorite', 'Correct resultset type' );
isa_ok( $rs->scroller, 'Search::Elasticsearch::Scroll' );
isa_ok( $rs->scroller, 'MetaCPAN::Client::Scroll' );
}

4 changes: 2 additions & 2 deletions t/resultset.t
Expand Up @@ -9,7 +9,7 @@ use MetaCPAN::Client::ResultSet;

{
package MetaCPAN::Client::Test::ScrollerZ;
use base 'Search::Elasticsearch::Scroll'; # < 5.10 FTW (except, no)
use base 'MetaCPAN::Client::Scroll'; # < 5.10 FTW (except, no)
sub total {0}
}

Expand All @@ -26,7 +26,7 @@ like(

my $rs = MetaCPAN::Client::ResultSet->new(
type => 'author',
scroller => bless {}, 'Search::Elasticsearch::Scroll',
scroller => bless {}, 'MetaCPAN::Client::Scroll',
);

isa_ok( $rs, 'MetaCPAN::Client::ResultSet' );
Expand Down

0 comments on commit 19cc83f

Please sign in to comment.