# These tests assume a literal-rewriting source transform (such as pyrthon)
# that turns dict literals into pyrsistent maps at import time; m() itself
# comes from pyrsistent.
from pyrsistent import m

def test_literal_dict_becomes_a_pmap():
x = {'a': 1}
assert type(x) is type(m())
assert x == m(a=1)
def test_function_dict_becomes_a_dict():
x = dict()
assert type(x) is not type(m())
def test_literal_dict_with_function_call_becomes_a_pmap():
x = {'a': 1}.set('b', 2)
assert type(x) is type(m())
assert x == m(a=1, b=2)
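# A minimal sketch (not part of the test suite above) of the pmap semantics
# the tests rely on: pyrsistent maps are immutable, so every update returns
# a new map and leaves the original untouched.
from pyrsistent import m, pmap

base = m(a=1)
extended = base.set('b', 2)   # returns a new map; 'base' is unchanged
assert base == pmap({'a': 1})
assert extended == m(a=1, b=2)
assert 'b' not in base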
# Constructor of an executor wrapper that tails a downstream executor's
# event queue and logs events on a separate thread.
def __init__(
    self,
    downstream_executor,
    handler=standard_handler,
    format_string=DEFAULT_FORMAT,
):
self.downstream_executor = downstream_executor
self.TASK_CONFIG_INTERFACE = downstream_executor.TASK_CONFIG_INTERFACE
self.handler = handler
self.format_string = format_string
self.src_queue = downstream_executor.get_event_queue()
self.dest_queue = Queue()
self.stopping = False
self.staging_tasks = m()
self.running_tasks = m()
self.done_tasks = v()
# A lock is needed to synchronize logging and event processing
self.task_lock = Lock()
self.event_thread = Thread(target=self.event_loop)
self.event_thread.daemon = True
self.event_thread.start()
self.logging_thread = Thread(target=self.logging_loop)
self.logging_thread.daemon = True
self.logging_thread.start()
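# A hedged sketch (hypothetical helper, not part of the original class)
# showing why persistent maps pair well with the lock above: each state
# transition swaps in brand-new maps, so the logging thread can never
# observe a half-applied update.
def _mark_task_running(self, task_id):
    with self.task_lock:
        task = self.staging_tasks[task_id]
        self.staging_tasks = self.staging_tasks.discard(task_id)
        self.running_tasks = self.running_tasks.set(task_id, task)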
def compute_check_result_for_job(client, job, url_index):
kwargs = m(
name=f"check_tron_job.{job['name']}",
source=client.cluster_name,
)
    # kwargs cannot contain realert_every at this point; the guard is defensive.
    if 'realert_every' not in kwargs:
kwargs = kwargs.set('realert_every', guess_realert_every(job))
kwargs = kwargs.set('check_every', f"{_run_interval}s")
# We want to prevent a monitoring config from setting the check_every
# attribute, since one config should not dictate how often this script runs
    sensu_kwargs = (
        pmap(job['monitoring'])
        .discard(PRECIOUS_JOB_ATTR)
        .discard('check_every')
        .discard('page_for_expected_runtime')
    )
kwargs = kwargs.update(sensu_kwargs)
hide_stderr = kwargs.get('hide_stderr', False)
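# Sketch of the pmap operations used above (standalone example with assumed
# values): discard() drops a key without raising when it is absent, and
# update() merges two maps into a new one.
from pyrsistent import m

defaults = m(check_every='300s', realert_every=1)
overrides = m(team='infra', check_every='60s').discard('check_every')
assert defaults.update(overrides) == m(
    check_every='300s', realert_every=1, team='infra'
)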
def enqueue_task(self, task_config):
with self._lock:
# task_state and task_state_history get reset every time
# a task is enqueued.
self.task_metadata = self.task_metadata.set(
task_config.task_id,
TaskMetadata(
task_config=task_config,
task_state='TASK_INITED',
task_state_history=m(TASK_INITED=time.time()),
)
)
# Need to lock on task_queue to prevent enqueues when getting
# tasks to launch
self.task_queue.put(task_config)
if self.are_offers_suppressed:
if self.call_driver('reviveOffers') is not self.driver_error:
self.are_offers_suppressed = False
log.info('Reviving offers because we have tasks to run.')
get_metric(metrics.TASK_ENQUEUED_COUNT).count(1)
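# Hedged sketch (a hypothetical shape for TaskMetadata, which the snippet
# above does not define) of how a task's state history can be advanced
# immutably on each transition.
import time

from pyrsistent import PMap, PRecord, field, m

class TaskMetadata(PRecord):
    task_state = field(type=str)
    task_state_history = field(type=PMap, initial=m())

def advance(meta, new_state):
    history = meta.task_state_history.set(new_state, time.time())
    # Each set() returns a new record; nothing is mutated in place.
    return meta.set('task_state', new_state).set('task_state_history', history)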
"""
A performance benchmark using the example from issue #232.
See https://github.com/Julian/jsonschema/pull/232.
"""
from twisted.python.filepath import FilePath
from pyperf import Runner
from pyrsistent import m
from jsonschema.tests._suite import Version
import jsonschema
issue232 = Version(
path=FilePath(__file__).sibling("issue232"),
remotes=m(),
name="issue232",
)
if __name__ == "__main__":
issue232.benchmark(
runner=Runner(),
Validator=jsonschema.Draft4Validator,
)
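# A smaller, self-contained pyperf sketch in the same spirit (assumed
# workload, unrelated to the issue232 suite): compare building a pmap
# against copying a plain dict of the same size.
from pyperf import Runner
from pyrsistent import pmap

DATA = {str(i): i for i in range(1000)}

def build_pmap():
    return pmap(DATA)

def copy_dict():
    return dict(DATA)

if __name__ == "__main__":
    runner = Runner()
    runner.bench_func("pmap(DATA)", build_pmap)
    runner.bench_func("dict(DATA)", copy_dict)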
# required
name = field(type=str, mandatory=True, invariant=inv_name_identifier)
node = field(type=str, mandatory=True, invariant=inv_identifier)
schedule = field(
mandatory=True,
factory=schedule_parse.ConfigGenericSchedule.from_config
)
actions = field(
type=ActionMap,
mandatory=True,
invariant=inv_actions,
factory=ActionMap.from_config
)
namespace = field(type=str, mandatory=True)
monitoring = field(type=PMap, initial=m(), factory=pmap)
queueing = field(type=bool, initial=True)
run_limit = field(type=int, initial=50)
all_nodes = field(type=bool, initial=False)
cleanup_action = field(
type=(Action, type(None)), initial=None, factory=Action.from_config
)
enabled = field(type=bool, initial=True)
allow_overlap = field(type=bool, initial=False)
max_runtime = field(
type=(datetime.timedelta, type(None)),
factory=config_utils.valid_time_delta,
initial=None
)
time_zone = field(
    type=(datetime.tzinfo, type(None)),
    initial=None,
)
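# Sketch of how these field() options compose on a pyrsistent PRecord
# (simplified stand-in, not the original job-config class).
from pyrsistent import PRecord, field

def inv_positive(value):
    # Invariant functions return (ok, message_on_failure).
    return (value > 0, 'run_limit must be positive')

class MiniJobConfig(PRecord):
    name = field(type=str, mandatory=True)
    run_limit = field(type=int, initial=50, invariant=inv_positive)
    enabled = field(type=bool, initial=True, factory=bool)

config = MiniJobConfig.create({'name': 'cleanup', 'enabled': 1})
assert config.enabled is True   # factory coerced 1 -> True
assert config.run_limit == 50   # initial applied for the missing field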
def load_plugin(self, provider_module):
module = importlib.import_module(provider_module)
plugin_name = module.TASK_PROCESSING_PLUGIN
name_update = Registry(plugin_modules=m().set(plugin_name, module))
plugin_update = module.register_plugin(Registry())
    def conflict_check(old, new):
        # Any PMap in the registry ought to have no duplicate keys; a
        # conflict means two plugins tried to register the same executor
        # or other component.
        if isinstance(old, PMap):
conflicts = set(old.keys()) & set(new.keys())
if conflicts:
raise ValueError(
'{0} is trying to register elements that already '
'exist in the registry: {1}'.format(
plugin_name, conflicts
)
)
return old.update(new)
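# Sketch of the merge step conflict_check is built for (placeholder plugin
# values): PMap.update_with() calls the supplied function whenever both
# maps define the same key.
from pyrsistent import m

registry = m(executors=m(mesos='mesos_impl'))
addition = m(executors=m(k8s='k8s_impl'))
combined = registry.update_with(lambda old, new: old.update(new), addition)
assert combined == m(executors=m(mesos='mesos_impl', k8s='k8s_impl'))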