Skip to content

Commit

Permalink
Merge pull request #1153 from rackerlabs/zk-intents
Browse files Browse the repository at this point in the history
Add intents for certain interactions with ZooKeeper
  • Loading branch information
radix committed Mar 10, 2015
2 parents c8ffdc8 + c22f450 commit f513e72
Show file tree
Hide file tree
Showing 3 changed files with 333 additions and 0 deletions.
195 changes: 195 additions & 0 deletions otter/test/util/test_zk.py
@@ -0,0 +1,195 @@
"""Tests for otter.util.zk"""

from functools import partial

from characteristic import attributes

from effect import Effect, TypeDispatcher
from effect.twisted import perform

from kazoo.exceptions import BadVersionError, NoNodeError, NodeExistsError

from twisted.internet.defer import fail, succeed
from twisted.trial.unittest import SynchronousTestCase

from otter.util.zk import (
CreateOrSet, CreateOrSetLoopLimitReachedError,
DeleteNode, GetChildrenWithStats,
perform_create_or_set, perform_delete_node,
perform_get_children_with_stats)


@attributes(['version'])
class ZNodeStatStub(object):
    """
    Minimal stand-in for a :obj:`ZnodeStat`.

    Only the ``version`` attribute is modelled, since that is all these
    tests need.
    """


class ZKCrudModel(object):
    """
    A simplified, in-memory model of Kazoo's CRUD operations, supporting
    version-check-and-set.

    Nodes live in :attr:`nodes`, a flat dict mapping each path to a
    ``(content, version)`` tuple.  Every public operation returns the result
    of ``succeed`` or ``fail``, so callers always get a Deferred back.
    """
    def __init__(self):
        # path -> (content, version)
        self.nodes = {}

    def create(self, path, content, makepath=False):
        """
        Create a node at version 0.

        :param path: path of the node to create
        :param content: initial content of the node
        :param makepath: must be passed as ``True``; this is asserted so that
            code under test which forgets the flag is caught by the model
        :return: Deferred firing with ``path``, or failing with
            :obj:`NodeExistsError` if the node is already present
        """
        assert makepath is True, "makepath must be True"
        if path in self.nodes:
            return fail(NodeExistsError("{} already exists".format(path)))
        self.nodes[path] = (content, 0)
        return succeed(path)

    def get(self, path):
        """
        Get content of the node, and stat info.

        :return: Deferred firing with ``(content, ZNodeStatStub)``, or failing
            with :obj:`NoNodeError` if the node does not exist
        """
        if path not in self.nodes:
            return fail(NoNodeError("{} does not exist".format(path)))
        content, version = self.nodes[path]
        return succeed((content, ZNodeStatStub(version=version)))

    def _check_version(self, path, version):
        """
        Check that ``path`` exists and is at ``version``.

        A ``version`` of ``-1`` matches any stored version.

        :return: ``None`` when the check passes; otherwise a Deferred that has
            failed with :obj:`NoNodeError` or :obj:`BadVersionError`, which
            the caller should return as-is
        """
        if path not in self.nodes:
            return fail(NoNodeError("{} does not exist".format(path)))
        if version != -1:
            current_version = self.nodes[path][1]
            if current_version != version:
                return fail(BadVersionError(
                    "When operating on {}, version {} was specified but "
                    "version {} was found".format(path, version,
                                                  current_version)))
        return None

    def set(self, path, new_value, version=-1):
        """
        Set the content of a node, bumping its version by one.

        :return: Deferred firing with a :obj:`ZNodeStatStub` carrying the new
            version, or the failed Deferred from the version check
        """
        check = self._check_version(path, version)
        if check is not None:
            return check
        current_version = self.nodes[path][1]
        new_stat = ZNodeStatStub(version=current_version + 1)
        self.nodes[path] = (new_value, new_stat.version)
        return succeed(new_stat)

    def delete(self, path, version=-1):
        """
        Delete a node.

        :return: Deferred firing with ``None``, or the failed Deferred from
            the version check
        """
        check = self._check_version(path, version)
        if check is not None:
            return check
        del self.nodes[path]
        return succeed(None)


class CreateOrSetTests(SynchronousTestCase):
    """Tests for performing :obj:`CreateOrSet` intents."""

    def setUp(self):
        """Give every test a fresh, empty ZooKeeper model."""
        self.model = ZKCrudModel()

    def _cos(self, path, content):
        """
        Perform a :obj:`CreateOrSet` for ``path``/``content`` against the
        model and return the resulting Deferred.
        """
        dispatcher = TypeDispatcher({
            CreateOrSet: partial(perform_create_or_set, self.model),
        })
        intent = CreateOrSet(path=path, content=content)
        return perform(dispatcher, Effect(intent))

    def test_create(self):
        """A node that doesn't exist yet gets created."""
        result = self.successResultOf(self._cos('/foo', 'bar'))
        self.assertEqual(result, '/foo')
        self.assertEqual(self.model.nodes, {'/foo': ('bar', 0)})

    def test_update(self):
        """A node that already exists is updated with `set`."""
        self.model.create('/foo', 'initial', makepath=True)
        result = self.successResultOf(self._cos('/foo', 'bar'))
        self.assertEqual(result, '/foo')
        self.assertEqual(self.model.nodes, {'/foo': ('bar', 1)})

    def test_node_disappears_during_update(self):
        """
        When `set` reports that the node is missing (it was deleted between
        the `create` and `set` calls), creation is attempted again.
        """
        self.model.create('/foo', 'initial', makepath=True)

        def delete_then_set(path, value):
            self.model.delete('/foo')
            # Drop the instance-level override so the real ``set`` is used
            # from here on; the deletion only happens once.
            del self.model.set
            return self.model.set(path, value)

        self.model.set = delete_then_set

        result = self.successResultOf(self._cos('/foo', 'bar'))
        self.assertEqual(result, '/foo')
        # Version 0 proves the node was re-created; a plain update would have
        # left it at version 1.
        self.assertEqual(self.model.nodes, {'/foo': ('bar', 0)})

    def test_loop_limit(self):
        """
        Performing a :obj:`CreateOrSet` does not loop forever in pathological
        cases; it eventually fails with a
        :obj:`CreateOrSetLoopLimitReachedError`.
        """
        def never_creates(path, content, makepath):
            return fail(NodeExistsError())

        def never_sets(path, value):
            return fail(NoNodeError())

        self.model.create = never_creates
        self.model.set = never_sets

        failure = self.failureResultOf(self._cos('/foo', 'bar'))
        self.assertEqual(failure.type, CreateOrSetLoopLimitReachedError)
        self.assertEqual(failure.getErrorMessage(), '/foo')


class GetChildrenWithStatsTests(SynchronousTestCase):
    """Tests for performing :obj:`GetChildrenWithStats` intents."""

    def setUp(self):
        """Install a bare object on which tests hang fake client methods."""
        # It'd be nice if we used the standard ZK CRUD model, but implementing
        # a tree of nodes supporting get_children is a pain
        class Model(object):
            pass
        self.model = Model()

    def _gcws(self, path):
        """
        Perform a :obj:`GetChildrenWithStats` for ``path`` against the model
        and return the resulting Deferred.
        """
        dispatcher = TypeDispatcher({
            GetChildrenWithStats:
                partial(perform_get_children_with_stats, self.model),
        })
        return perform(dispatcher, Effect(GetChildrenWithStats(path)))

    def test_get_children_with_stats(self):
        """
        The result pairs each child with its ZnodeStat object.  A child that
        disappears between ``get_children`` and ``exists`` is left out.
        """
        stats = {
            '/path/foo': ZNodeStatStub(version=0),
            '/path/bar': ZNodeStatStub(version=1),
            # ``exists`` firing with None models a child that has vanished.
            '/path/baz': None,
        }

        def exists(child_path):
            if child_path in stats:
                return succeed(stats[child_path])

        children_by_path = {'/path': succeed(['foo', 'bar', 'baz'])}
        self.model.get_children = children_by_path.get
        self.model.exists = exists

        result = self.successResultOf(self._gcws('/path'))
        self.assertEqual(result,
                         [('foo', ZNodeStatStub(version=0)),
                          ('bar', ZNodeStatStub(version=1))])


