Skip to content

Commit

Permalink
import channel script should be idempotent
Browse files Browse the repository at this point in the history
  • Loading branch information
garbas committed May 22, 2020
1 parent 2600984 commit dc9ec56
Showing 1 changed file with 40 additions and 33 deletions.
73 changes: 40 additions & 33 deletions scripts/import-channel
Expand Up @@ -271,21 +271,24 @@ def get_options(evaluation):
return len(options), gen


def create_index(es, index, mapping):
def ensure_index(es, index, mapping):
if es.indices.exists(index):
logger.debug(f"create_index: index '{index}' already exists")
return
logger.debug(f"ensure_index: index '{index}' already exists")
return False

es.indices.create(
index=index,
body={
"settings": {"number_of_shards": 1, "analysis": ANALYSIS},
"mappings": mapping,
},
)
logger.debug(f"create_index: index '{index}' was created")
logger.debug(f"ensure_index: index '{index}' was created")

return True


def create_index_name(type_, channel, evaluation):
def ensure_index_name(type_, channel, evaluation):
return (
f"latest-{channel}-{type_}",
f"evaluation-{INDEX_SCHEMA_VERSION}-{channel}-{evaluation['revisions_since_start']}-{evaluation['git_revision']}-{type_}",
Expand Down Expand Up @@ -317,40 +320,44 @@ def main(es_url, channel, verbose):
es = elasticsearch.Elasticsearch([es_url])

# ensure indexes exist
packages_alias, packages_index = create_index_name("packages", channel, evaluation)
options_alias, options_index = create_index_name("options", channel, evaluation)
create_index(es, packages_index, PACKAGES_MAPPING)
create_index(es, options_index, OPTIONS_MAPPING)
options_alias, options_index = ensure_index_name("options", channel, evaluation)
packages_alias, packages_index = ensure_index_name("packages", channel, evaluation)
packages_index_created = ensure_index(es, packages_index, PACKAGES_MAPPING)
options_index_created = ensure_index(es, options_index, OPTIONS_MAPPING)

# write packages
number_of_packages, gen_packages = get_packages(evaluation)
if number_of_packages:
click.echo("Indexing packages...")
progress = tqdm.tqdm(unit="packages", total=number_of_packages)
successes = 0
for ok, action in elasticsearch.helpers.streaming_bulk(
client=es, index=packages_index, actions=gen_packages()
):
progress.update(1)
successes += ok
click.echo("Indexed %d/%d packages" % (successes, number_of_packages))
if packages_index_created:
number_of_packages, gen_packages = get_packages(evaluation)
if number_of_packages:
click.echo("Indexing packages...")
progress = tqdm.tqdm(unit="packages", total=number_of_packages)
successes = 0
for ok, action in elasticsearch.helpers.streaming_bulk(
client=es, index=packages_index, actions=gen_packages()
):
progress.update(1)
successes += ok
click.echo("Indexed %d/%d packages" % (successes, number_of_packages))

# write options
number_of_options, gen_options = get_options(evaluation)
if number_of_options:
click.echo("Indexing options...")
progress = tqdm.tqdm(unit="options", total=number_of_options)
successes = 0
for ok, action in elasticsearch.helpers.streaming_bulk(
client=es, index=options_index, actions=gen_options()
):
progress.update(1)
successes += ok
print("Indexed %d/%d options" % (successes, number_of_options))
if options_index_created:
number_of_options, gen_options = get_options(evaluation)
if number_of_options:
click.echo("Indexing options...")
progress = tqdm.tqdm(unit="options", total=number_of_options)
successes = 0
for ok, action in elasticsearch.helpers.streaming_bulk(
client=es, index=options_index, actions=gen_options()
):
progress.update(1)
successes += ok
print("Indexed %d/%d options" % (successes, number_of_options))

# update alias
update_alias(es, packages_alias, packages_index)
update_alias(es, options_alias, options_index)
if packages_index_created:
update_alias(es, packages_alias, packages_index)
if options_index_created:
update_alias(es, options_alias, options_index)


if __name__ == "__main__":
Expand Down

0 comments on commit dc9ec56

Please sign in to comment.