Skip to content

Instantly share code, notes, and snippets.

@karmi
Last active December 24, 2015 03:49
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save karmi/6739879 to your computer and use it in GitHub Desktop.
Save karmi/6739879 to your computer and use it in GitHub Desktop.
Stack Exchange and Elasticsearch
require 'pathname'
require 'nokogiri'
require 'typhoeus'
require 'typhoeus/adapters/faraday'
require 'elasticsearch'
require 'ansi/progressbar'
# Write output straight through (no buffering) so progress bars and log
# lines appear as soon as they are produced.
[STDOUT, STDERR].each { |stream| stream.sync = true }
module Stackexchange
class Reader
  # Number of `<row>` lines handed to the caller per batch.
  BATCH_SIZE = 1_000

  # Lines that hold a record (`<row ... />`); every other line of the dump
  # (XML declaration, root element) is skipped.
  ROW_PATTERN = /^\s*<row/

  attr_reader :input

  # @param input [File] an open dump file; must respond to #path, #rewind
  #   and #each_line
  # @param options [Hash] unused, accepted for interface compatibility
  def initialize(input, options={})
    @input = input
  end

  # Counts every line of the input file (rows plus surrounding markup).
  # Only used to size progress bars.
  #
  # Runs `wc -l` through the argument-array form of IO.popen, i.e. without
  # a shell, so paths containing spaces or shell metacharacters are passed
  # verbatim instead of being word-split or interpreted.
  #
  # @return [Integer]
  def lines
    IO.popen(['wc', '-l', input.path], &:read).to_i
  end

  # Base name of the input file, e.g. `basename('.xml')` => "Posts".
  #
  # @param args [Array<String>] optional suffix to strip, as for File.basename
  # @return [String]
  def basename(*args)
    File.basename(input.path, *args)
  end

  # Yields arrays of up to BATCH_SIZE raw `<row>` lines in file order; the
  # final batch holds the remainder (never an empty batch). Rewinds the
  # input first, so it can be called repeatedly on the same reader.
  #
  # @yieldparam batch [Array<String>]
  # @return [Enumerator] when called without a block
  def each_batch
    return enum_for(:each_batch) unless block_given?

    input.rewind
    batch = []
    input.each_line do |line|
      next unless line =~ ROW_PATTERN
      batch << line
      next if batch.size < BATCH_SIZE
      yield batch
      batch = []
    end
    yield batch unless batch.empty?
  end
