Use rabbitmq for queueing #169

Merged
merged 24 commits into from Apr 26, 2017

Conversation

mayhem
Member

@mayhem mayhem commented Apr 24, 2017

This PR swaps out the Redis pubsub for a proper, less memory-hungry queue. I chose RabbitMQ since we already use that in production. This PR also improves the error handling and resilience of the writer containers, and makes them pause on startup if a service isn't available yet.

New Redis keys were needed to keep track of queue stats, so I created a new redis_keys.py module as a central place to track Redis keys.

The per-user listen counts have a bug in them in this PR, but I am already working on an improved PR for counting user and total listens using InfluxDB's continuous queries, which make counting listens much easier.

@mayhem mayhem requested a review from alastair April 24, 2017 13:01
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBITMQ_HOST, port=config.RABBITMQ_PORT))
        break
    except Exception as e:
        self.log.error("Cannot connect to rabbitmq: %s, sleeping 2 seconds")
Collaborator

Seems like you forgot to add the error message here.

Member Author

Good catch.

@paramsingh
Collaborator

All tests pass, looks good to me. 👍

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBITMQ_HOST, port=config.RABBITMQ_PORT))
        break
    except Exception as e:
        self.log.error("Cannot connect to rabbitmq: %s, sleeping 2 seconds" % str(e))
Contributor

"retrying in 2 seconds" might be a more useful message.

Contributor

In other places too.
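
The two points raised on this snippet (the exception never reaches the message, and "retrying" reads better than "sleeping") can be combined in one small helper. This is only a sketch: `connect_with_retry`, its `delay` and `sleep` parameters, and the idea of passing the connection attempt in as a callable are assumptions made for illustration, not code from this PR.

```python
import time


def connect_with_retry(connect, log, delay=2, sleep=time.sleep):
    """Call connect() until it succeeds, logging every failure.

    connect: zero-argument callable that returns a connection or raises.
    log:     any object with a logging-style error(msg, *args) method.
    """
    while True:
        try:
            return connect()
        except Exception as e:
            # The exception and the delay are passed as logging arguments,
            # so both placeholders in the message are actually filled in.
            log.error("Cannot connect to rabbitmq: %s, retrying in %d seconds", e, delay)
            sleep(delay)
```

In the writers, `connect` could be a lambda wrapping the `pika.BlockingConnection(...)` call shown above, and `log` could be `self.log`.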

        return obj.callback(ch, method, properties, body)


    def callback(self, ch, method, properties, body):
Contributor

Both ch and properties arguments are unused.



@staticmethod
def static_callback(ch, method, properties, body, obj):
Contributor

It might be better to make obj the first argument since it's the most important in the context of this method. The rest can be passed as keyword arguments without the need to explicitly specify them. These can be inferred from the callback method.

Member Author

This is a callback called by pika -- I can't change the signature of the callback.

Contributor

@gentlecat gentlecat Apr 25, 2017

Assuming it's the one in the start below, it can probably be modified like this:

self.channel.basic_consume(
    lambda ch, method, properties, body:
        self.static_callback(self, ch, method, properties, body),
    queue='unique'
)

(haven't tested it)
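
For comparison, here is a sketch of two other ways to end up with a callable that takes exactly the four arguments shown in the `callback` signature, without a lambda. The `Writer` class is a hypothetical stand-in for the real writer classes, and nothing here touches pika itself.

```python
import functools


class Writer(object):
    """Hypothetical stand-in for one of the writer classes."""

    def __init__(self):
        self.bodies = []

    def callback(self, ch, method, properties, body):
        # Record the message body and report how many have been seen.
        self.bodies.append(body)
        return len(self.bodies)

    @staticmethod
    def static_callback(ch, method, properties, body, obj):
        return obj.callback(ch, method, properties, body)


writer = Writer()

# Option 1: bind the extra obj argument up front; the resulting callable
# takes the remaining four arguments.
bound = functools.partial(Writer.static_callback, obj=writer)

# Option 2: a bound method already has the four-argument signature,
# so the static wrapper may not be needed at all.
direct = writer.callback
```

Either `bound` or `direct` could then be handed to the consumer registration in place of the lambda, assuming the library only requires a callable with that signature.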

-class InfluxWriterSubscriber(RedisPubSubSubscriber):
-    def __init__(self, ls, influx, redis):
-        RedisPubSubSubscriber.__init__(self, redis, KEYSPACE_NAME_INCOMING, __name__)
+class InfluxWriterSubscriber(object):
Contributor

I see that InfluxWriterSubscriber and BigQueryWriter have some common methods related to writing. Having an abstract ListenWriter class can be useful.

Contributor

...or not even abstract since connect_to_rabbitmq is exactly the same.
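
A minimal sketch of what such a shared base class might look like. Everything here is hypothetical: the `ListenWriter` name comes from the comment above, but the constructor argument, the method bodies, and the toy subclass are assumptions made to show the shape, not the real writers.

```python
import abc


class ListenWriter(abc.ABC):
    """Hypothetical base class for the parts the writers have in common."""

    def __init__(self, connection_factory):
        # connection_factory: zero-argument callable returning a connection
        # (assumed; in practice it might wrap the pika connection call).
        self.connection_factory = connection_factory
        self.connection = None

    def connect_to_rabbitmq(self):
        """Identical for every writer, so it is implemented once here."""
        self.connection = self.connection_factory()
        return self.connection

    def callback(self, ch, method, properties, body):
        """Shared message handler: hand the raw body to the subclass."""
        return self.write(body)

    @abc.abstractmethod
    def write(self, body):
        """Each writer stores the message in its own backend."""


class InMemoryWriter(ListenWriter):
    """Toy subclass used only to show the shape; not a real writer."""

    def __init__(self, connection_factory):
        super().__init__(connection_factory)
        self.written = []

    def write(self, body):
        self.written.append(body)
        return True
```

Because `write` is abstract, instantiating `ListenWriter` directly raises a `TypeError`, which makes a missing implementation in a new writer show up early.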

Member Author

I thought about that, but I didn't really want to create another py module just for one function. I'll do this should I need to add another common method.

Contributor

Fair enough. My point about an abstract writer class still stands. Other methods that can go in it are callback, static_callback, and start. Here's some more information about them and why they can be useful:

Member Author

That last comment was great!

(still not enough to sway me, but still. :) )

Member Author

I've spent over an hour trying to remove some args, but it always fails now. I'd rather spend my time doing other things than fixing code that is working.

        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=config.RABBITMQ_HOST, port=config.RABBITMQ_PORT))
        break
    except Exception as e:
        self.log.error("Cannot connect to rabbitmq: %s, sleeping 2 seconds")
Contributor

%s is there, but the string is not formatted.


 # if we're not supposed to run, just sleep
 if not config.WRITE_TO_BIGQUERY:
-    sleep(1000)
+    sleep(66666)
Contributor

Why has the duration increased?

Member Author

The actual value doesn't matter as long as it is large enough to prevent the container from restarting too often.

@@ -97,38 +160,37 @@ def start(self):
sleep(1000)
return
Contributor

Can all the checks above be consolidated in one function? They all call sleep and return.

Contributor

And why is the sleep call necessary?

Member Author

If there is no sleep, the container will just keep restarting, which just sucks resources.
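
The reasoning can be sketched as follows. The `start` signature, the `enabled` and `run` parameters, and the injectable `sleep` are assumptions made for illustration; only the idea of sleeping for a long time before returning is taken from the diff.

```python
import time

IDLE_SECONDS = 66666  # any large value works; this one appears earlier in the diff


def start(enabled, run, sleep=time.sleep):
    """Run the writer, or idle for a long time if it is switched off.

    Returning immediately would end the process, and a container restart
    policy would start it again right away. Sleeping first spaces the
    restarts far apart, so a disabled writer costs almost nothing.
    """
    if not enabled:
        sleep(IDLE_SECONDS)
        return False
    run()
    return True
```

Passing `sleep` in as a parameter is only there so the behaviour can be checked without actually waiting.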

Member Author

I don't really see the need to consolidate them in one function -- I don't believe in creating new functions just to keep functions below an artificial length. I believe that makes the code harder to read in the end.

@@ -18,6 +18,13 @@ services:
      INFLUXDB_HTTP_LOG_ENABLED: 'true'
      INFLUXDB_CONTINUOUS_QUERIES_LOG_ENABLED: 'false'

  rabbitmq:
    image: rabbitmq:3.6.9
Contributor

        return ret


    def write(self, listen_dicts):
Contributor

This method is way too long and complex. I'd at least split it up into multiple methods that are easier to understand and test.

Member Author

I prefer to keep it as it is, in order to ensure readability.

Contributor

That's my point though, it's kind of difficult to read.

Example of one of the changes that can be made:

@staticmethod
def listens_time_range(listens):
    """Calculate the time range that a set of listens covers.
    
    Args:
        listens (list): List of dictionaries with listen data.
    
    Returns:
        Tuple with two values: minimum time and maximum time.
    """
    min_time = 0
    max_time = 0
    for listen in listens:
        t = int(listen['listened_at'])
        if not max_time:
            min_time = max_time = t
            continue
        if t > max_time:
            max_time = t
        if t < min_time:
            min_time = t
    return min_time, max_time

def write(self, listen_dicts):
    submit = []
    unique = []

    min_time, max_time = self.listens_time_range(listen_dicts)

    ...

I don't need to know how listens_time_range is implemented to understand how write works. listens_time_range is also very easy to test now.

Member Author

You left out the part where the user_name is determined for this batch of listens, so now the clean single-purpose function needs to have two purposes or the data needs to be processed a second time.

Contributor

@gentlecat gentlecat Apr 26, 2017

Don't know if all the listens have user_name in them. If yes, then it can be retrieved from the first one. If not, get from the first one that's encountered.

My snippets are examples that might not work if you just copy them. I haven't tested everything.
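
One way the two concerns could stay separate is sketched below, under the assumption that each listen is a dictionary with a `listened_at` value and an optional `user_name`. `first_user_name` is a hypothetical helper, not something from the PR. It stops at the first listen that carries a name, so the second pass is usually very short.

```python
def listens_time_range(listens):
    """Return (min_time, max_time) over 'listened_at', or (0, 0) if empty."""
    times = [int(listen['listened_at']) for listen in listens]
    if not times:
        return 0, 0
    # min() and max() avoid using 0 as a "not set yet" marker, which would
    # misbehave for a listen whose timestamp really is 0.
    return min(times), max(times)


def first_user_name(listens):
    """Return the first 'user_name' found, or None if no listen has one."""
    for listen in listens:
        if 'user_name' in listen:
            return listen['user_name']
    return None
```

Both helpers are pure functions over the batch, so each can be tested on its own with a handful of dictionaries.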

DUMP_JSON_WITH_ERRORS = False
ERROR_RETRY_DELAY = 3 # number of seconds to wait until retrying an operation
Contributor

It seems like this value is used in only one place. Can it also be used in the other parts of this module that retry?

Member Author

Done.

@gentlecat
Contributor

Some of the issues I pointed out can be detected with Pylint.

@gentlecat gentlecat dismissed their stale review April 26, 2017 10:36

No longer relevant.

@mayhem mayhem merged commit 3c92cde into master Apr 26, 2017
@mayhem mayhem deleted the rabbitmq branch July 6, 2017 15:16