Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, **kw):
    """
    Initialise the profiler and keep a spare reference to the local trace.

    All keyword arguments are handed unchanged to the parent initialiser.
    The spare reference is what _enable() reinstates after _disable() has
    set self._local_trace to None.
    """
    super(ThreadProfile, self).__init__(**kw)
    # Saved once here; _enable() reads it back on every (re)activation.
    self._local_trace_backup = self._local_trace
def _enable(self):
    """
    Activate profiling, including for threads created from now on.

    The steps run in this order: reinstate the local trace callable saved
    at construction, register the global trace callable with the threading
    module, then let the parent class enable itself.
    """
    # _disable() sets this to None, so put the saved callable back first.
    self._local_trace = self._local_trace_backup
    # Register the hook with the threading module for threads started later.
    threading.settrace(self._global_trace)
    super(ThreadProfile, self)._enable()
def _disable(self):
    """
    Deactivate profiling.

    The parent class is disabled first, then the threading-level trace hook
    is removed and the local trace callable is cleared.
    """
    super(ThreadProfile, self)._disable()
    # Unregister the hook that _enable() installed through threading.settrace.
    threading.settrace(None)
    # _enable() restores this from self._local_trace_backup.
    self._local_trace = None
class StatisticProfile(ProfileBase, ProfileRunnerBase):
    """
    Statistic profiling class.

    This class does not collect samples on its own: the caller has to feed
    it call stacks (as returned by sys._getframe() or
    sys._current_frames()) through sample().
    """

    def __init__(self):
        super(StatisticProfile, self).__init__()
        self.total_time = 1

    def sample(self, frame):
        """
        Register one hit for the line currently being executed in `frame`.

        The hit is recorded on the file timing object obtained for `frame`,
        keyed by the frame's code object and line number, with a duration
        of 0.
        """
        self._getFileTiming(frame).hit(frame.f_code, frame.f_lineno, 0)
def _run(threads, verbose, func_name, filename, *args, **kw):
    """
    Profile one call and always emit the collected statistics.

    threads
      When true, a ThreadProfile is used; otherwise a plain Profile.
    verbose
      Passed as the `verbose` keyword to the profiler constructor.
    func_name
      Name of the profiler method to invoke; it receives *args and **kw.
    filename
      Where to dump the statistics. When None, they are printed instead.

    SystemExit raised by the profiled call is swallowed. Statistics are
    emitted from a `finally` clause, so they are written even when the call
    raises something else.
    """
    profiler_class = ThreadProfile if threads else Profile
    prof = profiler_class(verbose=verbose)
    try:
        getattr(prof, func_name)(*args, **kw)
    except SystemExit:
        # The profiled code asked to exit: still report what was measured.
        pass
    finally:
        if filename is None:
            prof.print_stats()
        else:
            prof.dump_stats(filename)
def __profile_file(self):
    """Method used to profile the given file line by line.

    Creates a fresh pprofile.Profile, stores it on self.line_profiler and
    runs the script located at self.pyfile.path under it.

    The script is opened in a `with` block so that its file handle is
    closed as soon as the run ends, including when the profiled script
    raises. Previously the handle was never closed explicitly.
    """
    self.line_profiler = pprofile.Profile()
    script_path = self.pyfile.path
    with open(script_path, "r") as script_file:
        self.line_profiler.runfile(script_file, {}, script_path)