end
class Importer
  # Default upper bound on concurrently running bulk-indexing child
  # processes in #store_questions_and_answers (see `options[:workers]`).
  BULK_WORKERS = 4

  attr_reader :path, :client, :index

  # @param path [String, Pathname] directory containing the dump's
  #   Users.xml, Comments.xml and Posts.xml
  # @param options [Hash]
  # @option options [String] :index target index name; defaults to the
  #   directory's base name
  def initialize(path, options={})
    @path = Pathname(path)
    transport = Elasticsearch::Transport::Transport::HTTP::Faraday.new(
      hosts: [ { host: 'localhost', port: '9200' } ]
    ) do |f|
      f.adapter :typhoeus
    end
    @client = Elasticsearch::Client.new transport: transport
    @index  = options[:index] || @path.basename.to_s
  end

  # Full import: recreates the index, loads users and comments, then posts
  # (whose documents embed data looked up from the first two), restores the
  # index settings and prints the elapsed time.
  def run!
    start = Time.now
    setup
    store_users
    store_comments
    store_questions_and_answers
    cleanup
    # Borrows ANSI::Progressbar's private #format_time to render the duration.
    puts ANSI.ansi(
      "Total:".ljust(15) +
      ANSI::Progressbar.new('Total', 100).send(:format_time, Time.now-start), :bold
    )
  end

  # Index settings (a custom `html_strip` analyzer) and the mappings for the
  # `question`, `answer` and `user` types.
  #
  # @return [Hash] request body for the create-index API
  def settings_and_mappings
    {
      settings: {
        analysis: {
          analyzer: {
            # Strips HTML markup before standard tokenization and stemming.
            html_strip: {
              tokenizer: "standard",
              filter: ["standard", "lowercase", "stop", "snowball"],
              char_filter: ["html_strip"]
            }
          }
        }
      },
      mappings: {
        question: {
          properties: {
            body:           { type: "string", analyzer: "html_strip" },
            user:           user_mapping,
            last_editor:    user_mapping,
            tags:           { type: "string", analyzer: "keyword" },
            comment_count:  { type: "short" },
            comments:       nested_comments_mapping,
            rating:         { type: 'short' },
            # Numeric widths follow Java's types (short = 16 bit, integer = 32 bit):
            # http://docs.oracle.com/javase/tutorial/java/nutsandbolts/datatypes.html
            answer_count:   { type: 'short' },
            favorite_count: { type: 'integer' },
            view_count:     { type: 'integer' }
          }
        },
        answer: {
          _parent:  { type: "question" },
          _routing: { required: true, path: "parent_id" },
          properties: {
            # NOTE(review): unlike `question`, an answer's `user` is mapped as
            # `nested`, so `user.*` fields/queries may behave differently across
            # the two types -- confirm this is intended.
            user:           { type: "nested" },
            comment_count:  { type: "short" },
            comments:       nested_comments_mapping,
            rating:         { type: 'short' },
            favorite_count: { type: 'integer' }
          }
        },
        user: user_mapping
      }
    }
  end

  # Recreates the index from scratch; replicas and periodic refresh are
  # switched off for the duration of the load (presumably to speed up bulk
  # indexing) and restored by #cleanup.
  def setup
    begin
      client.indices.delete index: index
    rescue Elasticsearch::Transport::Transport::Errors::NotFound
      # First run: nothing to delete. Any other failure propagates.
    end
    client.indices.create index: index, body: settings_and_mappings
    client.indices.put_settings index: index, body: {
      index: { number_of_replicas: 0, refresh_interval: '-1' }
    }
  end

  # Restores one replica and a 1s refresh interval after the load.
  def cleanup
    client.indices.put_settings index: index, body: {
      index: { number_of_replicas: 1, refresh_interval: '1s' }
    }
  end

  # Parses one `<row .../>` line into a Hash of its attributes, with
  # snake_cased names (CreationDate => creation_date) and String values.
  #
  # @param xml [String]
  # @return [Hash{String => String}]
  def xml_to_hash(xml)
    row = ::Nokogiri::XML(xml).children.first
    row.attribute_nodes.each_with_object({}) do |attr, memo|
      memo[Utils.underscore(attr.name)] = attr.value
    end
  end

  # Indexes Users.xml as type `user`.
  def store_users(options={})
    __store_flat 'Users.xml'
  end

  # Indexes Comments.xml as type `comment`.
  def store_comments(options={})
    __store_flat 'Comments.xml'
  end

  # Indexes Posts.xml as `question` and `answer` documents, embedding each
  # post's owner (looked up in the `user` type) and its comments (looked up
  # in the `comment` type). Must run after #store_users and #store_comments.
  #
  # Each batch is bulk-indexed in a forked child so indexing overlaps with
  # the next batch's lookups. At most `options[:workers]` (default
  # BULK_WORKERS) children run at once, and every child is reaped before
  # returning, so #cleanup never runs while documents are still in flight.
  def store_questions_and_answers(options={})
    max_workers = options.fetch(:workers, BULK_WORKERS)
    pids = []

    File.open(path.join('Posts.xml')) do |file|
      reader = Reader.new(file)
      bar    = progressbar_for(reader)

      reader.each_batch do |batch|
        posts = batch.map { |line| xml_to_hash(line) }
        posts.each { |post| attach_user(post) }
        posts.each { |post| attach_comments(post) }

        pids << Process.fork { client.bulk index: index, body: bulk_actions_for_posts(posts) }
        wait_for([pids.shift]) if pids.size >= max_workers
        bar.inc
      end

      wait_for(pids)
      bar.finish
    end
  end

  # Bulk-indexes every row of `filename` as one flat document type.
  #
  # @param filename [String] file inside #path, e.g. "Users.xml"
  # @param options [Hash]
  # @option options [String] :type document type; defaults to the singular
  #   snake_cased file name ("Users.xml" => "user")
  def __store_flat(filename, options={})
    File.open(path.join(filename)) do |file|
      reader = Reader.new(file)
      bar    = progressbar_for(reader)
      type   = options[:type] || Utils.underscore(reader.basename('.xml')).chomp('s')

      reader.each_batch do |batch|
        actions = batch.map do |line|
          doc = rename_score_to_rating(xml_to_hash(line))
          { index: { _index: index, _type: type, _id: doc['id'], data: doc } }
        end
        client.bulk body: actions
        bar.inc
      end
      bar.finish
    end
  end

  private

  # Multi-field mapping for `display_name`: analyzed for full-text search,
  # plus a verbatim `display_name_raw` copy (keyword analyzer).
  def display_name_mapping
    {
      type: "multi_field",
      fields: {
        display_name:     { type: "string" },
        display_name_raw: { type: "string", analyzer: "keyword" }
      }
    }
  end

  # Object mapping for an embedded user (or the `user` type itself).
  def user_mapping
    { properties: { display_name: display_name_mapping } }
  end

  # Nested mapping for the comments embedded in questions and answers.
  def nested_comments_mapping
    { type: "nested", properties: { user: user_mapping } }
  end

  # Progress bar ticking once per batch. `Reader#lines` counts every line
  # and rows <= lines, so rounding up covers the final partial batch; the
  # total is also never 0 for files shorter than one batch.
  def progressbar_for(reader)
    total = (reader.lines + Reader::BATCH_SIZE - 1) / Reader::BATCH_SIZE
    ANSI::Progressbar.new(reader.basename('.xml'), [total, 1].max)
  end

  # Renames the dump's `score` attribute to `rating` (the mapped field name).
  # Documents without a `score` are returned unchanged instead of gaining a
  # `rating => nil` entry.
  def rename_score_to_rating(doc)
    doc['rating'] = doc.delete('score') if doc.key?('score')
    doc
  end

  # Merges the owner's display name and location into `post[:user]`.
  # Posts without an owner id, or whose owner is not in the index, are
  # returned unchanged.
  def attach_user(post)
    user_id = post['owner_user_id']
    return post unless user_id

    user = client.get(index: index, type: 'user', id: user_id, fields: ['display_name', 'location'])
    return post unless user['fields']

    post.merge! user: { display_name: user['fields']['display_name'], location: user['fields']['location'] }
  rescue Elasticsearch::Transport::Transport::Errors::NotFound
    post
  end

  # Merges the `_source` of up to 1000 comments on this post into
  # `post[:comments]`. Must be a regular search: `search_type: 'count'`
  # returns only the total and no hits, which left the list always empty.
  def attach_comments(post)
    response = client.search(index: index, type: 'comment', size: 1_000, body: {
      query: { filtered: { filter: { term: { post_id: post['id'] } } } }
    })
    post.merge! comments: response['hits']['hits'].map { |hit| hit['_source'] }
  end

  # Turns posts into bulk `index` actions. PostTypeId "1" becomes a
  # `question` and anything else an `answer`; answers without a `parent_id`
  # are skipped. Renames `score` to `rating` and splits the "<a><b>" tag
  # string into ["a", "b"]. Mutates the given post hashes.
  def bulk_actions_for_posts(posts)
    posts.map { |post|
      type = post['post_type_id'] == '1' ? 'question' : 'answer'
      next if type == 'answer' && !post['parent_id']

      rename_score_to_rating(post)
      post['tags'] = post['tags'].split(/</).reject { |s| s =~ /^\s*$/ }.map { |s| s.gsub(/>/, '') } if post['tags']
      { index: { _type: type, _id: post['id'], data: post } }
    }.compact
  end

  # Reaps the given child processes, warning about any that did not exit
  # successfully (including children killed by a signal).
  def wait_for(pids)
    pids.each do |pid|
      _pid, status = Process.wait2(pid)
      warn "Bulk indexing child failed: #{status.inspect}" unless status.success?
    end
  end
