Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# All positional arguments, including the mount path, will be parsed into the tarfilepaths namespace,
# so we have to manually separate them depending on their type.
if os.path.isdir( args.mount_source[-1] ) or not os.path.exists( args.mount_source[-1] ):
args.mount_point = args.mount_source[-1]
args.mount_source = args.mount_source[:-1]
if not args.mount_source:
print( "[Error] You must at least specify one path to a valid TAR file or union mount source directory!" )
exit( 1 )
# Manually check that all specified TARs and folders exist
compressions = [ '' ]
if 'IndexedBzip2File' in globals():
compressions += [ 'bz2' ]
if 'IndexedGzipFile' in globals():
compressions += [ 'gz' ]
args.mount_source = [ TarFileType( mode = 'r', compressions = compressions )( tarFile )[0].name
if not os.path.isdir( tarFile ) else os.path.realpath( tarFile )
for tarFile in args.mount_source ]
# Automatically generate a default mount path
if args.mount_point is None:
tarPath = args.mount_source[0]
for doubleExtension in [ '.tar.bz2', '.tar.gz' ]:
if tarPath[-len( doubleExtension ):].lower() == doubleExtension.lower():
args.mount_point = tarPath[:-len( doubleExtension )]
break
if not args.mount_point:
args.mount_point = os.path.splitext( tarPath )[0]
args.mount_point = os.path.abspath( args.mount_point )
return args
args = parseArgs( args )
# Convert the comma separated list of key[=value] options into a dictionary for fusepy
fusekwargs = dict( [ option.split( '=', 1 ) if '=' in option else ( option, True )
for option in args.fuse.split( ',' ) ] ) if args.fuse else {}
if args.prefix:
fusekwargs['modules'] = 'subdir'
fusekwargs['subdir'] = args.prefix
if args.mount_point in args.mount_source:
fusekwargs['nonempty'] = True
global printDebug
printDebug = args.debug
fuseOperationsObject = TarMount(
pathToMount = args.mount_source,
clearIndexCache = args.recreate_index,
recursive = args.recursive,
gzipSeekPointSpacing = args.gzip_seek_point_spacing,
mountPoint = args.mount_point )
fuse.FUSE( operations = fuseOperationsObject,
mountpoint = args.mount_point,
foreground = args.foreground,
nothreads = True, # Can't access SQLite database connection object from multiple threads
**fusekwargs )
except:
pass
self.mountSources = [ self._openTar( tarFile, clearIndexCache, recursive, gzipSeekPointSpacing )
if not os.path.isdir( tarFile ) else os.path.realpath( tarFile )
for tarFile in pathToMount ]
# make the mount point read only and executable if readable, i.e., allow directory listing
tarStats = os.stat( pathToMount[0] )
# clear higher bits like S_IFREG and set the directory bit instead
mountMode = ( tarStats.st_mode & 0o777 ) | stat.S_IFDIR
if mountMode & stat.S_IRUSR != 0: mountMode |= stat.S_IXUSR
if mountMode & stat.S_IRGRP != 0: mountMode |= stat.S_IXGRP
if mountMode & stat.S_IROTH != 0: mountMode |= stat.S_IXOTH
self.rootFileInfo = SQLiteIndexedTar.FileInfo(
offset = None ,
offsetheader = None ,
size = tarStats.st_size ,
mtime = tarStats.st_mtime,
mode = mountMode ,
type = tarfile.DIRTYPE ,
linkname = "" ,
uid = tarStats.st_uid ,
gid = tarStats.st_gid ,
istar = True ,
issparse = False
)
# Create mount point if it does not exist
self.mountPointWasCreated = False
if mountPoint and not os.path.exists( mountPoint ):
def _getFileInfoFromRealFile( filePath ):
    """
    Builds a SQLiteIndexedTar.FileInfo for a path on the real file system.

    The path is inspected with os.lstat, so a symbolic link is described by its own
    metadata and not by the metadata of the file it points to.

    filePath : path of the file, folder, or link to describe
    returns  : SQLiteIndexedTar.FileInfo with size, mtime, mode, uid, and gid taken from os.lstat,
               both offsets set to None, and istar as well as issparse set to False
    """
    linkStats = os.lstat( filePath )

    # Only symbolic links have a target to report; everything else gets None.
    if os.path.islink( filePath ):
        linkTarget = os.readlink( filePath )
    else:
        linkTarget = None

    return SQLiteIndexedTar.FileInfo(
        offset       = None,
        offsetheader = None,
        size         = linkStats.st_size,
        mtime        = linkStats.st_mtime,
        mode         = linkStats.st_mode,
        # NOTE(review): the original author believed this field to be unused because the
        # file type is already contained in mode -- confirm before relying on it.
        type         = None,
        linkname     = linkTarget,
        uid          = linkStats.st_uid,
        gid          = linkStats.st_gid,
        istar        = False,
        issparse     = False
    )
