Skip to content

Commit

Permalink
implemented index alias to make import atomic
Browse files Browse the repository at this point in the history
  • Loading branch information
garbas committed May 22, 2020
1 parent b264d7a commit e952706
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 71 deletions.
146 changes: 77 additions & 69 deletions scripts/import-channel
Expand Up @@ -29,8 +29,7 @@ click_log.basic_config(logger)


CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))


INDEX_SCHEMA_VERSION=1
ANALYSIS = {
"analyzer": {
"nixAttrName": {
Expand Down Expand Up @@ -70,6 +69,47 @@ ANALYSIS = {
},
},
}
# Elasticsearch mapping for package documents; passed as the "mappings"
# section of the index body when the packages index is created.
PACKAGES_MAPPING = {
    "properties": {
        # Analysed with the custom "nixAttrName" analyzer, with an
        # additional un-analysed "raw" keyword sub-field.
        "attr_name": {
            "type": "text",
            "analyzer": "nixAttrName",
            "fields": {"raw": {"type": "keyword"}},
        },
        "attr_set": {"type": "keyword"},
        "pname": {"type": "keyword"},
        "pversion": {"type": "keyword"},
        "description": {"type": "text"},
        "longDescription": {"type": "text"},
        "license": {
            "type": "nested",
            "properties": {
                "fullName": {"type": "text"},
                "url": {"type": "text"},
            },
        },
        "maintainers": {
            "type": "nested",
            "properties": {
                "name": {"type": "text"},
                "email": {"type": "text"},
                "github": {"type": "text"},
            },
        },
        "platforms": {"type": "keyword"},
        "position": {"type": "text"},
        "homepage": {"type": "keyword"},
    },
}
# Elasticsearch mapping for option documents; passed as the "mappings"
# section of the index body when the options index is created.
OPTIONS_MAPPING = {
    "properties": {
        "option_name": {"type": "keyword"},
        "description": {"type": "text"},
        "type": {"type": "keyword"},
        "default": {"type": "text"},
        "example": {"type": "text"},
        "source": {"type": "keyword"},
    },
}


def get_last_evaluation(channel):
Expand Down Expand Up @@ -235,75 +275,34 @@ def get_options(evaluation):
return len(options), gen


def recreate_index(es, channel):
packages_index = f"{channel}-packages"
if es.indices.exists(packages_index):
es.indices.delete(index=packages_index)
logger.debug(
f"recreate_index: index '{packages_index}' already exists and was deleted"
)
def create_index(es, index, mapping):
if es.indices.exists(index):
logger.debug(f"create_index: index '{index}' already exists")
return
es.indices.create(
index=packages_index,
body=dict(
settings=dict(number_of_shards=1, analysis=ANALYSIS),
mappings=dict(
properties=dict(
attr_name=dict(
type="text",
analyzer="nixAttrName",
fields={"raw": {"type": "keyword"}},
),
attr_set=dict(type="keyword"),
pname=dict(type="keyword"),
pversion=dict(type="keyword"),
description=dict(type="text"),
longDescription=dict(type="text"),
license=dict(
type="nested",
properties=dict(
fullName=dict(type="text"), url=dict(type="text"),
),
),
maintainers=dict(
type="nested",
properties=dict(
name=dict(type="text"),
email=dict(type="text"),
github=dict(type="text"),
),
),
platforms=dict(type="keyword"),
position=dict(type="text"),
homepage=dict(type="keyword"),
),
),
),
index=index,
body={
"settings": {
"number_of_shards": 1,
"analysis": ANALYSIS,
},
"mappings": mapping,
},
)
logger.debug(f"recreate_index: index '{packages_index}' was created")

options_index = f"{channel}-options"
if es.indices.exists(options_index):
es.indices.delete(index=options_index)
logger.debug(
f"recreate_index: index '{options_index}' already exists and was deleted"
)
es.indices.create(
index=options_index,
body=dict(
settings=dict(number_of_shards=1, analysis=ANALYSIS),
mappings=dict(
properties=dict(
option_name=dict(type="keyword"),
description=dict(type="text"),
type=dict(type="keyword"),
default=dict(type="text"),
example=dict(type="text"),
source=dict(type="keyword"),
),
),
),
logger.debug(f"create_index: index '{index}' was created")


def create_index_name(type_, channel, evaluation):
    """Build the alias name and the concrete index name for one import.

    Args:
        type_: Suffix placed at the end of both names (the caller passes
            "packages" or "options").
        channel: Channel identifier embedded in both names.
        evaluation: Mapping that provides the 'revisions_since_start' and
            'git_revision' entries embedded in the index name.

    Returns:
        A ``(alias, index)`` tuple of strings.
    """
    # The alias depends only on channel and type, so it is the same for
    # every evaluation of a channel.
    alias = f"latest-{channel}-{type_}"
    # The index name additionally carries the schema version and the
    # evaluation's revision data.
    index = f"evaluation-{INDEX_SCHEMA_VERSION}-{channel}-{evaluation['revisions_since_start']}-{evaluation['git_revision']}-{type_}"
    return alias, index
logger.debug(f"recreate_index: index '{options_index}' was created")


def update_alias(es, name, index):
    """Attach the alias ``name`` to ``index`` and log the change.

    Args:
        es: Elasticsearch client; only ``es.indices.put_alias`` is used.
        name: Alias name to create or update.
        index: Name of the index the alias should be attached to.
    """
    # NOTE(review): put_alias only adds the alias to `index`; nothing here
    # detaches it from an index that already carries it. Confirm whether
    # indices from earlier imports must be removed from the alias so that it
    # resolves to exactly one index.
    es.indices.put_alias(index=index, name=name)
    logger.debug(f"'{name}' alias now points to '{index}' index")


@click.command()
Expand All @@ -324,7 +323,12 @@ def main(es_url, channel, verbose):

evaluation = get_last_evaluation(channel)
es = elasticsearch.Elasticsearch([es_url])
recreate_index(es, channel)

# ensure indexes exist
packages_alias, packages_index = create_index_name("packages", channel, evaluation)
options_alias, options_index = create_index_name("options", channel, evaluation)
create_index(es, packages_index, PACKAGES_MAPPING)
create_index(es, options_index, OPTIONS_MAPPING)

# write packages
number_of_packages, gen_packages = get_packages(evaluation)
Expand Down Expand Up @@ -352,6 +356,10 @@ def main(es_url, channel, verbose):
successes += ok
print("Indexed %d/%d options" % (successes, number_of_options))

# update alias
update_alias(es, packages_alias, packages_index)
update_alias(es, options_alias, options_index)


if __name__ == "__main__":
main()
Expand Down
2 changes: 1 addition & 1 deletion src/Page/Options.elm
Expand Up @@ -219,7 +219,7 @@ makeRequest :
makeRequest options channel query from size =
ElasticSearch.makeRequest
"option_name"
("nixos-" ++ channel ++ "-options")
("latest-nixos-" ++ channel ++ "-options")
decodeResultItemSource
options
query
Expand Down
2 changes: 1 addition & 1 deletion src/Page/Packages.elm
Expand Up @@ -287,7 +287,7 @@ makeRequest :
makeRequest options channel query from size =
ElasticSearch.makeRequest
"attr_name"
("nixos-" ++ channel ++ "-packages")
("latest-nixos-" ++ channel ++ "-packages")
decodeResultItemSource
options
query
Expand Down

0 comments on commit e952706

Please sign in to comment.