Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
self._path = path
self._file_type_access = _FileTypeAccessForPath(path.primitive)
@property
def path(self) -> DescribedPath:
    """The path this object was created for.

    Nothing here checks the file system, so the path may or may not exist.
    """
    return self._path
@property
def file_type_access(self) -> FileTypeAccess:
    """The file-type accessor stored on this object at construction."""
    return self._file_type_access
class _FileTypeAccessForPath(FileTypeAccess):
    """File-type checks and stat access for one ``pathlib.Path``."""

    # Dispatch table: each file type maps to the unbound pathlib predicate
    # that tests for it.
    _IS_FILE_TYPE_FUNCTIONS = {
        FileType.REGULAR: pathlib.Path.is_file,
        FileType.DIRECTORY: pathlib.Path.is_dir,
        FileType.SYMLINK: pathlib.Path.is_symlink,
    }

    def __init__(self, path: pathlib.Path):
        self._path = path

    def is_type(self, expected: FileType) -> bool:
        """Return whether the path is of the file type ``expected``.

        :raises KeyError: if ``expected`` has no entry in the dispatch table.
        """
        return self._IS_FILE_TYPE_FUNCTIONS[expected](self._path)

    def stat(self, follow_sym_links: bool = True) -> os.stat_result:
        """Return the stat result for the path.

        :param follow_sym_links: when true use ``stat()``; otherwise use
            ``lstat()``.
        """
        # The original expression was left unparenthesised at the end
        # (missing closing bracket); it is closed here.
        return (
            self._path.stat()
            if follow_sym_links
            else
            self._path.lstat()
        )
def _patch_ssl_transport(monkeypatch):
# Make ssl transport substitution to prevent ssl handshake.
def _make_ssl_transport_dummy(self, rawsock, protocol, sslcontext,
waiter=None, **kwargs):
return self._make_socket_transport(rawsock, protocol, waiter,
extra=kwargs.get('extra'),
server=kwargs.get('server'))
monkeypatch.setattr(
"asyncio.selector_events.BaseSelectorEventLoop._make_ssl_transport",
_make_ssl_transport_dummy)
# Keep a handle on the real implementation so the replacement can defer to it.
original_is_file = pathlib.Path.is_file


def mock_is_file(self):
    """Stand-in for ``Path.is_file`` that hides a netrc file in the home dir.

    Returns False for ``_netrc`` / ``.netrc`` located directly in the home
    directory; every other path is answered by the real ``is_file``.
    """
    # make real netrc file invisible in home dir
    is_netrc_name = self.name in ['_netrc', '.netrc']
    if is_netrc_name and self.parent == self.home():
        return False
    return original_is_file(self)
async def test_proxy_from_env_http(proxy_test_server,
                                   get_request, mocker) -> None:
    """Set up an HTTP proxy taken from the environment.

    Starts the proxy test server, exposes its URL through the ``http_proxy``
    environment variable and replaces ``pathlib.Path.is_file`` with
    ``mock_is_file``.
    """
    # NOTE(review): `url` and `get_request` are not used in the visible part
    # of this test - presumably the request/assert steps follow; confirm.
    url = 'http://aiohttp.io/path'
    proxy_server = await proxy_test_server()
    env_overrides = {'http_proxy': str(proxy_server.url)}
    mocker.patch.dict(os.environ, env_overrides)
    mocker.patch('pathlib.Path.is_file', mock_is_file)
def postFromLog(fileName):
    """Analyze a log file and return a list of dictionaries containing
    submissions.

    For every key in the log (other than "HEADER") the last entry stored
    under that key is taken, and kept only if its 'TYPE' is not None.

    :param fileName: path of the log file to read
    :return: list of the selected entries
    :raises SystemExit: after printing "File not found" when ``fileName``
        is not an existing file
    """
    if not Path(fileName).is_file():
        print("File not found")
        sys.exit()

    content = JsonFile(fileName).read()

    # "HEADER" is not treated as a submission; drop it when present.
    try:
        del content["HEADER"]
    except KeyError:
        pass

    posts = []
    for post in content:
        latest = content[post][-1]
        # Identity check: `== None` can be fooled by custom __eq__.
        if latest['TYPE'] is not None:
            posts.append(latest)

    # The docstring promises the list; the original never returned it.
    return posts
async def get_path(self, path):
    """
    Resolves a string path from a GET request to a path on the file system
    Does safety checks to make sure they're not getting something forbidden

    Lookup order for the joined path:
      1. the path itself, if it is a file
      2. if it is a directory: ``index.html`` inside it, then
         ``<dirname>.html`` inside it
      3. otherwise the path with its suffix replaced by ``.html``

    :param path: String path to resolve
    :return: Path object resolved, or an int representing status code
             (404 when nothing matches or the request leaves the base path)
    """
    # any hardcoded redirects here
    if path == "/":
        path = "/index"

    base_path = self.app["settings"]["base_path"]
    path = base_path / path.lstrip("/")

    # Safety check: the request path is untrusted. Reject anything that is
    # not lexically under base_path (e.g. a drive/absolute component that
    # replaced the base on join) or that contains a ".." segment, which
    # would otherwise let "/../secret" escape the served directory.
    # 404 is reused so callers see no status code they did not already get.
    try:
        relative = path.relative_to(base_path)
    except ValueError:
        return 404
    if ".." in relative.parts:
        return 404

    # Now do logic to find the desired file. If found, return that path. If not, return an error code
    if path.is_file():
        return path

    if path.is_dir():
        # Look for an index.html, if that's missing, look for [dirname].html, if that's missing 404
        candidates = (path / 'index.html', path / (str(path.name) + ".html"))
        for candidate in candidates:
            if candidate.is_file():
                return candidate
        return 404

    html_variant = path.with_suffix(".html")
    if html_variant.is_file():
        return html_variant
    return 404
def __init__(self, rec_dir: str):
self.__files = sorted(filter(Path.is_file, Path(rec_dir).iterdir()))
def is_set_file(path: str) -> bool:
    """
    Tell whether the output file for ``path`` is already present
    (useful for determining if a foreign lang can be built or not)

    :param path: name of the output file without its ``.json`` extension,
                 looked up under ``SET_OUT_DIR``
    :return: True if ``SET_OUT_DIR/<path>.json`` is an existing file
    """
    file_name = '{}.json'.format(path)
    return (SET_OUT_DIR / file_name).is_file()
def load(self, **kwargs) -> None:
    """Load classifier parameters

    Iterates over ``self.load_path`` and appends what ``load_pickle``
    returns for each entry to ``self.ec_data``.

    :param kwargs: accepted but not used by this method
    :raises FileNotFoundError: if an entry of ``self.load_path`` is not an
        existing file; entries loaded before it stay in ``self.ec_data``
    """
    log.info(f"Loading model from {self.load_path}")
    for path in self.load_path:
        # Wrapping in Path() lets plain string entries work too; the
        # unbound Path.is_file(path) call only accepted Path objects.
        if not Path(path).is_file():
            # Name the offending path so the failure is actionable.
            raise FileNotFoundError(f"Model file not found: {path}")
        # NOTE(review): load_pickle presumably unpickles the file - make
        # sure load_path only ever points at trusted data; confirm.
        self.ec_data += load_pickle(path)
    log.info(f"Loaded items {len(self.ec_data)}")