"""Wrapper for _getUnionMountFileInfo, which also resolves special file version specifications in the path."""
result = self._getUnionMountFileInfo( filePath )
if result:
return result
# If no file was found, check if a special .versions folder to an existing file/folder was queried.
result = self._decodeVersionsPathAPI( filePath )
if not result:
raise fuse.FuseOSError( fuse.errno.ENOENT )
filePath, pathIsVersions, fileVersion = result
# 2.) Check if the request was for the special .versions folder and return its contents or stats
# At this point, filePath is assured to actually exist!
if pathIsVersions:
parentFileInfo, mountSource = self._getUnionMountFileInfo( filePath )
return SQLiteIndexedTar.FileInfo(
offset = None ,
offsetheader = None ,
size = 0 ,
mtime = parentFileInfo.mtime,
mode = 0o777 | stat.S_IFDIR,
type = tarfile.DIRTYPE ,
linkname = "" ,
uid = parentFileInfo.uid ,
gid = parentFileInfo.gid ,
istar = False ,
issparse = False
), mountSource
# 3.) At this point the request is for an actual version of a file or folder
result = self._getUnionMountFileInfo( filePath, fileVersion = fileVersion )
if result:
return None
# fileVersion >= 1
for mountSource in self.mountSources:
if isinstance( mountSource, str ):
realFilePath = os.path.join( mountSource, filePath.lstrip( os.path.sep ) )
if os.path.lexists( realFilePath ):
if fileVersion == 1:
return self._getFileInfoFromRealFile( realFilePath ), \
os.path.join( mountSource, filePath.lstrip( os.path.sep ) )
fileVersion -= 1
else:
fileInfo = mountSource.getFileInfo( filePath, listDir = False, fileVersion = fileVersion )
if isinstance( fileInfo, SQLiteIndexedTar.FileInfo ):
return fileInfo, mountSource
fileVersion -= len( mountSource.getFileInfo( filePath, listVersions = True ) )
if fileVersion < 1:
return None
return None
def _openTar( self, tarFilePath, clearIndexCache, recursive, gzipSeekPointSpacing ):
    """
    Creates a SQLiteIndexedTar for the given TAR path.

    clearIndexCache, recursive, and gzipSeekPointSpacing are forwarded unchanged as keyword
    arguments of the same name, and writeIndex is always set to True.
    """
    indexOptions = {
        'writeIndex'           : True,
        'clearIndexCache'      : clearIndexCache,
        'recursive'            : recursive,
        'gzipSeekPointSpacing' : gzipSeekPointSpacing,
    }
    return SQLiteIndexedTar( tarFilePath, **indexOptions )