# Create a pprofile profiler and install a SIGTERM handler for a profiled run.
# NOTE(review): only the beginning of this function is visible here. `func`
# is not used in the lines shown and nothing is returned yet, so the part
# that actually runs `func` presumably follows - confirm against the full file.
def start_profiling(func, filepath, statistical=True):
import pprofile
# `statistical` selects between the two pprofile profiler classes.
if statistical:
prof = pprofile.StatisticalProfile()
else:
prof = pprofile.Profile()
# Helper: announce on stderr where the data goes, then write the profile to
# `filepath` through prof.callgrind().
def stop_profiling(prof, filepath):
print('Writing profiling data: %s' % filepath, file=sys.stderr)
print('You can use kcachegrind to analyze it.', file=sys.stderr)
with open(filepath, 'w') as f:
prof.callgrind(f)
# This makes the `finally` block work as expected if we're terminated by
# SIGTERM, which happens by default when running `timeout 30 stig ...`.
# https://stackoverflow.com/a/42200623
# https://mail.python.org/pipermail/python-ideas/2016-February/038474.html
import signal
# SigTerm is a SystemExit subclass; the handler below raises it when the
# signal arrives.
class SigTerm(SystemExit): pass
def sigterm(sig, frame): raise SigTerm
# 15 is the signal number the handler is registered for (SIGTERM, per the
# comment above).
signal.signal(15, sigterm)
# Walk through the batches and print, for each one, how many examples per
# second were delivered since the previous timestamp.
# NOTE(review): the enclosing function and the initial value of
# `end_of_iter_time` are outside the visible lines.
for i, batch in enumerate(dataloader_iter):
# Stop after `max_iterations` batches; a value <= 0 never triggers the break.
if 0 < max_iterations <= i:
break
start_of_iter_time = time()
# Time elapsed between the previous timestamp and now.
dataloader_duration_s = start_of_iter_time - end_of_iter_time
# NOTE(review): this divides by the elapsed time, so two identical
# timestamps would raise ZeroDivisionError - confirm whether that can happen.
examples_per_second = loader.batch_size / dataloader_duration_s
print(
"batch[{}/{}] {:.2f} examples/s".format(
i + 1, total_iterations, examples_per_second
)
)
# The timestamp taken at the top of this iteration becomes the reference
# point for the next one.
end_of_iter_time = start_of_iter_time
# Run the dataloader loop, optionally under a pprofile.Profile.
if not profile:
    run_dataloader()
else:
    prof = pprofile.Profile()
    with prof():
        run_dataloader()
    if profile_callgrind is None:
        # No output path requested: print the statistics instead.
        prof.print_stats()
    else:
        with open(str(profile_callgrind), "w", encoding="utf8") as f:
            prof.callgrind(f)
        print("Wrote callgrind profile log to {}".format(profile_callgrind))
# Generator body yielding the Fibonacci sequence (0, 1, 1, 2, 3, ...) without
# end: the loop has no exit condition.
# NOTE(review): the `def` line is not visible here; this is presumably the
# body of the F() generator used by benchmark() - confirm.
a, b = 0,1
while True:
yield a
# Advance the pair: the new `a` is the old `b`, the new `b` is their sum.
a, b = b, a + b
def benchmark():
    """
    Measure how fast F() can be iterated.

    Values are pulled from F() until the deadline, set one time unit (as
    measured by time()) after the start, has passed. The return value is
    the index of the last value pulled divided by the time actually
    elapsed.
    """
    began = time()
    deadline = began + 1
    for index, _ in enumerate(F()):
        now = time()
        if now > deadline:
            break
    return index / (now - began)
# Baseline: benchmark without any profiler active.
raw = benchmark()
# Same benchmark under each pprofile flavour.
with pprofile.Profile():
    single = benchmark()
with pprofile.ThreadProfile():
    threaded = benchmark()
with pprofile.StatisticThread():
    statistic = benchmark()
# Report each profiled speed as a percentage of the baseline.
for caption, value in zip(
    ('single', 'threaded', 'statistic'),
    (single, threaded, statistic),
):
    print('%s speed: %.2f%%' % (caption, value * 100 / raw))
