Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fetch(self):
    """Lazily trigger download of the data when requested.

    The local path is memoised in ``self._file_path``, so repeated calls
    do not repeat the work. Resolution order:

    1. an already-known local path is returned as-is;
    2. otherwise, if a content hash is known, the file is requested from
       ``storage`` into the context's work path;
    3. otherwise, if a response is available, its body is streamed to a
       new file under the work path, hashed with SHA-1 and archived.

    Returns:
        The local file path, or ``None`` when there is neither a content
        hash nor a response to read from (or ``storage.load_file`` itself
        returned ``None``).
    """
    if self._file_path is not None:
        return self._file_path

    temp_path = self.context.work_path

    if self._content_hash is not None:
        self._file_path = storage.load_file(self._content_hash,
                                            temp_path=temp_path)
        return self._file_path

    if self.response is not None:
        self._file_path = random_filename(temp_path)
        content_hash = sha1()
        # Stream in chunks so the body never has to fit in memory; the
        # digest is updated alongside the write to avoid a second read.
        with open(self._file_path, 'wb') as fh:
            for chunk in self.response.iter_content(chunk_size=8192):
                content_hash.update(chunk)
                fh.write(chunk)
        # Flag the file we just created as ours to remove.
        # NOTE(review): the consumer of this flag is not visible here.
        self._remove_file = True
        chash = content_hash.hexdigest()
        self._content_hash = storage.archive_file(self._file_path,
                                                  content_hash=chash)
        if self.http.cache and self.ok:
            # NOTE(review): serialize() runs before retrieved_at is
            # refreshed below -- confirm the cached payload is meant to
            # carry the previous timestamp.
            self.context.set_tag(self.request_id, self.serialize())
        self.retrieved_at = datetime.utcnow().isoformat()

    # Return the path on every route; previously the download branch fell
    # through and returned None even though the file had just been written.
    return self._file_path
def load_file(self, content_hash, file_name=None, read_mode='rb'):
    """Yield an open handle on the stored file identified by a hash.

    This is a generator that yields exactly once.
    NOTE(review): presumably wrapped by ``contextlib.contextmanager`` --
    the decorator is not visible in this excerpt; confirm.

    Args:
        content_hash: Identifier passed to ``storage.load_file`` and
            ``storage.cleanup_file``.
        file_name: Optional file name forwarded to ``storage.load_file``.
        read_mode: Mode string handed to ``open`` (binary by default).

    Yields:
        The open file object, or ``None`` when ``storage.load_file``
        returned no path.
    """
    local_path = storage.load_file(content_hash,
                                   file_name=file_name,
                                   temp_path=self.work_path)

    # No local copy was produced: hand back None, and skip cleanup.
    if local_path is None:
        yield None
        return

    try:
        with open(local_path, mode=read_mode) as handle:
            yield handle
    finally:
        # Runs even when the consumer raises, so storage can clean up.
        storage.cleanup_file(content_hash,
                             temp_path=self.work_path)