Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@kopf.on.create('group', 'version', 'plural')
def fn(**_):
    """No-op handler decorated with ``kopf.on.create``; ignores all keyword arguments."""
    return None
def test_resources():
    """Check that ``registry.resources`` lists each resource a handler was registered for.

    Two handlers are registered for two distinct group/version/plural triples;
    the resulting collection must have exactly two entries and contain a
    ``Resource`` built from each triple.
    """
    triples = [
        ('group1', 'version1', 'plural1'),
        ('group2', 'version2', 'plural2'),
    ]

    registry = GlobalRegistry()
    for group, version, plural in triples:
        registry.register_cause_handler(group, version, plural, some_fn)

    known = registry.resources
    assert isinstance(known, collections.abc.Collection)
    assert len(known) == 2
    for group, version, plural in triples:
        assert Resource(group, version, plural) in known
def test_url_of_builtin_resource_item_namespaced():
    """URL of a single named object in a namespace, for a resource with an empty group.

    Dots in the namespace and in the object name are expected to appear
    verbatim in the resulting path.
    """
    expected = '/api/v1/namespaces/ns-a.b/plural/name-a.b'
    url = Resource('', 'v1', 'plural').get_url(namespace='ns-a.b', name='name-a.b')
    assert url == expected
def test_url_of_builtin_resource_item_cluster_scoped():
    """URL of a single named object without a namespace, for a resource with an empty group."""
    expected = '/api/v1/plural/name-a.b'
    url = Resource('', 'v1', 'plural').get_url(name='name-a.b')
    assert url == expected
def test_creation_with_all_args():
    """A ``Resource`` built from three positional args exposes them as attributes."""
    resource = Resource('group', 'version', 'plural')

    # Attribute name -> value expected on the constructed resource.
    expected = {
        'group': 'group',
        'version': 'version',
        'plural': 'plural',
    }
    for attr, value in expected.items():
        assert getattr(resource, attr) == value
def test_name_of_builtin_resource():
    """For a resource with an empty group, ``name`` is expected to be just the plural."""
    builtin = Resource('', 'v1', 'plural')
    assert builtin.name == 'plural'
def test_on_update_with_all_kwargs(mocker):
    """Verify ``@kopf.on.update`` when every keyword option is supplied.

    The handler is registered in a dedicated ``OperatorRegistry`` (passed via
    ``registry=``) and looked up with a mocked cause whose reason is
    ``Reason.UPDATE``. Besides the identity/reason/field checks, the
    error-handling options given to the decorator are verified on the
    registered handler, mirroring the attributes that
    ``test_on_update_minimal`` checks for their defaults.

    Args:
        mocker: the pytest-mock fixture, used for the fake cause and for
            patching the matching function.
    """
    registry = OperatorRegistry()
    resource = Resource('group', 'version', 'plural')
    cause = mocker.MagicMock(resource=resource, reason=Reason.UPDATE)

    # Force matching to succeed -- presumably so that the labels/annotations
    # filters below cannot exclude the MagicMock-based cause (verify against
    # kopf.reactor.registries.match).
    mocker.patch('kopf.reactor.registries.match', return_value=True)

    @kopf.on.update('group', 'version', 'plural',
                    id='id', registry=registry,
                    errors=ErrorsMode.PERMANENT, timeout=123, retries=456, backoff=78,
                    labels={'somelabel': 'somevalue'},
                    annotations={'someanno': 'somevalue'})
    def fn(**_):
        pass

    handlers = registry.get_resource_changing_handlers(cause)
    assert len(handlers) == 1
    assert handlers[0].fn is fn
    assert handlers[0].reason == Reason.UPDATE
    assert handlers[0].field is None

    # The options passed to the decorator must be carried by the handler;
    # without these checks the kwargs above were supplied but never verified.
    assert handlers[0].errors == ErrorsMode.PERMANENT
    assert handlers[0].timeout == 123
    assert handlers[0].retries == 456
    assert handlers[0].backoff == 78
    assert handlers[0].cooldown == 78  # deprecated alias
def test_url_of_builtin_resource_list_namespaced():
    """URL of the object list within a namespace, for a resource with an empty group."""
    expected = '/api/v1/namespaces/ns-a.b/plural'
    url = Resource('', 'v1', 'plural').get_url(namespace='ns-a.b')
    assert url == expected
def test_on_update_minimal(mocker):
    """Verify ``@kopf.on.update`` with only the group/version/plural given.

    No ``registry=`` is passed to the decorator; the handler is then looked
    up in the registry returned by ``kopf.get_default_registry()``. All
    optional error-handling settings are expected to be ``None``.

    Args:
        mocker: the pytest-mock fixture, used to build the fake cause.
    """
    default_registry = kopf.get_default_registry()
    fake_cause = mocker.MagicMock(
        resource=Resource('group', 'version', 'plural'),
        reason=Reason.UPDATE,
    )

    @kopf.on.update('group', 'version', 'plural')
    def fn(**_):
        pass

    found = default_registry.get_resource_changing_handlers(fake_cause)
    assert len(found) == 1

    handler = found[0]
    assert handler.fn is fn
    assert handler.reason == Reason.UPDATE
    assert handler.field is None

    # None of the optional settings were supplied, so each one stays unset.
    for option in ('errors', 'timeout', 'retries', 'backoff'):
        assert getattr(handler, option) is None
    assert handler.cooldown is None  # deprecated alias
@kopf.on.delete('group', 'version', 'plural')
def fn(**_):
    """No-op handler decorated with ``kopf.on.delete``; ignores all keyword arguments."""
    return None