end
module Utils
  # Converts a CamelCase XML attribute name to snake_case:
  # "CreationDate" => "creation_date", "OwnerUserId" => "owner_user_id".
  #
  # Acronym runs stay together ("AccountID" => "account_id"), and names with
  # a lowercase first word are split as well ("viewCount" => "view_count").
  # Repeated underscores are collapsed and a trailing underscore is removed.
  #
  # (The previous version passed `|a, b|` to a gsub block, but gsub yields
  # only the matched string, so `b` was always nil. It also left acronyms
  # uppercase and skipped lowercase-initial words.)
  #
  # @param string [String, #to_s]
  # @return [String]
  def underscore(string)
    string.to_s
      .gsub(/([A-Z]+)([A-Z][a-z])/, '\1_\2')
      .gsub(/([a-z\d])([A-Z])/, '\1_\2')
      .downcase
      .squeeze('_')
      .chomp('_')
  end

  extend self
end
end
# Command-line entry point: import the dump directory given as the first
# argument, or print usage and exit with status 1 when none is given.
if __FILE__ == $PROGRAM_NAME
  dump_path = ARGV.first
  unless dump_path
    puts ANSI.ansi("USAGE: #{__FILE__} path/to/stackexchange/dump", :red)
    exit(1)
  end
  Stackexchange::Importer.new(dump_path).run!