class DeleteTests(SynchronousTestCase):
    """Tests for :obj:`DeleteNode`."""

    def test_delete(self):
        """Performing :obj:`DeleteNode` removes the node from the model."""
        model = ZKCrudModel()
        model.create('/foo', 'initial', makepath=True)
        # One ``set`` moves the node to version 1, matching the intent below.
        model.set('/foo', 'bar')

        dispatcher = TypeDispatcher({
            DeleteNode: partial(perform_delete_node, model),
        })
        intent = DeleteNode(path='/foo', version=1)
        d = perform(dispatcher, Effect(intent))

        self.assertEqual(model.nodes, {})
        self.assertEqual(self.successResultOf(d), None)
13 changes: 13 additions & 0 deletions otter/util/deferredutils.py
Expand Up @@ -318,3 +318,16 @@ def wrapped_f(*args, **kwargs):
return wrapped_f

return decorator


def catch_failure(exc_type, fn, *args, **kwargs):
    """
    Build an errback that hands a failure to ``fn`` only when the failure
    wraps ``exc_type``.

    The returned errback first calls ``failure.trap(exc_type)`` and then
    returns ``fn(failure, *args, **kwargs)``.  Failures of any other type are
    left to ``trap`` to reject, so ``fn`` is never called for them.

    Use like ``d.addErrback(catch_failure(ExampleError, lambda f: 'foo'))``.

    :param exc_type: exception type (or types) accepted by ``Failure.trap``
    :param fn: callable taking the failure as its first argument
    :return: a one-argument errback
    """
    def errback(failure):
        failure.trap(exc_type)
        return fn(failure, *args, **kwargs)
    return errback
125 changes: 125 additions & 0 deletions otter/util/zk.py
@@ -0,0 +1,125 @@
from functools import partial

from characteristic import attributes

from effect import TypeDispatcher
from effect.twisted import deferred_performer

from kazoo.exceptions import NoNodeError, NodeExistsError

from twisted.internet.defer import gatherResults

from otter.util.deferredutils import catch_failure


#: Upper bound on how many times perform_create_or_set will bounce between
#: trying to create a node and trying to set that node's contents.
CREATE_OR_SET_LOOP_LIMIT = 50


@attributes(['path', 'content'])
class CreateOrSet(object):
    """
    Intent to create a node, or to set its content if it already exists.

    Also covers the case where the node gets deleted in between our attempts
    at creating and setting it.
    """


class CreateOrSetLoopLimitReachedError(Exception):
    """
    Raised by :func:`perform_create_or_set` once the number of attempts at
    creating a node has reached :obj:`CREATE_OR_SET_LOOP_LIMIT`.

    The path of the node is passed as the exception's argument.
    """


@deferred_performer
def perform_create_or_set(kz_client, dispatcher, create_or_set):
    """
    Performer for :obj:`CreateOrSet`.  Must be partialed with ``kz_client``.

    Tries to create the node; if it already exists, sets its content
    instead; if the node has vanished by the time of the set, goes back to
    creating it.  The number of create attempts is capped by
    :obj:`CREATE_OR_SET_LOOP_LIMIT`.

    :param kz_client: txKazoo client
    :param dispatcher: dispatcher, supplied by perform
    :param CreateOrSet create_or_set: the intent
    :return: Deferred firing with the result of ``kz_client.create`` when the
        node was created, or with the intent's path when it was set; fails
        with :obj:`CreateOrSetLoopLimitReachedError` when the cap is hit
    """
    path, content = create_or_set.path, create_or_set.content

    def attempt_create(attempts):
        if attempts >= CREATE_OR_SET_LOOP_LIMIT:
            raise CreateOrSetLoopLimitReachedError(path)

        def node_exists(_failure):
            # Someone else owns the node already; overwrite its content.
            return attempt_set(attempts)

        creating = kz_client.create(path, content, makepath=True)
        creating.addErrback(catch_failure(NodeExistsError, node_exists))
        return creating

    def attempt_set(attempts):
        def node_vanished(_failure):
            # The node was deleted between create and set; start over, and
            # count the extra round trip against the loop limit.
            return attempt_create(attempts + 1)

        setting = kz_client.set(path, content)
        setting.addErrback(catch_failure(NoNodeError, node_vanished))
        setting.addCallback(lambda _result: path)
        return setting

    return attempt_create(0)


@attributes(['path'], apply_with_init=False)
class GetChildrenWithStats(object):
    """
    Intent to list a node's children along with their stat information.

    Results in ``[(child_name, :obj:`ZnodeStat`)]``.
    """
    # ``apply_with_init=False`` leaves construction to this hand-written
    # initializer, which accepts ``path`` positionally.
    def __init__(self, path):
        self.path = path


@deferred_performer
def perform_get_children_with_stats(kz_client, dispatcher, intent):
    """
    Perform :obj:`GetChildrenWithStats`.  Must be partialed with
    ``kz_client``.

    Children for which ``exists`` reports ``None`` (they were removed after
    being listed) are dropped from the result.

    :param kz_client: txKazoo client
    :param dispatcher: dispatcher, supplied by perform
    :param GetChildrenWithStats intent: the intent
    :return: Deferred firing with a list of ``(child_name, stat)`` tuples
    """
    path = intent.path

    def stat_child(child):
        d = kz_client.exists(path + '/' + child)
        return d.addCallback(
            lambda stat: (child, stat) if stat is not None else None)

    def got_children(children):
        return gatherResults([stat_child(child) for child in children])

    def drop_missing(results):
        # Build a real list rather than using ``filter``, which is lazy on
        # Python 3 and would hand callers an iterator instead of a list.
        return [pair for pair in results if pair is not None]

    d = kz_client.get_children(path)
    d.addCallback(got_children)
    d.addCallback(drop_missing)
    return d


@attributes(['path', 'version'])
class DeleteNode(object):
    """
    Intent to delete the node at ``path``.

    ``version`` is handed to the client's ``delete`` call as its ``version``
    argument.
    """


@deferred_performer
def perform_delete_node(kz_client, dispatcher, intent):
    """
    Perform :obj:`DeleteNode`.  Must be partialed with ``kz_client``.

    :param kz_client: txKazoo client
    :param dispatcher: dispatcher, supplied by perform
    :param DeleteNode intent: the intent
    :return: the Deferred from ``kz_client.delete``
    """
    # Return the Deferred so the effect only resolves once the deletion has
    # finished and any failure from the client reaches the caller, instead of
    # being dropped.
    return kz_client.delete(intent.path, version=intent.version)


def get_zk_dispatcher(kz_client):
    """
    Get a dispatcher that can support all of the ZooKeeper intents.

    :param kz_client: txKazoo client that every performer is partialed with
    :return: a :obj:`TypeDispatcher` covering :obj:`CreateOrSet`,
        :obj:`DeleteNode` and :obj:`GetChildrenWithStats`
    """
    performers = (
        (CreateOrSet, perform_create_or_set),
        (DeleteNode, perform_delete_node),
        (GetChildrenWithStats, perform_get_children_with_stats),
    )
    return TypeDispatcher({
        intent_type: partial(performer, kz_client)
        for intent_type, performer in performers
    })

0 comments on commit f513e72

Please sign in to comment.