Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import asyncio
from aiozk import exc, WatchEvent
from .recipe import Recipe
class Barrier(Recipe):
def __init__(self, path):
    """Initialise the barrier recipe and remember the znode path.

    :param path: stored unchanged as ``self.path``; the other methods of
        this class create and delete the znode at this path.
    """
    super().__init__()
    self.path = path
async def create(self):
    """Create the barrier znode at ``self.path``.

    Awaits ``ensure_path()`` first and then ``create_znode(self.path)``.
    """
    await self.ensure_path()
    barrier_path = self.path
    await self.create_znode(barrier_path)
async def lift(self):
    """Delete the barrier znode at ``self.path``.

    ``exc.NoNode`` raised by the delete is deliberately ignored, so lifting
    is best-effort and never propagates that error.
    """
    try:
        await self.client.delete(self.path)
    except exc.NoNode:
        # Presumably the znode is already gone; nothing left to do.
        return
async def wait(self, timeout=None):
import asyncio
import logging
import re
import uuid
from aiozk import exc, WatchEvent, RetryPolicy, states
from .recipe import Recipe
log = logging.getLogger(__name__)
sequential_re = re.compile(r'.*[0-9]{10}$')
class SequentialRecipe(Recipe):
def __init__(self, base_path):
    """Initialise the recipe under ``base_path``.

    Every instance gets a random hex ``guid`` (``uuid.uuid4().hex``) and an
    initially empty ``owned_paths`` dict.
    """
    super().__init__(base_path)
    self.owned_paths = {}
    self.guid = uuid.uuid4().hex
def sequence_number(self, sibling):
    """Return the integer formed by the last ten characters of ``sibling``.

    :raises ValueError: if those characters do not parse as an integer.
    """
    suffix = sibling[-10:]
    return int(suffix)
def determine_znode_label(self, sibling):
    """Return the part of ``sibling`` that precedes its last two ``-`` separators.

    The name is split from the right at most twice; a name with fewer than
    two dashes yields the first piece of whatever split was possible.
    """
    pieces = sibling.rsplit("-", 2)
    return pieces[0]
def sibling_path(self, path):
    """Join ``path`` onto ``self.base_path`` with a single ``/`` separator."""
    segments = (self.base_path, path)
    return "/".join(segments)
import inspect
import logging
from aiozk import WatchEvent, exc
from .recipe import Recipe
log = logging.getLogger(__name__)
def maybe_future(fut, loop):
    """Schedule ``fut`` on ``loop`` when it is awaitable; otherwise do nothing.

    The scheduled future is not handed back: the function always returns
    ``None``, so this is fire-and-forget from the caller's point of view.
    """
    if not inspect.isawaitable(fut):
        return
    asyncio.ensure_future(fut, loop=loop)
class BaseWatcher(Recipe):
watched_events = []
def __init__(self, *args, **kwargs):
    """Forward all arguments to the parent initialiser and create empty registries.

    ``callbacks`` maps a path to a set of callbacks; ``loops`` maps a path to
    the future that ``add_callback`` creates for its watch loop.
    """
    super().__init__(*args, **kwargs)
    self.loops = {}
    self.callbacks = collections.defaultdict(set)
def add_callback(self, path, callback):
    """Register ``callback`` for ``path`` and start a watch loop if none is recorded.

    When ``path`` has no entry in ``self.loops``, ``self.watch_loop(path)`` is
    scheduled on ``self.client.loop`` and the resulting future is stored there.
    """
    self.callbacks[path].add(callback)
    if path in self.loops:
        return
    watcher = self.watch_loop(path)
    self.loops[path] = asyncio.ensure_future(watcher, loop=self.client.loop)
def remove_callback(self, path, callback):
    """Unregister ``callback`` for ``path``; unknown paths and callbacks are ignored.

    ``self.callbacks`` is a ``defaultdict``, so indexing it with a path that
    was never registered would silently insert an empty set. Looking the
    path up with ``get`` keeps removal free of that side effect.
    """
    registered = self.callbacks.get(path)
    if registered is not None:
        registered.discard(callback)
def gather_installed_classes(self):
    """Import every recipe listed in ``RECIPES`` and register the usable ones.

    A class is stored in ``self.installed_classes`` under its ``__name__``
    only when it subclasses ``Recipe`` and its ``validate_dependencies()``
    returns a truthy value; otherwise an error is logged and it is skipped.
    """
    for module, name in RECIPES.items():
        module_path = 'aiozk.recipes.{}'.format(module)
        recipe_class = getattr(import_module(module_path), name)

        if not issubclass(recipe_class, Recipe):
            log.error("Could not load recipe %s: not a Recipe subclass", recipe_class.__name__)
        elif not recipe_class.validate_dependencies():
            log.error("Could not load recipe %s has unmet dependencies", recipe_class.__name__)
        else:
            log.debug("Loaded recipe %s", recipe_class.__name__)
            self.installed_classes[recipe_class.__name__] = recipe_class
import logging
from aiozk import exc
from .recipe import Recipe
log = logging.getLogger(__name__)
class Counter(Recipe):
def __init__(self, base_path, default=0):
    """Initialise a counter stored at ``base_path``.

    :param default: kept as ``self._default``; its type also becomes
        ``self.numeric_type``, which ``_fetch`` uses to convert fetched data.

    ``value`` starts as ``None`` and ``_version`` as ``0``.
    """
    super().__init__(base_path)
    self._default = default
    self.numeric_type = type(default)
    self.value = None
    self._version = 0
async def _fetch(self):
    """Read the znode at ``self.base_path`` and refresh the cached state.

    Stores ``stat.version`` in ``self._version`` and the data converted with
    ``self.numeric_type`` in ``self.value``.

    :returns: a ``(value, version)`` tuple.
    """
    data, stat = await self.client.get(self.base_path)
    self._version = stat.version
    converted = self.numeric_type(data)
    self.value = converted
    return self.value, stat.version
async def start(self):
import asyncio
import collections
import itertools
import json
from .data_watcher import DataWatcher
from .party import Party
from .lock import Lock
from .recipe import Recipe
class Allocator(Recipe):
sub_recipes = {
"party": (Party, ["member_path", "name"]),
"lock": (Lock, ["lock_path"]),
"data_watcher": DataWatcher,
}
def __init__(self, base_path, name, allocator_fn=None):
    """Initialise the allocator under ``base_path`` and store ``name``.

    :param allocator_fn: allocation callable; ``round_robin`` is used when
        this is ``None``.
    """
    # Assigned before the parent initialiser runs; presumably the
    # sub-recipe wiring reads it, so keep this ordering.
    self.name = name
    super().__init__(base_path)
    self.allocator_fn = round_robin if allocator_fn is None else allocator_fn
import asyncio
import logging
from .children_watcher import ChildrenWatcher
from .data_watcher import DataWatcher
from .recipe import Recipe
from ..exc import NoNode
log = logging.getLogger(__name__)
class TreeCache(Recipe):
sub_recipes = {
"data_watcher": DataWatcher,
"child_watcher": ChildrenWatcher,
}
def __init__(self, base_path, defaults=None):
    """Initialise the cache rooted at ``base_path``.

    :param defaults: stored as ``self.defaults``; any falsy value (including
        ``None``) is replaced by a new empty dict.

    ``root`` starts as ``None``.
    """
    super().__init__(base_path)
    self.root = None
    self.defaults = defaults if defaults else {}
async def start(self):
log.debug("Starting znode tree cache at %s", self.base_path)
self.root = ZNodeCache(