Fixed #198 -- Made pushing onto the event queue via fire threadsafe.
apollo13 committed Jan 26, 2017
1 parent 25442fc commit 7443fb9
Showing 2 changed files with 46 additions and 24 deletions.
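The diff below replaces Manager's plain list-plus-heappush queue with a small _EventQueue class that keeps a collections.deque (appended to by fire() from any thread) separate from a priority heap that only the flushing thread touches. A minimal standalone sketch of that idea follows; it is illustrative only and not code from this commit, and the names SketchQueue and handle are made up:

# Illustrative sketch only -- not part of the commit. It mimics the split the
# diff introduces: producers on any thread append to a deque (thread-safe in
# CPython), and only the consuming thread moves entries onto a priority heap.
import threading
from collections import deque
from heapq import heappop, heappush
from itertools import count


class SketchQueue(object):          # hypothetical stand-in for _EventQueue
    def __init__(self):
        self._incoming = deque()    # written to from any thread via append()
        self._heap = []             # touched only by the dispatching thread
        self._counter = count()     # keeps FIFO order among equal priorities

    def append(self, event, priority=0):
        # A single deque.append() call; safe even while dispatch() is running.
        self._incoming.append((priority, next(self._counter), event))

    def dispatch(self, handler):
        # Move what has arrived so far onto the private heap, then pop in
        # priority order. Anything appended meanwhile waits for the next call.
        for _ in range(len(self._incoming)):
            heappush(self._heap, self._incoming.popleft())
        while self._heap:
            handler(heappop(self._heap)[2])


def handle(event):                  # hypothetical event handler
    print(event)


queue = SketchQueue()
producers = [
    threading.Thread(target=queue.append, args=("event-%d" % i,))
    for i in range(4)
]
for t in producers:
    t.start()
for t in producers:
    t.join()
queue.dispatch(handle)              # drains the four events in arrival order
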
2 changes: 0 additions & 2 deletions .travis.yml
@@ -11,8 +11,6 @@ python:

matrix:
  allow_failures:
    # Till https://travis-ci.org/circuits/circuits/jobs/192188096 is solved.
    - python: pypy
    # Till https://travis-ci.org/circuits/circuits/jobs/192669838 is solved.
    - os: osx
  include:
68 changes: 46 additions & 22 deletions circuits/core/manager.py
@@ -2,6 +2,7 @@
 This module defines the Manager class.
 """
 import atexit
+from collections import deque
 from heapq import heappop, heappush
 from inspect import isfunction
 from itertools import chain, count
@@ -126,6 +127,43 @@ def __init__(self, timeout):
         self.tick_handler = None
 
 
+class _EventQueue(object):
+    __slots__ = ('_queue', '_priority_queue', '_counter', '_flush_batch')
+
+    def __init__(self):
+        self._queue = deque()
+        self._priority_queue = []
+        self._counter = count()
+        self._flush_batch = 0
+
+    def __len__(self):
+        return len(self._queue) + len(self._priority_queue)
+
+    def drainFrom(self, other_queue):
+        self._queue.extend(other_queue._queue)
+        other_queue._queue.clear()
+        # Queue is currently flushing events /o\
+        assert not len(other_queue._priority_queue)
+
+    def append(self, event, channel, priority):
+        self._queue.append((priority, next(self._counter), (event, channel)))
+
+    def dispatchEvents(self, dispatcher):
+        if self._flush_batch == 0:
+            # FIXME: Might be faster to use heapify instead of pop +
+            # heappush. Though, with regards to thread safety this
+            # appears to be the better approach.
+            self._flush_batch = count = len(self._queue)
+            while count:
+                count -= 1
+                heappush(self._priority_queue, self._queue.popleft())
+
+        while self._flush_batch > 0:
+            self._flush_batch -= 1  # Decrement first!
+            (event, channels) = heappop(self._priority_queue)[2]
+            dispatcher(event, channels, self._flush_batch)
+
+
 class Manager(object):
 
     """
@@ -184,8 +222,7 @@ class Manager(object):
     def __init__(self, *args, **kwargs):
         "initializes x; see x.__class__.__doc__ for signature"
 
-        self._queue = []
-        self._counter = count()
+        self._queue = _EventQueue()
 
         self._tasks = set()
         self._cache = dict()
@@ -398,8 +435,7 @@ def registerChild(self, component):
             self.root._executing_thread = component._executing_thread
             component._executing_thread = None
         self.components.add(component)
-        self.root._queue.extend(list(component._queue))
-        component._queue = []
+        self.root._queue.drainFrom(component._queue)
         self.root._cache_needs_refresh = True
 
     def unregisterChild(self, component):
@@ -419,14 +455,7 @@ def _fire(self, event, channel, priority=0):
                 event.effects = 1
                 self._currently_handling.effects += 1
 
-            heappush(
-                self._queue,
-                (
-                    priority,
-                    next(self._counter),
-                    (event, channel)
-                )
-            )
+            self._queue.append(event, channel, priority)
 
         # the event comes from another thread
         else:
@@ -444,8 +473,7 @@ def _fire(self, event, channel, priority=0):
                 # operations that assume its value to remain unchanged.
                 handling = self._currently_handling
 
-                heappush(self._queue,
-                         (priority, next(self._counter), (event, channel)))
+                self._queue.append(event, channel, priority)
                 if isinstance(handling, generate_events):
                     handling.reduce_time_left(0)
 
@@ -569,12 +597,7 @@ def _flush(self):
         old_flushing = self._flushing_thread
         try:
             self._flushing_thread = current_thread()
-            if self._flush_batch == 0:
-                self._flush_batch = len(self._queue)
-            while self._flush_batch > 0:
-                self._flush_batch -= 1  # Decrement first!
-                priority, count, (event, channels) = heappop(self._queue)
-                self._dispatcher(event, channels, self._flush_batch)
+            self._queue.dispatchEvents(self._dispatcher)
         finally:
             self._flushing_thread = old_flushing
 
@@ -909,7 +932,8 @@ def tick(self, timeout=-1):
         if self._running:
             self.fire(generate_events(self._lock, timeout), "*")
 
-        self._queue and self.flush()
+        if len(self._queue):
+            self.flush()
 
     def run(self, socket=None):
         """
@@ -948,7 +972,7 @@ def run(self, socket=None):
             self.fire(started(self))
 
             try:
-                while self.running or self._queue:
+                while self.running or len(self._queue):
                     self.tick()
                     # Fading out, handle remaining work from stop event
                     for _ in range(3):
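In the new layout, a fire() from a foreign thread does no more than build a (priority, counter, (event, channel)) tuple and append it onto a plain deque, whose append is documented as thread-safe in CPython, while heappush/heappop and the _flush_batch bookkeeping live in _EventQueue.dispatchEvents(), which _flush() invokes from the flushing thread. This separation appears to be what makes the cross-thread fire() path safe without extra locking; the monotonically increasing counter keeps events of equal priority in FIFO order, as with the previous heapq-based list, and drainFrom() moves a newly registered child's pending events onto the parent's deque.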
