bulk_index doesn't work, use bulk_helper instead
ranguard committed Apr 21, 2016
1 parent 21cf54a commit f395205
Showing 1 changed file with 23 additions and 27 deletions.
50 changes: 23 additions & 27 deletions lib/MetaCPAN/Script/Backup.pm
@@ -102,7 +102,10 @@ sub run_restore {
     my $es = $self->es;
     my $fh = IO::Zlib->new( $self->restore->stringify, 'rb' );
 
+    my %bulk_store;
+
     while ( my $line = $fh->readline ) {
+
         state $line_count = 0;
         ++$line_count;
         my $obj;
@@ -112,41 +115,34 @@ sub run_restore {
             log_warn {"cannot decode JSON: $line --- $_"};
         };
 
+        # Instantiate our bulk_helper if we need to
+        my $bulk_key = $obj->{_index} . $obj->{_type};
+        $bulk_store{$bulk_key} ||= $es->bulk_helper(
+            index     => $obj->{_index},
+            type      => $obj->{_type},
+            max_count => $self->batch_size
+        );
+
+        # Fetch the relevant bulk helper
+        my $bulk = $bulk_store{$bulk_key};
+
         my $parent = $obj->{fields}->{_parent};
-        push(
-            @bulk,
+
+        $bulk->create(
             {
                 id => $obj->{_id},
                 $parent ? ( parent => $parent ) : (),
-                index  => $obj->{_index},
-                type   => $obj->{_type},
-                data   => $obj->{_source},
+                source => $obj->{_source},
             }
         );
 
-        if ( @bulk >= $self->batch_size ) {
-            log_info { 'line count: ' . $line_count };
-            try {
-                $es->bulk_index( \@bulk );
-            }
-            catch {
-                # try docs individually to find the problem doc(s)
-                log_warn {"failed to bulk index $_"};
-                foreach my $document (@bulk) {
-                    try {
-                        $es->bulk_index( [$document] );
-                    }
-                    catch {
-                        log_warn {
-                            "failed to index document: $_" . p $document;
-                        };
-                    };
-                }
-            };
-            @bulk = ();
-        }
     }
-    $es->bulk_index( \@bulk );
+
+    # Flush anything left over just in case
+    for my $bulk ( values %bulk_store ) {
+        $bulk->flush;
+    }
 
     log_info {'done'};
 }

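For readers unfamiliar with Search::Elasticsearch's bulk helper, here is a minimal standalone sketch (not part of this commit) of the pattern the new code relies on: the helper buffers actions internally and flushes automatically once max_count documents are queued, which is why the manual @bulk array, the batch-size check, and the final bulk_index call could all be dropped. The index name, type name, and @documents list below are hypothetical.

    use strict;
    use warnings;
    use Search::Elasticsearch;

    my $es = Search::Elasticsearch->new( nodes => ['localhost:9200'] );

    # One helper per index/type; max_count makes it flush itself every N docs.
    my $bulk = $es->bulk_helper(
        index     => 'cpan_v1',    # hypothetical index name
        type      => 'release',    # hypothetical type name
        max_count => 100,
    );

    # Hypothetical documents to index.
    my @documents = (
        { id => 1, name => 'Some-Dist-1.00' },
        { id => 2, name => 'Other-Dist-0.42' },
    );

    for my $doc (@documents) {
        $bulk->create(
            {
                id     => $doc->{id},
                source => $doc,
            }
        );
    }

    # Flush whatever is still buffered when the loop ends.
    $bulk->flush;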

0 comments on commit f395205
