Skip to content

Instantly share code, notes, and snippets.

@kromg
Created June 7, 2018 14:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kromg/1d9173195807c798192ec32892100d0b to your computer and use it in GitHub Desktop.
Save kromg/1d9173195807c798192ec32892100d0b to your computer and use it in GitHub Desktop.
#circuits test_notify.py recursive test
#!/usr/bin/env python
import pytest
import os
from circuits import Component, handler
try:
from circuits.io.notify import Notify
except ImportError:
pytest.importorskip("pyinotify")
class App(Component):
def init(self, *args, **kwargs):
self.created = []
@handler('created', channel='notify')
def created(self, *args, **kwargs):
self.created.append(args[0].strip())
def _create(watcher, tmpdir, *target):
target = os.path.join(*target)
tmpdir.ensure(target)
watcher.wait("created")
def test_notify_non_recursive(manager, watcher, tmpdir):
app = App().register(manager)
notify = Notify().register(app)
watcher.wait("registered")
subdir = "hellosubdir"
tmpdir.ensure(subdir, dir=True)
# File creation in watched path
notify.add_path(str(tmpdir))
target = "helloworld.txt"
_create(watcher, tmpdir, target)
assert target in app.created
# File creation in watched subpath - non recursive
subdir_file = "helloworld_sub.txt"
_create(watcher, tmpdir, subdir, subdir_file)
assert subdir_file not in app.created
# File creation in non-watched path
notify.remove_path(str(tmpdir))
target = "helloworld2.txt"
_create(watcher, tmpdir, target)
assert target not in app.created
# File creation in non-watched subpath
subdir_file = "helloworld_sub2.txt"
_create(watcher, tmpdir, subdir, subdir_file)
assert subdir_file not in app.created
# Cleanup
app.unregister()
watcher.wait("unregistered")
def test_notify_recursive(manager, watcher, tmpdir):
app = App().register(manager)
notify = Notify().register(app)
watcher.wait("registered")
subdir = "hellosubdir"
tmpdir.ensure(subdir, dir=True)
# File creation in watched path - recursive w/o auto-add
notify.add_path(str(tmpdir), recursive=True, auto_add=False)
target = "helloworld.txt"
_create(watcher, tmpdir, target)
assert target in app.created
# File creation in watched subpath - recursive w/o auto-add
subdir_file = "helloworld_sub.txt"
_create(watcher, tmpdir, subdir, subdir_file)
assert subdir_file in app.created
# File/path creation in watched path - recursive w/o auto-add
subdir = "hellosubdir2"
_create(watcher, tmpdir, subdir)
assert subdir in app.created
##### RANDOMLY:
# E AssertionError: assert 'hellosubdir2' in ['helloworld.txt', 'helloworld_sub.txt', 'hellosubdir2']
# E + where ['helloworld.txt', 'helloworld_sub.txt', 'hellosubdir2'] = <App/* 26612:MainThread (queued=0) [S]>.created
subdir_file = "helloworld_sub2.txt"
_create(watcher, tmpdir, subdir, subdir_file)
assert subdir_file not in app.created
# File creation in non-watched path
notify.remove_path(str(tmpdir))
target = "helloworld2.txt"
_create(watcher, tmpdir, target)
assert target not in app.created
# File creation in non-watched subpath
subdir_file = "helloworld_sub3.txt"
_create(watcher, tmpdir, subdir, subdir_file)
assert subdir_file not in app.created
# Cleanup
app.unregister()
watcher.wait("unregistered")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment