Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Inject the events of unique objects - to produce few streams/workers.
stream.feed(events)
stream.close()
# Run the watcher (near-instantly and test-blocking).
with timer:
await watcher(
namespace=None,
resource=resource,
handler=handler,
)
# Significantly less than the queue getting timeout, but sufficient to run.
# 2 <= 1 pull for the event chain + 1 pull for EOS. TODO: 1x must be enough.
from kopf import config
assert timer.seconds < config.WorkersConfig.worker_batch_window + CODE_OVERHEAD
# Was the handler called at all? Awaited as needed for async fns?
assert handler.awaited
# Was it called only once per uid? Only with the latest event?
# Note: the calls can be in arbitrary order, not as we expect then.
assert handler.call_count == len(uids)
assert handler.call_count == len(vals)
expected_uid_val_pairs = set(zip(uids, vals))
actual_uid_val_pairs = set((
kwargs['event']['object']['metadata']['uid'],
kwargs['event']['object']['spec'])
for args, kwargs in handler.call_args_list)
assert actual_uid_val_pairs == expected_uid_val_pairs
while worker_spy.call_count < unique:
await asyncio.sleep(0.001) # give control to the loop
streams = worker_spy.call_args_list[-1][1]['streams']
# The mutable(!) streams dict is now populated with the objects' streams.
assert len(streams) != 0 # usually 1, but can be 2+ if it is fast enough.
# Weakly remember the stream's content to make sure it is gc'ed later.
# Note: namedtuples are not referable due to __slots__/__weakref__ issues.
refs = [weakref.ref(val) for wstream in streams.values() for val in wstream]
assert all([ref() is not None for ref in refs])
# Give the workers some time to finish waiting for the events.
# Once the idle timeout, they will exit and gc their individual streams.
from kopf import config
await asyncio.sleep(config.WorkersConfig.worker_batch_window) # depleting the queues.
await asyncio.sleep(config.WorkersConfig.worker_idle_timeout) # idling on empty queues.
await asyncio.sleep(CODE_OVERHEAD)
# The mutable(!) streams dict is now empty, i.e. garbage-collected.
assert len(streams) == 0
# Truly garbage-collected? Memory freed?
assert all([ref() is None for ref in refs])
'reportingComponent': 'kopf',
'reportingInstance': 'dev',
'source' : {'component': 'kopf'}, # used in the "From" column in `kubectl describe`.
'involvedObject': full_ref,
'firstTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' -- seen in `kubectl describe ...`
'lastTimestamp': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z' - seen in `kubectl get events`
'eventTime': now.isoformat() + 'Z', # '2019-01-28T18:25:03.000000Z'
}
try:
obj = pykube.Event(api, body)
loop = asyncio.get_running_loop()
await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), obj.create)
except (requests.exceptions.HTTPError, pykube.exceptions.HTTPError) as e:
# Events are helpful but auxiliary, they should not fail the handling cycle.
# Yet we want to notice that something went wrong (in logs).
logger.warning("Failed to post an event. Ignoring and continuing. "
f"Error: {e!r}. "
"""
# First, list the resources regularly, and get the list's resource version.
# Simulate the events with type "None" event - used in detection of causes.
items, resource_version = await fetching.list_objs_rv(resource=resource, namespace=namespace)
for item in items:
yield {'type': None, 'object': item}
# Repeat through disconnects of the watch as long as the resource version is valid (no errors).
# The individual watching API calls are disconnected by timeout even if the stream is fine.
while True:
# Then, watch the resources starting from the list's resource version.
stream = watch_objs(
resource=resource, namespace=namespace,
timeout=config.WatchersConfig.default_stream_timeout,
since=resource_version,
)
async for event in stream:
# "410 Gone" is for the "resource version too old" error, we must restart watching.
# The resource versions are lost by k8s after few minutes (5, as per the official doc).
# The error occurs when there is nothing happening for few minutes. This is normal.
if event['type'] == 'ERROR' and cast(bodies.Error, event['object'])['code'] == 410:
logger.debug("Restarting the watch-stream for %r", resource)
return # out of regular stream, to the infinite stream.
# Other watch errors should be fatal for the operator.
if event['type'] == 'ERROR':
raise WatchingError(f"Error in the watch-stream: {event['object']}")
# Ensure that the event is something we understand and can handle.
def filter(self, record: logging.LogRecord) -> bool:
# Only those which have a k8s object referred (see: `ObjectLogger`).
# Otherwise, we have nothing to post, and nothing to do.
level_ok = record.levelno >= config.EventsConfig.events_loglevel
has_ref = hasattr(record, 'k8s_ref')
skipped = hasattr(record, 'k8s_skip') and getattr(record, 'k8s_skip')
return level_ok and has_ref and not skipped and super().filter(record)
async def info_async(obj, *, reason, message=''):
if config.EventsConfig.events_loglevel > config.LOGLEVEL_INFO:
return
await event_async(obj, reason=reason, message=message, type='Normal')
async def exception_async(obj, *, reason='', message='', exc=None):
if config.EventsConfig.events_loglevel > config.LOGLEVEL_ERROR:
return
if exc is None:
_, exc, _ = sys.exc_info()
reason = reason if reason else type(exc).__name__
message = f'{message} {exc}' if message else f'{exc}'
await event_async(obj, reason=reason, message=message, type='Error')
async def _make_cls(
resource: resources.Resource,
) -> Type[pykube.objects.APIObject]:
loop = asyncio.get_running_loop()
api = auth.get_pykube_api()
fn = functools.partial(api.resource_list, resource.api_version)
rsp = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), fn)
api_resources = rsp['resources']
resource_kind = next((r['kind'] for r in api_resources if r['name'] == resource.plural), None)
is_namespaced = next((r['namespaced'] for r in api_resources if r['name'] == resource.plural), None)
if not resource_kind:
raise pykube.ObjectDoesNotExist(f"No such CRD: {resource.name}")
cls_name = resource.plural
cls_base = pykube.objects.NamespacedAPIObject if is_namespaced else pykube.objects.APIObject
cls = type(cls_name, (cls_base,), {
'version': resource.api_version,
'endpoint': resource.plural,
'kind': resource_kind,
})
return cls
def info(obj, *, reason, message=''):
if config.EventsConfig.events_loglevel <= config.LOGLEVEL_INFO:
event(obj, type='Normal', reason=reason, message=message)
if is_async_fn(fn):
result = await fn(*args, **kwargs) # type: ignore
else:
# Not that we want to use functools, but for executors kwargs, it is officially recommended:
# https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.run_in_executor
real_fn = functools.partial(fn, *args, **kwargs)
# Copy the asyncio context from current thread to the handlr's thread.
# It can be copied 2+ times if there are sub-sub-handlers (rare case).
context = contextvars.copy_context()
real_fn = functools.partial(context.run, real_fn)
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(config.WorkersConfig.get_syn_executor(), real_fn)
return result