Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_named_blinker():
    """Check that a NamedSignal's repr contains the name it was created with."""
    sig = blinker.NamedSignal('squiznart')
    assert 'squiznart' in repr(sig)
self.isRunning = False
self.runningThread = None
#self._threadLaunched = Semaphore(0)
self.debugger = Debugger(self)
self.debugMode = False
self.namespace = {}
self.initialNamespace = {}
self.initialNames = set()
self.beforeRun = NamedSignal('afterLock')
self.onException = NamedSignal('onException')
self.afterRun = NamedSignal('beforeUnlock')
self.onDebugSet = NamedSignal('onDebugSet')
import logging
import typing
import blinker
import bson
from flask import Blueprint, current_app, request
from werkzeug import exceptions as wz_exceptions
from pillar.api import local_auth
from pillar.api.utils import authorization, authentication
# Flask blueprint for this module, created under the name 'service'.
blueprint = Blueprint('service', __name__)
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Named blinker signal.
# NOTE(review): the sender and keyword arguments are not visible here;
# confirm them at the send() call site.
signal_user_changed_role = blinker.NamedSignal('badger:user_changed_role')
# Role names; going by the constant's name, these are the roles that have
# an associated group (TODO confirm).
ROLES_WITH_GROUPS = {'admin', 'demo', 'subscriber'}
# Map of role name to group ID, for the roles in ROLES_WITH_GROUPS.
# Starts out empty.
role_to_group_id = {}
class ServiceAccountCreationError(Exception):
    """Raised when a service account cannot be created."""
@blueprint.before_app_first_request
def fetch_role_to_group_id_map():
    """Fills the role_to_group_id mapping upon application startup."""
    # Declared global so that assignments here rebind the module-level name.
    global role_to_group_id
def __init__(self, debugger):
    """Initialise empty recording state and subscribe to the debugger.

    Args:
        debugger: Object exposing ``onStart`` and ``onFrame`` signals.
            ``self._start`` is connected to ``onStart`` and
            ``self.recordFrame`` to ``onFrame``.
    """
    # Recording state: nothing tracked and nothing recorded yet.
    self.variablesToTrack = []
    self.counter = 0
    self.records = deque()
    self.recordsCropped = False

    # Subscribe to the debugger's signals.
    debugger.onStart.connect(self._start)
    debugger.onFrame.connect(self.recordFrame)

    # Signals this object exposes; each is named after its attribute.
    self.onAddedVariable = NamedSignal('onAddedVariable')
    self.onRemovedVariable = NamedSignal('onRemovedVariable')
    self.onRecorded = NamedSignal('onRecorded')
    self.onReset = NamedSignal('onReset')
def __init__(self, interpreter):
    """
    Connects the debugger to the interpreter.

    Stores ``interpreter``, sets the speed to ``DEFAULT_SPEED``, starts with
    no target filenames and ``running`` set to False, and creates the
    ``onSpeedSet``, ``onStart``, ``onStop`` and ``onFrame`` signals.
    """
    super(Debugger, self).__init__()
    self.interpreter = interpreter
    self.speed = self.DEFAULT_SPEED
    self.targetFilenames = set()
    self.running = False

    # Signals exposed by the debugger; each is named after its attribute.
    self.onSpeedSet = NamedSignal('onSpeedSet')
    self.onStart = NamedSignal('onStart')
    self.onStop = NamedSignal('onStop')
    self.onFrame = NamedSignal('onFrame')
def __init__(self, interpreter):
    """
    Connects the debugger to the interpreter.

    Stores ``interpreter``, sets the speed to ``DEFAULT_SPEED``, starts with
    no target filenames and ``running`` set to False, and creates the
    ``onSpeedSet``, ``onStart``, ``onStop`` and ``onFrame`` signals.
    """
    super(Debugger, self).__init__()
    self.interpreter = interpreter
    self.speed = self.DEFAULT_SPEED
    self.targetFilenames = set()
    self.running = False

    # Signals exposed by the debugger; each is named after its attribute.
    self.onSpeedSet = NamedSignal('onSpeedSet')
    self.onStart = NamedSignal('onStart')
    self.onStop = NamedSignal('onStop')
    self.onFrame = NamedSignal('onFrame')
from pillar.api import blender_id
from pillar.api.utils import authorization, jsonify
from pillar.auth import current_user
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Flask blueprint for this module, created under the name
# 'blender_cloud.subscription'.
blueprint = Blueprint('blender_cloud.subscription', __name__)
# Mapping from roles on Blender ID to roles here in Pillar.
# Roles not mentioned here will not be synced from Blender ID.
ROLES_BID_TO_PILLAR = {
'cloud_subscriber': 'subscriber',
'cloud_demo': 'demo',
'cloud_has_subscription': 'has_subscription',
}
# Named blinker signal. The second argument is a human-readable description
# of how the signal is sent (sender type and expected kwargs).
user_subscription_updated = blinker.NamedSignal(
'user_subscription_updated',
'The sender is a UserClass instance, kwargs includes "revoke_roles" and "grant_roles".')
@blueprint.route('/update-subscription')
@authorization.require_login()
def update_subscription() -> typing.Tuple[str, int]:
"""Updates the subscription status of the current user.
Returns an empty HTTP response.
"""
my_log: logging.Logger = log.getChild('update_subscription')
real_current_user = auth.get_current_user() # multiple accesses, just get unproxied.
try:
def __init__(self, interpreter):
    """
    Connects the debugger to the interpreter.

    Stores ``interpreter``, sets the speed to ``DEFAULT_SPEED``, starts with
    no target filenames and ``running`` set to False, and creates the
    ``onSpeedSet``, ``onStart``, ``onStop`` and ``onFrame`` signals.
    """
    super(Debugger, self).__init__()
    self.interpreter = interpreter
    self.speed = self.DEFAULT_SPEED
    self.targetFilenames = set()
    self.running = False

    # Signals exposed by the debugger; each is named after its attribute.
    self.onSpeedSet = NamedSignal('onSpeedSet')
    self.onStart = NamedSignal('onStart')
    self.onStop = NamedSignal('onStop')
    self.onFrame = NamedSignal('onFrame')
from logging import getLogger
import blinker
# Module-level logger named after this module.
logger = getLogger(__name__)
# Names exported by a star-import of this module.
# NOTE(review): the definitions of these names are not in the visible part
# of the file; confirm each one exists.
__all__ = [
'job_started', 'job_finished', 'job_schedule_retry', 'job_failed',
'worker_started', 'worker_terminated'
]
class SafeNamedSignal(blinker.NamedSignal):
"""Named signal for misbehaving receivers."""
def send(self, *sender, **kwargs):
"""Emit this signal on behalf of `sender`, passing on kwargs.
This is an extension of `Signal.send` that changes one thing:
Exceptions raised in calling the receiver are logged but do not fail
"""
if len(sender) == 0:
sender = None
elif len(sender) > 1:
raise TypeError('send() accepts only one positional argument, '
'%s given' % len(sender))
else:
sender = sender[0]