Skip to content

Commit

Permalink
default type copy to monthly slices (unless query is provided)
Browse files Browse the repository at this point in the history
  • Loading branch information
mickeyn committed Dec 6, 2016
1 parent b4a74ee commit 781e4ec
Showing 1 changed file with 51 additions and 9 deletions.
60 changes: 51 additions & 9 deletions lib/MetaCPAN/Script/Mapping.pm
Expand Up @@ -7,6 +7,7 @@ use Log::Contextual qw( :log );
use Moose;
use MetaCPAN::Types qw( Bool Str );
use Cpanel::JSON::XS qw( decode_json );
use DateTime;

use constant {
EXPECTED => 1,
Expand Down Expand Up @@ -75,7 +76,8 @@ has copy_query => (
is => 'ro',
isa => Str,
default => "",
documentation => 'match query',
documentation => 'match query (default: monthly time slices, '
. ' if provided must be a valid json query OR "match_all")',
);

has reindex => (
Expand Down Expand Up @@ -226,32 +228,72 @@ sub index_create {

# copy_index: drive a copy of documents of type $self->copy_type towards
# $self->copy_to_index by delegating to $self->_copy_slice.
#
# - When copy_query is set ('match_all' or a JSON query string), exactly one
#   _copy_slice call is made with that query and its result is returned.
# - Otherwise the copy is split into one date-range query per calendar month,
#   from 1994-01 up to one month past the current time.
#
# Dies when no copy_type is configured, or when copy_query is not valid JSON.
#
# NOTE(review): this text is a rendered diff without +/- markers, so lines the
# commit removed appear next to their replacements (e.g. the two consecutive
# decode_json assignments, and the two "my $query" declarations) and the
# braces do not balance as shown — confirm against the real file.
sub copy_index {
my $self = shift;
my $type = $self->copy_type;
$type or die "can't copy without a type\n";

my $query = { match_all => {} };
if ( $self->copy_query ) {
# presumably aborts unless the destination index already exists — verify
# against _check_index_exists, which is not visible here
$self->_check_index_exists( $self->copy_to_index, EXPECTED );
$self->copy_type or die "can't copy without a type\n";

# The literal 'match_all' is a shorthand; anything else non-empty is treated
# as JSON further down. An empty copy_query leaves $query undefined.
# NOTE(review): the shorthand yields a JSON *string*, while the decode_json
# branch yields a decoded data structure — confirm _copy_slice accepts both.
my $arg_query = $self->copy_query;
my $query
= $arg_query eq 'match_all'
? '{"match_all":{}}'
: undef;

if ( $arg_query and !$query ) {
# Decode the user-supplied JSON; the trailing "1" makes a successful eval
# return true so the "or do" block only runs on failure.
eval {
$query = decode_json $self->copy_query;
$query = decode_json $arg_query;
1;
} or do {
# $@ may be empty even though the eval failed, hence the fallback text;
# a bad query is fatal.
my $err = $@ || 'zombie error';
die $err;
};
}

# An explicit query means a single copy pass.
return $self->_copy_slice($query) if $query;

# else ... do copy by monthly slices

# Start of the first slice (January 1994) and the exclusive upper bound of
# the iteration (one month after "now").
my $dt = DateTime->new( year => 1994, month => 1 );
my $end_time = DateTime->now()->add( months => 1 );

while ( $dt < $end_time ) {
# $gte is the current month as "YYYY-MM"; $dt is then advanced in place so
# $lt is the following month, giving the half-open range [$gte, $lt).
my $gte = $dt->strftime("%Y-%m");
$dt->add( months => 1 );
my $lt = $dt->strftime("%Y-%m");

# Range query on the "date" field covering exactly this month.
my $q = +{ range => { date => { gte => $gte, lt => $lt } } };

log_info {"copying data for month: $gte"};
# A failing slice is only warned about, so the remaining months are still
# attempted.
eval {
$self->_copy_slice($q);
1;
} or do {
my $err = $@ || 'zombie error';
warn $err;
};
}
}

sub _copy_slice {
my ( $self, $query ) = @_;

my $scroll = $self->es()->scroll_helper(
search_type => 'scan',
size => 250,
scroll => '10m',
index => $self->index->name,
type => $type,
body => { query => { filtered => { query => $query } } },
type => $self->copy_type,
body => {
query => {
filtered => {
query => $query
}
}
},
);

my $bulk = $self->es->bulk_helper(
index => $self->copy_to_index,
type => $type,
type => $self->copy_type,
max_count => 500,
);

Expand Down

0 comments on commit 781e4ec

Please sign in to comment.