end
#!/usr/bin/env ruby
# Simplest search: the query string "ruby", sent as the `q` parameter,
# against `question` documents. Prints the raw response.
require File.expand_path('../__common__', __FILE__)

results = $client.search(index: 'programmers.stackexchange.com', type: 'question', q: 'ruby')
ap results
#!/usr/bin/env ruby
# Searches questions and answers for "ruby" in both `title` and `body`,
# boosting `title` matches by a factor of 10.
require File.expand_path('../__common__', __FILE__)

search_body = {
  query: {
    multi_match: { query: "ruby", fields: ["title^10", "body"] }
  }
}

ap $client.search(index: 'programmers.stackexchange.com', type: ['question', 'answer'], body: search_body)
#!/usr/bin/env ruby
# The title-boosted "ruby" search, wrapped in a `filtered` query whose
# `range` filter keeps only posts with creation_date from 2013-01-01.
require File.expand_path('../__common__', __FILE__)

text_match = { multi_match: { query: "ruby", fields: ["title^10", "body"] } }
since_2013 = { range: { creation_date: { from: '2013-01-01' } } }

search_body = {
  query: {
    filtered: { query: text_match, filter: since_2013 }
  }
}

ap $client.search(index: 'programmers.stackexchange.com', type: ['question', 'answer'], body: search_body)
#!/usr/bin/env ruby
# Filtered "ruby" search (posts since 2013-01-01) that returns only the
# listed fields instead of the whole document source.
require File.expand_path('../__common__', __FILE__)

returned_fields = ['title', 'tags', 'creation_date', 'rating', 'user.location', 'user.display_name']

search_body = {
  query: {
    filtered: {
      query:  { multi_match: { query: "ruby", fields: ["title^10", "body"] } },
      filter: { range: { creation_date: { from: '2013-01-01' } } }
    }
  },
  fields: returned_fields
}

ap $client.search(index: 'programmers.stackexchange.com', type: ['question', 'answer'], body: search_body)
#!/usr/bin/env ruby
# Filtered "ruby" search returning selected fields, plus highlighted
# fragments (50 characters each) from `title` and `body`.
require File.expand_path('../__common__', __FILE__)

filtered_query = {
  filtered: {
    query:  { multi_match: { query: "ruby", fields: ["title^10", "body"] } },
    filter: { range: { creation_date: { from: '2013-01-01' } } }
  }
}

highlighting = {
  fields: {
    title: { fragment_size: 50 },
    body:  { fragment_size: 50 }
  }
}

search_body = {
  query:     filtered_query,
  fields:    ['title', 'tags', 'creation_date', 'rating', 'user.location', 'user.display_name'],
  highlight: highlighting
}

ap $client.search(index: 'programmers.stackexchange.com', type: ['question', 'answer'], body: search_body)
#!/usr/bin/env ruby
# Filtered, highlighted "ruby" search ordered newest first by creation_date
# rather than by relevance.
require File.expand_path('../__common__', __FILE__)

filtered_query = {
  filtered: {
    query:  { multi_match: { query: "ruby", fields: ["title^10", "body"] } },
    filter: { range: { creation_date: { from: '2013-01-01' } } }
  }
}

search_body = {
  query:  filtered_query,
  fields: ['title', 'tags', 'creation_date', 'rating', 'user.location', 'user.display_name'],
  highlight: {
    fields: { title: { fragment_size: 50 }, body: { fragment_size: 50 } }
  },
  sort: { creation_date: 'desc' }
}