- any path argument given to unrestrictedTraverse
(unrestrictedTraverse_pathlist.txt)
- all involved python code, including Python Scripts without hierarchy
(the rest)
"""
# Assemble the archive in an in-memory buffer rather than on disk.
out = BytesIO()
with zipfile.ZipFile(
out,
mode='w',
compression=zipfile.ZIP_DEFLATED,
) as outfile:
# _iterOutFiles() yields 3-tuples; the third item is not needed here.
for path, data, _ in self._iterOutFiles():
outfile.writestr(path, data)
# Returned after the ZipFile has been closed: the buffer's bytes together
# with the 'application/zip' content type.
return out.getvalue(), 'application/zip'
# pprofile.Profile combined with ZopeMixIn. The slots are taken from
# ZopeMixIn.virtual__slots__ rather than declared here.
class ZopeProfiler(ZopeMixIn, pprofile.Profile):
__slots__ = ZopeMixIn.virtual__slots__
# pprofile.StatisticalProfile combined with ZopeMixIn. The slots are taken
# from ZopeMixIn.virtual__slots__ rather than declared here.
class ZopeStatisticalProfile(ZopeMixIn, pprofile.StatisticalProfile):
__slots__ = ZopeMixIn.virtual__slots__
# pprofile.StatisticalThread subclass that only adds a class attribute.
# NOTE(review): __allow_access_to_unprotected_subobjects__ is presumably the
# Zope security flag letting restricted code reach this object's
# attributes - confirm against the Zope security documentation.
class ZopeStatisticalThread(pprofile.StatisticalThread):
__allow_access_to_unprotected_subobjects__ = 1
# "verbose" is captured by the signature and deliberately not forwarded, to
# prevent writing to stdout.
def getProfiler(verbose=False, **kw):
    """
    Build a ZopeProfiler, the Zope-friendly pprofile.Profile variant.

    verbose
      Accepted for compatibility and ignored.
    Any other keyword argument is passed on to ZopeProfiler.
    """
    profiler = ZopeProfiler(**kw)
    return profiler
def getStatisticalProfilerAndThread(**kw):
# Record the finished call on the file timing object obtained for the
# caller's frame.
# NOTE(review): the start of this method is not visible; the meaning of
# `frame_discount` and `file_timing` must be checked against the full source.
self._getFileTiming(caller_frame).call(
caller_code, caller_frame.f_lineno,
file_timing,
callee_code, call_duration - frame_discount,
frame,
)
# Hand back the local trace callable (presumably so tracing continues with
# it - confirm against the start of the method).
return self._local_trace
# profile/cProfile-like API
def run(self, cmd):
    """
    Similar to profile.Profile.run .

    Delegates to runctx(), passing the __main__ module's namespace as both
    the globals and the locals mapping, and returns whatever runctx()
    returns.
    """
    import __main__
    main_namespace = __main__.__dict__
    return self.runctx(cmd, main_namespace, main_namespace)
class ThreadProfile(Profile):
    """
    threading.Thread-aware version of Profile class.

    Only threads started after the enable() call are profiled.
    Once disable() has been called, a thread notices the disabling only
    after it has been switched into and has triggered a trace event
    (typically a "line" event).
    """
    __slots__ = ('_local_trace_backup', )

    # Both attributes are wrapped in LocalDescriptor instead of being plain
    # class attributes.
    stack = LocalDescriptor(_initStack)
    global_dict = LocalDescriptor(dict)

    def __init__(self, **kw):
        """
        Forward all keyword arguments to Profile and keep a spare reference
        to the local trace callable in the _local_trace_backup slot.
        """
        super(ThreadProfile, self).__init__(**kw)
        self._local_trace_backup = self._local_trace
def _run(threads, verbose, func_name, filename, *args, **kw):
    """
    Profile one call and always emit the collected statistics.

    threads
      When true, a ThreadProfile is used; otherwise a plain Profile.
    verbose
      Passed as the `verbose` keyword to the profiler constructor.
    func_name
      Name of the profiler method to invoke; it receives *args and **kw.
    filename
      Where to dump the statistics. When None, they are printed instead.

    SystemExit raised by the profiled call is swallowed. Statistics are
    emitted from a `finally` clause, so they are written even when the call
    raises something else.
    """
    profiler_class = ThreadProfile if threads else Profile
    prof = profiler_class(verbose=verbose)
    try:
        getattr(prof, func_name)(*args, **kw)
    except SystemExit:
        # The profiled code asked to exit: still report what was measured.
        pass
    finally:
        if filename is None:
            prof.print_stats()
        else:
            prof.dump_stats(filename)
def _enable(self):
    """
    Activate profiling, including for threads created from now on.

    The steps run in this order: reinstate the local trace callable from
    its backup, register the global trace callable with the threading
    module, then let the parent class enable itself.
    """
    # Put the saved callable back before any tracing is switched on.
    self._local_trace = self._local_trace_backup
    # Register the hook with the threading module for threads started later.
    threading.settrace(self._global_trace)
    super(ThreadProfile, self)._enable()