Use rabbitmq for queueing #169
…a lot of error handling still needs to be added.
…nt, improve error handling
…nd and exiting, so that the new container can give the right defs
bigquery-writer/bigquery-writer.py
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBITMQ_HOST, port=config.RABBITMQ_PORT))
        break
    except Exception as e:
        self.log.error("Cannot connect to rabbitmq: %s, sleeping 2 seconds")
Seems like you forgot to add the error message here.
Good catch.
All tests pass, looks good to me. 👍
bigquery-writer/bigquery-writer.py
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBITMQ_HOST, port=config.RABBITMQ_PORT))
        break
    except Exception as e:
        self.log.error("Cannot connect to rabbitmq: %s, sleeping 2 seconds" % str(e))
"retrying in 2 seconds" might be a more useful message.
In other places too.
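The retry loop under review could look roughly like this with both suggestions applied: the exception is actually interpolated, and the message says it is retrying. This is a minimal sketch, not the PR's code: connect_with_retry and its injectable sleep parameter are hypothetical, and connect stands for any zero-argument callable, such as a lambda wrapping the pika.BlockingConnection(...) call above. Passing the values as logging arguments lets the logging module do the interpolation instead of relying on a hand-written % operator.

```python
import logging
import time


def connect_with_retry(connect, log, delay=2, sleep=time.sleep):
    """Call connect() until it succeeds, logging each failure.

    connect: hypothetical zero-argument callable that returns a connection
    or raises, e.g. a lambda wrapping pika.BlockingConnection(...).
    """
    while True:
        try:
            return connect()
        except Exception as e:
            # logging formats %s/%d with the extra arguments, so the error text is included.
            log.error("Cannot connect to RabbitMQ: %s, retrying in %d seconds", e, delay)
            sleep(delay)
```

Because the delay is a parameter, a module-wide setting such as ERROR_RETRY_DELAY can be passed in at every place the module retries.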
bigquery-writer/bigquery-writer.py
        return obj.callback(ch, method, properties, body)

    def callback(self, ch, method, properties, body):
Both the ch and properties arguments are unused.
bigquery-writer/bigquery-writer.py
    @staticmethod
    def static_callback(ch, method, properties, body, obj):
It might be better to make obj the first argument, since it's the most important one in the context of this method. The rest can be passed as keyword arguments without having to specify them explicitly; they can be inferred from the callback method.
This is a callback called by pika, so I can't change its signature.
Assuming it's the one in start below, it can probably be modified like this:
self.channel.basic_consume(
lambda ch, method, properties, body:
self.static_callback(self, ch, method, properties, body),
queue='unique'
)
(haven't tested it)
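To make the reviewer's point concrete: a bound method such as self.callback already carries self, so it can be handed to the consumer directly and static_callback is not needed at all. This sketch runs without a broker by using a hypothetical FakeChannel that only mimics the pika 0.x call order basic_consume(consumer_callback, queue=...); pika 1.x changed this to basic_consume(queue, on_message_callback), so check which version is installed. The Writer class and queue name are illustrative, and the leading underscores mark the arguments the callback has to accept but does not use.

```python
class FakeChannel:
    """Hypothetical stand-in for a pika channel, for illustration only."""

    def __init__(self):
        self.consumers = {}

    def basic_consume(self, consumer_callback, queue):
        self.consumers[queue] = consumer_callback

    def deliver(self, queue, body):
        # pika invokes consumer callbacks as (channel, method, properties, body).
        self.consumers[queue](self, None, None, body)


class Writer:
    def __init__(self, channel):
        self.channel = channel
        self.received = []

    def callback(self, _ch, _method, _properties, body):
        # Underscored arguments are required by the callback signature but unused.
        self.received.append(body)

    def start(self):
        # The bound method is passed directly; self travels with it.
        self.channel.basic_consume(self.callback, queue='unique')
```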
-class InfluxWriterSubscriber(RedisPubSubSubscriber):
-    def __init__(self, ls, influx, redis):
-        RedisPubSubSubscriber.__init__(self, redis, KEYSPACE_NAME_INCOMING, __name__)
+class InfluxWriterSubscriber(object):
I see that InfluxWriterSubscriber and BigQueryWriter have some common methods related to writing. An abstract ListenWriter class could be useful.
...or not even abstract, since connect_to_rabbitmq is exactly the same in both.
I thought about that, but I didn't really want to create another .py module just for one function. I'll do it if I need to add another common method.
Fair enough. My point about an abstract writer class still stands, though. Other methods that could go in it are callback, static_callback, and start. Here's some more information about them and why they can be useful:
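One possible shape for the shared base class discussed here, assuming queued messages are JSON-encoded lists of listens that are acknowledged manually after a successful write. The ListenWriter name comes from the review; the JSON encoding and the acknowledgement policy are assumptions, not taken from the PR. ch.basic_ack(delivery_tag=method.delivery_tag) is the standard pika acknowledgement call.

```python
import json
from abc import ABC, abstractmethod


class ListenWriter(ABC):
    """Hypothetical shared base for InfluxWriterSubscriber and BigQueryWriter."""

    def callback(self, ch, method, properties, body):
        # Decode the queued batch, let the subclass store it, then acknowledge it.
        listens = json.loads(body)
        self.write(listens)
        ch.basic_ack(delivery_tag=method.delivery_tag)

    @abstractmethod
    def write(self, listens):
        """Persist a batch of listens; each backend supplies its own version."""
```

Each writer then only implements write(). If write() raises, the message is never acknowledged, so RabbitMQ can redeliver it once the consumer's channel closes.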
That last comment was great!
(still not enough to sway me, but still. :) )
I've spent over an hour trying to remove some of the args, but it keeps failing. I'd rather spend my time on other things than on fixing code that works.
influx-writer/influx-writer.py
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBITMQ_HOST, port=config.RABBITMQ_PORT))
        break
    except Exception as e:
        self.log.error("Cannot connect to rabbitmq: %s, sleeping 2 seconds")
%s is there, but the string is not formatted.
 # if we're not supposed to run, just sleep
 if not config.WRITE_TO_BIGQUERY:
-    sleep(1000)
+    sleep(66666)
Why has the duration increased?
The actual value doesn't matter, as long as it's large enough to keep the container from restarting too often.
@@ -97,38 +160,37 @@ def start(self):
    sleep(1000)
    return
Can all the checks above be consolidated into one function? They all call sleep and return.
And why is the sleep call necessary?
If there is no sleep, the container will just keep restarting, which just sucks resources.
I don't really see the need to consolidate them into one function. I don't believe in creating new functions just to keep functions under an artificial length limit; I think that makes the code harder to read in the end.
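For comparison, the consolidation the reviewer suggests could look roughly like the sketch below. All names (idle_if_disabled, the (ok, message) pairs) are hypothetical; the default of 66666 seconds is the value from the diff above, and the helper keeps the author's reason for sleeping rather than exiting, which is to stop the container from restarting in a loop.

```python
import time


def idle_if_disabled(checks, log, seconds=66666, sleep=time.sleep):
    """Sleep and return True if any check fails, so the caller can just return.

    checks: hypothetical list of (ok, message) pairs,
    e.g. [(config.WRITE_TO_BIGQUERY, "Writing to BigQuery is disabled")].
    """
    for ok, message in checks:
        if not ok:
            # Sleeping instead of exiting keeps the container from restart-looping.
            log.info("%s, sleeping", message)
            sleep(seconds)
            return True
    return False
```

start() would then open with a single guard such as `if idle_if_disabled(checks, self.log): return`, with every condition listed in one place.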
docker/docker-compose.test.yml
@@ -18,6 +18,13 @@ services:
INFLUXDB_HTTP_LOG_ENABLED: 'true'
INFLUXDB_CONTINUOUS_QUERIES_LOG_ENABLED: 'false'

rabbitmq:
image: rabbitmq:3.6.9
We use version 3.6.5 in production: https://github.com/metabrainz/docker-server-configs/blob/master/scripts/services.sh#L481
        return ret

    def write(self, listen_dicts):
This method is way too long and complex. I'd at least split it up in multiple methods that are easier to understand and test.
I prefer to keep it as is, for the sake of readability.
That's my point, though: it's kind of difficult to read.
Here's an example of one of the changes that could be made:
@staticmethod
def listens_time_range(listens):
    """Calculate the time range that a set of listens covers.

    Args:
        listens (list): List of dictionaries with listen data.

    Returns:
        Tuple with two values: minimum time and maximum time.
    """
    min_time = 0
    max_time = 0
    for listen in listens:
        t = int(listen['listened_at'])
        if not max_time:
            min_time = max_time = t
            continue
        if t > max_time:
            max_time = t
        if t < min_time:
            min_time = t
    return min_time, max_time

def write(self, listen_dicts):
    submit = []
    unique = []
    min_time, max_time = self.listens_time_range(listen_dicts)
    ...
I don't need to know how listens_time_range is implemented to understand how write works. listens_time_range is also very easy to test now.
You left out the part where the user_name is determined for this batch of listens, so now the clean single-purpose function would need two purposes, or the data would need to be processed a second time.
I don't know if all the listens have user_name in them. If they do, it can be retrieved from the first one. If not, take it from the first one that has it.
My snippets are examples that might not work if you just copy them. I haven't tested everything.
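Following the reviewer's reply, the user name lookup can be its own small function rather than being folded into the time-range loop. This is a sketch assuming listens are dicts with a 'listened_at' value and an optional 'user_name' key; batch_user_name is a hypothetical name, and listens_time_range here is a variant of the reviewer's snippet that uses the built-in min() and max().

```python
def batch_user_name(listens):
    """Return the first user_name found in a batch of listens, or None."""
    for listen in listens:
        if 'user_name' in listen:
            return listen['user_name']
    return None


def listens_time_range(listens):
    """Return (min_time, max_time) of 'listened_at', or (0, 0) for an empty batch."""
    times = [int(listen['listened_at']) for listen in listens]
    if not times:
        return 0, 0
    return min(times), max(times)
```

The "second pass" costs one more iteration over a batch that is already in memory, which is typically small compared with the write itself.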
DUMP_JSON_WITH_ERRORS = False
ERROR_RETRY_DELAY = 3  # number of seconds to wait until retrying an operation
It seems like this value is only used in one place. Can it be used in the other places in this module that retry?
Done.
Some of the issues I pointed out can be detected with Pylint.
This PR swaps out Redis pub/sub for a proper, less memory-hungry queue. I chose RabbitMQ since we already use it in production. This PR also improves the error handling and resilience of the writer containers, which now pause on startup if a service isn't available yet.
New Redis keys were needed to keep track of queue stats, so I created a redis_keys.py module to serve as a central place for tracking Redis keys.
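A minimal sketch of what a central redis_keys.py module can look like: plain constants, plus small functions for keys that embed an identifier. Every key name and the prefix below are hypothetical placeholders, not the keys the PR actually defines.

```python
"""redis_keys.py sketch: one place for Redis key names (all names hypothetical)."""

# Shared prefix so all of the application's keys are grouped together.
KEY_PREFIX = "listenbrainz."

# Queue statistics tracked by the writers (hypothetical names).
INCOMING_QUEUE_SIZE_KEY = KEY_PREFIX + "incoming_queue_size"
UNIQUE_QUEUE_SIZE_KEY = KEY_PREFIX + "unique_queue_size"


def user_listen_count_key(user_name):
    """Build a per-user listen count key (hypothetical format)."""
    return "{}user_listen_count.{}".format(KEY_PREFIX, user_name)
```

Other modules import these names instead of repeating string literals, so renaming a key becomes a one-line change.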
The per-user listen counts in this PR have a bug, but I'm already working on an improved PR that counts user and total listens using InfluxDB's continuous queries, which make things like counting listens much easier.