ap $client.search(index: 'programmers.stackexchange.com', type: ['question', 'answer'], body: search_body)
#!/usr/bin/env ruby
# Filtered "ruby" search re-scored by a `custom_score` script that multiplies
# the relevance score by the document's `rating`.
#
# NOTE: with a plain product, posts rated 0 score 0 and negatively rated
# posts get negative scores.
require File.expand_path('../__common__', __FILE__)

filtered_query = {
  filtered: {
    query:  { multi_match: { query: "ruby", fields: ["title^10", "body"] } },
    filter: { range: { creation_date: { from: '2013-01-01' } } }
  }
}

search_body = {
  query: {
    custom_score: {
      query:  filtered_query,
      script: '_score * doc["rating"].value'
    }
  },
  fields: ['title', 'tags', 'creation_date', 'rating', 'user.location', 'user.display_name'],
  highlight: {
    fields: { title: { fragment_size: 50 }, body: { fragment_size: 50 } }
  }
}

ap $client.search(index: 'programmers.stackexchange.com', type: ['question', 'answer'], body: search_body)
#!/usr/bin/env ruby
# Filtered "ruby" search re-scored by multiplying relevance with a
# logarithmically dampened `rating`, so highly rated posts rank higher
# without dominating.
#
# The rating is clamped at 0 and offset by 2 before log10. A bare
# log10(rating) is 0 for rating 1 and -Infinity for rating 0, and it is NaN
# for negative ratings (scores below zero), which zeroes out or breaks
# scoring. With the clamp and offset, the multiplier is always >= log10(2),
# about 0.3. This is the same idea as the later `log2p` modifier of
# Elasticsearch's `field_value_factor`.
require File.expand_path '../__common__', __FILE__
ap $client.search index: 'programmers.stackexchange.com', type: ['question','answer'], body: {
  query: {
    custom_score: {
      query: {
        filtered: {
          query: {
            multi_match: {
              query: "ruby",
              fields: ["title^10", "body"]
            }
          },
          filter: {
            range: {
              creation_date: {
                from: '2013-01-01'
              }
            }
          }
        }
      },
      # http://www.wolframalpha.com/input/?i=log%281%29%2C+log%282%29%2C+log%285%29%2C+log%2810%29%2C+log%2850%29
      script: '_score * log10(max(doc["rating"].value, 0) + 2)'
    }
  },
  fields: ['title', 'tags', 'creation_date', 'rating', 'user.location', 'user.display_name'],
  highlight: {
    fields: {
      title: { fragment_size: 50},
      body: { fragment_size: 50}
    }
  }
}
#!/usr/bin/env ruby
# Log-dampened rating search, plus facets: the top tags, a monthly
# histogram of creation dates, and statistics over comment_count.
#
# The rating is clamped at 0 and offset by 2 before log10. A bare
# log10(rating) gives -Infinity for rating 0 and NaN for negative ratings,
# which breaks scoring.
require File.expand_path '../__common__', __FILE__
ap $client.search index: 'programmers.stackexchange.com', type: ['question','answer'], body: {
  query: {
    custom_score: {
      query: {
        filtered: {
          query: {
            multi_match: {
              query: "ruby",
              fields: ["title^10", "body"]
            }
          },
          filter: {
            range: {
              creation_date: {
                from: '2013-01-01'
              }
            }
          }
        }
      },
      script: '_score * log10(max(doc["rating"].value, 0) + 2)'
    }
  },
  fields: ['title', 'tags', 'creation_date', 'rating', 'user.location', 'user.display_name'],
  highlight: {
    fields: {
      title: { fragment_size: 50},
      body: { fragment_size: 50}
    }
  },
  facets: {
    tags: { terms: { field: 'tags' } },
    frequency: { date_histogram: { field: 'creation_date', interval: 'month' } },
    comment_stats: { statistical: { field: 'comment_count' } }
  }
}
require 'elasticsearch'
require 'awesome_print'
require 'logger'
require 'ansi'
# Shared setup for the example scripts. Logs go to STDERR as bare, dimmed
# messages (severity, time and program name are dropped), and the client
# is exposed as the global `$client`.
logger = Logger.new(STDERR)
logger.formatter = proc { |_severity, _time, _progname, message| "#{ANSI.ansi(message, :faint)}\n" }

$client = Elasticsearch::Client.new(host: 'localhost:9200', logger: logger)
$client.transport.logger.level = Logger::INFO
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment