import logging
import sys
from unittest import mock

from dvc.progress import Tqdm


def test_default(caplog, capsys):
    with caplog.at_level(logging.INFO, logger="dvc"):
        # simulate an interactive terminal so the progress bar is actually drawn
        with mock.patch.object(sys.stderr, "isatty", return_value=True):
            for _ in Tqdm(range(10)):
                pass
        out_err = capsys.readouterr()
        assert out_err.out == ""
        assert "0/10" in out_err.err
import os

from unittest.mock import patch


def test_large_dir_progress(repo_dir, dvc_repo):
    from dvc.utils import LARGE_DIR_SIZE
    from dvc.progress import Tqdm

    # create a "large dir" with more than LARGE_DIR_SIZE files so that
    # `dvc add` reports progress while hashing it
    for i in range(LARGE_DIR_SIZE + 1):
        repo_dir.create(os.path.join("gen", "{}.txt".format(i)), str(i))

    with patch.object(Tqdm, "update") as update:
        assert not update.called
        dvc_repo.add("gen")
        assert update.called
for target in targets:
    try:
        new = self.collect(
            target, with_deps=with_deps, recursive=recursive
        )
        stages.update(new)
    except (StageFileDoesNotExistError, StageFileBadNameError) as exc:
        if not target:
            raise
        raise CheckoutErrorSuggestGit(target, exc)

total = get_all_files_numbers(stages)
if total == 0:
    logger.info("Nothing to do")

failed = []
# one bar covers every file across all stages; it is hidden when total == 0
with Tqdm(
    total=total, unit="file", desc="Checkout", disable=total == 0
) as pbar:
    for stage in stages:
        failed.extend(
            stage.checkout(
                force=force,
                progress_callback=pbar.update_desc,
                relink=relink,
            )
        )
if failed:
    raise CheckoutError(failed)
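Passing the bound method pbar.update_desc as progress_callback lets each stage both label and advance the shared bar as it places files in the workspace. A minimal standalone sketch of that callback shape, using plain tqdm in place of dvc.progress.Tqdm and a hypothetical list of files:

from tqdm import tqdm

files = ["data/a.csv", "data/b.csv", "model.pkl"]  # hypothetical targets

with tqdm(total=len(files), unit="file", desc="Checkout") as pbar:

    def progress_callback(name, n=1):
        # show the file being processed and advance the bar by n,
        # the role the update_desc callback plays in the snippet above
        pbar.set_description_str(name, refresh=False)
        pbar.update(n)

    for name in files:
        # ... link/copy the file into the workspace here ...
        progress_callback(name)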
def gdrive_download_file(
    self, file_id, to_file, progress_name, no_progress_bar
):
    gdrive_file = self.drive.CreateFile({"id": file_id})
    bar_format = (
        "Downloading {desc:{ncols_desc}.{ncols_desc}}... "
        + Tqdm.format_sizeof(int(gdrive_file["fileSize"]), "B", 1024)
    )
    with Tqdm(
        bar_format=bar_format, desc=progress_name, disable=no_progress_bar
    ):
        gdrive_file.GetContentFile(to_file)
def _download_dir(
    self, from_info, to_info, name, no_progress_bar, file_mode, dir_mode
):
    from_infos = list(self.walk_files(from_info))
    to_infos = (
        to_info / info.relative_to(from_info) for info in from_infos
    )

    with ThreadPoolExecutor(max_workers=self.JOBS) as executor:
        download_files = partial(
            self._download_file,
            name=name,
            no_progress_bar=True,
            file_mode=file_mode,
            dir_mode=dir_mode,
        )
        futures = executor.map(download_files, from_infos, to_infos)
        # per-file bars are disabled above; this single bar counts completed files
        with Tqdm(
            futures,
            total=len(from_infos),
            desc="Downloading directory",
            unit="Files",
            disable=no_progress_bar,
        ) as futures:
            return sum(futures)
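The pattern here, wrapping the lazy iterator returned by executor.map in a progress bar and then reducing over it, works with any tqdm-compatible bar: the bar advances as each worker finishes. A self-contained sketch with a hypothetical download worker (plain tqdm standing in for dvc.progress.Tqdm):

from concurrent.futures import ThreadPoolExecutor

from tqdm import tqdm


def fetch(path):
    # hypothetical worker: pretend a single file was transferred
    return 1


paths = ["file{}.bin".format(i) for i in range(100)]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = executor.map(fetch, paths)
    # executor.map yields results lazily, so iterating the bar tracks completion
    with tqdm(results, total=len(paths), desc="Downloading directory", unit="file") as bar:
        downloaded = sum(bar)

print(downloaded)  # -> 100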
def _download(
    self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
):
    with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
        self.blob_service.get_blob_to_path(
            from_info.bucket,
            from_info.path,
            to_file,
            progress_callback=pbar.update_to,
        )
def download(self, src, dest, no_progress_bar=False, progress_title=None):
    with Tqdm(
        desc=progress_title or os.path.basename(src),
        disable=no_progress_bar,
        bytes=True,
    ) as pbar:
        self.sftp.get(src, dest, callback=pbar.update_to)
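Both of these wrappers rely on the same callback contract: the transfer library periodically calls back with the cumulative bytes transferred (and, for some libraries, the total), and the bar turns that cumulative figure into an increment. A minimal sketch of that conversion, with a hypothetical copy_with_progress transfer loop and plain tqdm in place of dvc.progress.Tqdm and its update_to helper:

import os

from tqdm import tqdm


def copy_with_progress(src, dst, callback, chunk_size=1024 * 1024):
    # hypothetical transfer loop: reports cumulative bytes after every chunk
    total = os.path.getsize(src)
    copied = 0
    with open(src, "rb") as fsrc, open(dst, "wb") as fdst:
        while True:
            chunk = fsrc.read(chunk_size)
            if not chunk:
                break
            fdst.write(chunk)
            copied += len(chunk)
            callback(copied, total)


with tqdm(unit="B", unit_scale=True, unit_divisor=1024, desc="download") as pbar:

    def update_to(current, total=None):
        # convert a cumulative byte count into a tqdm increment
        if total is not None:
            pbar.total = total
        pbar.update(current - pbar.n)

    copy_with_progress("src.bin", "dst.bin", update_to)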
fname = fspath_py35(fname)

if os.path.exists(fname):
    hash_md5 = hashlib.md5()
    binary = not istextfile(fname)
    size = os.path.getsize(fname)
    no_progress_bar = True
    if size >= LARGE_FILE_SIZE:
        no_progress_bar = False
        msg = (
            "Computing md5 for a large file '{}'. This is only done once."
        )
        logger.info(msg.format(relpath(fname)))
    name = relpath(fname)

    with Tqdm(
        desc=name,
        disable=no_progress_bar,
        total=size,
        bytes=True,
        leave=False,
    ) as pbar:
        with open(fname, "rb") as fobj:
            while True:
                data = fobj.read(LOCAL_CHUNK_SIZE)
                if not data:
                    break
                if binary:
                    chunk = data
                else:
                    chunk = dos2unix(data)
                # feed the (possibly normalized) chunk into the digest and
                # advance the bar by the bytes actually read
                hash_md5.update(chunk)
                pbar.update(len(data))
def _download(
    self, from_info, to_file, name=None, no_progress_bar=False, **_kwargs
):
    with Tqdm(desc=name, disable=no_progress_bar, bytes=True) as pbar:
        self.oss_service.get_object_to_file(
            from_info.path, to_file, progress_callback=pbar.update_to
        )
def _get_plans(self, download, remote, status_info, status):
    cache = []
    path_infos = []
    names = []
    for md5, info in Tqdm(
        status_info.items(), desc="Analysing status", unit="file"
    ):
        if info["status"] == status:
            cache.append(self.checksum_to_path_info(md5))
            path_infos.append(remote.checksum_to_path_info(md5))
            names.append(info["name"])

    if download:
        to_infos = cache
        from_infos = path_infos
    else:
        to_infos = path_infos
        from_infos = cache

    return from_infos, to_infos, names