def _openCompressedFile( fileobj, gzipSeekPointSpacing ):
    """
    Opens a file possibly undoing the compression.

    The compression is determined with SQLiteIndexedTar._detectCompression.

    fileobj              : opened file object, which might contain compressed data
    gzipSeekPointSpacing : forwarded as 'spacing' to IndexedGzipFile, only used for 'gz'
    returns              : tuple ( tarFile, rawFile, compression ). For 'bz2' and 'gz', tarFile is the
                           decompressing wrapper and rawFile is the given fileobj. Otherwise, tarFile
                           is the given fileobj itself and rawFile is None.
    """
    compression = SQLiteIndexedTar._detectCompression( fileobj = fileobj )

    # In both compressed cases, the given file object is returned alongside the wrapper
    # so that the caller can keep it referenced and the garbage collector won't close it!
    if compression == 'bz2':
        return IndexedBzip2File( fileobj.fileno() ), fileobj, compression

    if compression == 'gz':
        # drop_handles = False keeps a file handle open, as is required to call tell() during decoding
        gzipFile = IndexedGzipFile( fileobj      = fileobj,
                                    drop_handles = False,
                                    spacing      = gzipSeekPointSpacing )
        return gzipFile, fileobj, compression

    # No supported compression was detected: hand the file object back as it is.
    return fileobj, None, compression
self.parentFolderCache = []
self.mountRecursively = recursive
self.sqlConnection = None
assert tarFileName or fileObject
if not tarFileName:
self.tarFileName = ''
self.createIndex( fileObject )
# return here because we can't find a save location without any identifying name
return
self.tarFileName = os.path.abspath( tarFileName )
if not fileObject:
fileObject = open( self.tarFileName, 'rb' )
self.tarFileObject, self.rawFileObject, self.compression = \
SQLiteIndexedTar._openCompressedFile( fileObject, gzipSeekPointSpacing )
# will be used for storing indexes if current path is read-only
possibleIndexFilePaths = [
self.tarFileName + ".index.sqlite",
os.path.expanduser( os.path.join( "~", ".ratarmount",
self.tarFileName.replace( "/", "_" ) + ".index.sqlite" ) )
]
self.indexFileName = None
if clearIndexCache:
for indexPath in possibleIndexFilePaths:
if os.path.isfile( indexPath ):
os.remove( indexPath )
# Try to find an already existing index
for indexPath in possibleIndexFilePaths:
# 2. Open TAR file reader
try:
streamed = ( 'IndexedBzip2File' in globals() and isinstance( fileObject, IndexedBzip2File ) ) or \
( 'IndexedGzipFile' in globals() and isinstance( fileObject, IndexedGzipFile ) )
# r: uses seeks to skip to the next file inside the TAR while r| doesn't do any seeks.
# r| might be slower but for compressed files we have to go over all the data once anyways
# and I had problems with seeks at this stage. Maybe they are gone now after the bz2 bugfix though.
loadedTarFile = tarfile.open( fileobj = fileObject, mode = 'r|' if streamed else 'r:', ignore_zeros = True )
except tarfile.ReadError as exception:
print( "Archive can't be opened! This might happen for compressed TAR archives, "
"which currently is not supported." )
raise exception
if progressBar is None:
progressBar = ProgressBar( os.fstat( fileObject.fileno() ).st_size )
# 3. Iterate over files inside TAR and add them to the database
try:
for tarInfo in loadedTarFile:
loadedTarFile.members = []
globalOffset = streamOffset + tarInfo.offset_data
globalOffsetHeader = streamOffset + tarInfo.offset
if 'IndexedBzip2File' in globals() and isinstance( fileObject, IndexedBzip2File ):
# We will have to adjust the global offset to a rough estimate of the real compressed size.
# Note that tell_compressed is always one bzip2 block further, which leads to underestimated
# file compression ratio especially in the beginning.
progressBar.update( int( globalOffset * fileObject.tell_compressed() / 8 / fileObject.tell() ) )
elif 'IndexedGzipFile' in globals() and isinstance( fileObject, IndexedGzipFile ):
try:
progressBar.update( int( globalOffset * fileObject.fileobj().tell() / fileObject.tell() ) )
except: