Skip to content

Commit

Permalink
Merge pull request #454 from CPAN-API/leo/switch_builk_to_new_es
Browse files Browse the repository at this point in the history
bulk_index does not work; use bulk_helper instead
  • Loading branch information
oalders committed Apr 21, 2016
2 parents 98c240c + 24b9215 commit f971482
Showing 1 changed file with 30 additions and 31 deletions.
61 changes: 30 additions & 31 deletions lib/MetaCPAN/Script/Backup.pm
Expand Up @@ -102,51 +102,50 @@ sub run_restore {
my $es = $self->es;
my $fh = IO::Zlib->new( $self->restore->stringify, 'rb' );

my %bulk_store;

while ( my $line = $fh->readline ) {

state $line_count = 0;
++$line_count;
my $obj;
my $raw;

try { $obj = decode_json($line) }
try { $raw = decode_json($line) }
catch {
log_warn {"cannot decode JSON: $line --- $_"};
};

my $parent = $obj->{fields}->{_parent};
push(
@bulk,
# Create a bulk helper on demand. In case a backup has mixed
# _index or _type values, keep a separate bulk helper for each
# _index/_type combination.
my $bulk_key = $raw->{_index} . $raw->{_type};

$bulk_store{$bulk_key} ||= $es->bulk_helper(
index => $raw->{_index},
type => $raw->{_type},
max_count => $self->batch_size
);

# Fetch relevant bulk helper
my $bulk = $bulk_store{$bulk_key};

my $parent = $raw->{fields}->{_parent};

$bulk->create(
{
id => $obj->{_id},
id => $raw->{_id},
$parent ? ( parent => $parent ) : (),
index => $obj->{_index},
type => $obj->{_type},
data => $obj->{_source},
source => $raw->{_source},
}
);

if ( @bulk >= $self->batch_size ) {
log_info { 'line count: ' . $line_count };
try {
$es->bulk_index( \@bulk );
}
catch {
# try docs individually to find the problem doc(s)
log_warn {"failed to bulk index $_"};
foreach my $document (@bulk) {
try {
$es->bulk_index( [$document] );
}
catch {
log_warn {
"failed to index document: $_" . p $document;
};
};
}
};
@bulk = ();
}
}
$es->bulk_index( \@bulk );

# Flush anything left over, just in case
for my $bulk ( values %bulk_store ) {
$bulk->flush;
}

log_info {'done'};
}

Expand Down

0 comments on commit f971482

Please sign in to comment.