Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment -- the enclosing `def` is not visible in this chunk
# and indentation appears stripped; code is kept byte-identical. The logic
# resolves a (scheme, format) pair for `source`, where None means
# "undetermined, decide later".
if hasattr(source, 'read'):
# File-like objects are streams; the format stays undetected here.
return ('stream', None)
# Format: inline
# Non-string sources (lists of rows, etc.) are inline data with no scheme.
if not isinstance(source, six.string_types):
return (None, 'inline')
# Format: gsheet
if 'docs.google.com/spreadsheets' in source:
# Plain share links (no export/pub) need the dedicated gsheet handling.
if 'export' not in source and 'pub' not in source:
return (None, 'gsheet')
elif 'csv' in source:
# Export/published links mentioning 'csv' are fetched as CSV over HTTPS.
return ('https', 'csv')
# Format: sql
# Any configured SQL scheme prefix (e.g. '<scheme>://') selects the sql format.
for sql_scheme in config.SQL_SCHEMES:
if source.startswith('%s://' % sql_scheme):
return (None, 'sql')
# General
parsed = urlparse(source)
scheme = parsed.scheme.lower()
# presumably a one-letter "scheme" is a Windows drive letter, not a URL
# scheme -- confirm; such sources fall back to the default scheme.
if len(scheme) < 2:
scheme = config.DEFAULT_SCHEME
# Derive the format from the path extension (netloc as a fallback path).
format = os.path.splitext(parsed.path or parsed.netloc)[1][1:].lower() or None
if format is None:
# Test if query string contains a "format=" parameter.
query_string = parse_qs(parsed.query)
query_string_format = query_string.get("format")
# Only trust a single, unambiguous "format=" value.
if query_string_format is not None and len(query_string_format) == 1:
format = query_string_format[0]
def __init__(self,
             bytes_sample_size=config.DEFAULT_BYTES_SAMPLE_SIZE,
             http_session=None,
             http_stream=True,
             http_timeout=None):
    """Loader over HTTP(S).

    # Arguments
        bytes_sample_size (int): number of bytes to read for the sample
        http_session (requests.Session):
            session to reuse for requests; when omitted a default one is
            created and stamped with `config.HTTP_HEADERS`
        http_stream (bool):
            stream the response body (forced off on Python 2, where
            streaming is not supported)
        http_timeout: timeout passed to HTTP requests
    """
    # Build a default session only when the caller did not supply one.
    if not http_session:
        http_session = requests.Session()
        http_session.headers.update(config.HTTP_HEADERS)
    # Set attributes
    self.__bytes_sample_size = bytes_sample_size
    self.__http_session = http_session
    # Streaming is unavailable on Python 2 -- silently disable it there.
    self.__http_stream = False if six.PY2 else http_stream
    self.__http_timeout = http_timeout
    self.__stats = None
def __init__(self,
             bytes_sample_size=config.DEFAULT_BYTES_SAMPLE_SIZE,
             s3_endpoint_url=None):
    """Loader for S3 sources.

    # Arguments
        bytes_sample_size (int): number of bytes to read for the sample
        s3_endpoint_url (str):
            custom S3 endpoint; resolution order is explicit argument,
            then the `S3_ENDPOINT_URL` environment variable, then
            `config.S3_DEFAULT_ENDPOINT_URL`
    """
    self.__bytes_sample_size = bytes_sample_size
    # `or`-chain: any falsy value (None, empty string) falls through to
    # the next candidate in the resolution order.
    endpoint = (s3_endpoint_url
                or os.environ.get('S3_ENDPOINT_URL')
                or config.S3_DEFAULT_ENDPOINT_URL)
    self.__s3_endpoint_url = endpoint
    self.__s3_client = boto3.client('s3', endpoint_url=endpoint)
    self.__stats = None
# NOTE(review): fragment -- the remaining click decorators and the function
# body are not visible in this chunk, and the docstring below is truncated
# (its closing quotes are outside this view); kept byte-identical.
@click.version_option(config.VERSION, message='%(version)s')
def cli(source, limit, **options):
"""Command-line interface
```
Usage: tabulator [OPTIONS] SOURCE
Options:
--headers INTEGER
--scheme TEXT
--format TEXT
--encoding TEXT
--limit INTEGER
--version Show the version and exit.
--help Show this message and exit.
```
def detect_encoding(sample, encoding=None):
    """Detect encoding of a byte string sample.

    # Arguments
        sample (bytes): raw bytes to probe
        encoding (str): already-known encoding; when given it is only
            normalized and returned, skipping detection

    # Returns
        str: detected (or default) encoding name
    """
    # Imported lazily to reduce tabulator import time.
    from cchardet import detect
    # An explicitly provided encoding wins outright.
    if encoding is not None:
        return normalize_encoding(sample, encoding)
    result = detect(sample)
    confidence = result['confidence'] or 0
    detected = normalize_encoding(sample, result['encoding'] or 'ascii')
    # Fall back to the default encoding for low-confidence guesses and for
    # plain 'ascii' (a superset default is safer than strict ascii).
    if confidence < config.ENCODING_CONFIDENCE or detected == 'ascii':
        return config.DEFAULT_ENCODING
    return detected
# NOTE(review): fragment -- indentation appears stripped and the final `if`
# at the end of this chunk is truncated (its body is not visible); code is
# kept byte-identical.
def __prepare_dialect(self, stream):
# Get sample
# Pull up to CSV_SAMPLE_LINES raw lines off the stream for dialect sniffing.
sample = []
while True:
try:
sample.append(next(stream))
except StopIteration:
break
if len(sample) >= config.CSV_SAMPLE_LINES:
break
# Get dialect
try:
# Join separator must match the sample's type: bytes on PY2, text on PY3.
separator = b'' if six.PY2 else ''
delimiter = self.__options.get('delimiter', ',\t;|')
dialect = csv.Sniffer().sniff(separator.join(sample), delimiter)
if not dialect.escapechar:
dialect.doublequote = True
except csv.Error:
# Sniffing failed -- fall back to the standard excel dialect.
class dialect(csv.excel):
pass
# User-supplied options override whatever was sniffed.
for key, value in self.__options.items():
setattr(dialect, key, value)
# https://github.com/frictionlessdata/FrictionlessDarwinCore/issues/1
if getattr(dialect, 'quotechar', None) == '':
# NOTE(review): fragment -- loader/compression selection from a larger
# method; the `if` that the `else` below belongs to is not visible in this
# chunk, and indentation appears stripped. Code is kept byte-identical.
# Explicit scheme/format take precedence over detected values.
scheme = self.__scheme or detected_scheme
format = self.__format or detected_format
# Get compression
for type in config.SUPPORTED_COMPRESSION:
if self.__compression == type or detected_format == type:
compression = type
else:
scheme = self.__scheme
format = self.__format
# Initiate loader
self.__loader = None
if scheme is not None:
# Custom loaders registered for this scheme take precedence over built-ins.
loader_class = self.__custom_loaders.get(scheme)
if loader_class is None:
if scheme not in config.LOADERS:
message = 'Scheme "%s" is not supported' % scheme
raise exceptions.SchemeError(message)
loader_path = config.LOADERS[scheme]
if loader_path:
loader_class = helpers.import_attribute(loader_path)
if loader_class is not None:
# Only forward the options this loader class declares it accepts.
loader_options = helpers.extract_options(options, loader_class.options)
# Compressed payloads disable HTTP streaming on loaders that support it.
if compression and 'http_stream' in loader_class.options:
loader_options['http_stream'] = False
self.__loader = loader_class(
bytes_sample_size=self.__bytes_sample_size,
**loader_options)
# Zip compression
# presumably zip extraction needs a seekable binary stream, hence
# mode='b' and the PY3-only guard -- confirm
if compression == 'zip' and six.PY3:
source = self.__loader.load(source, mode='b')
SchemeError: The file scheme is not supported.
FormatError: The file format is not supported.
# Returns
bool: Whether tabulator is able to load the source file.
"""
# NOTE(review): fragment -- the `def` line and the opening of the docstring
# above are outside this chunk; indentation appears stripped. Code below is
# kept byte-identical.
# Get scheme and format
detected_scheme, detected_format = helpers.detect_scheme_and_format(source)
# Caller-supplied scheme/format override the detected ones.
scheme = scheme or detected_scheme
format = format or detected_format
# Validate scheme and format
# A None scheme is allowed (e.g. inline data); a None/unknown format is not.
if scheme is not None:
if scheme not in config.LOADERS:
raise exceptions.SchemeError('Scheme "%s" is not supported' % scheme)
if format not in config.PARSERS:
raise exceptions.FormatError('Format "%s" is not supported' % format)
return True
def __init__(self, bytes_sample_size=config.DEFAULT_BYTES_SAMPLE_SIZE):
    """Minimal loader holding only the byte sample size.

    # Arguments
        bytes_sample_size (int): number of bytes to read for the sample
    """
    self.__bytes_sample_size = bytes_sample_size
    # Stats are attached later by the stream machinery, if at all.
    self.__stats = None
# NOTE(review): fragment -- a closure over `__resource` and `__url` defined
# in an enclosing scope that is not visible here; the final `if` at the end
# of this chunk is truncated. Indentation appears stripped; code is kept
# byte-identical.
def opener():
# Assemble the keyword arguments for tabulator from the resource descriptor.
_params = dict(headers=1)
format = __resource.get("format")
if format == "txt":
# datapackage-pipelines processing requires having a header row
# for txt format we add a single "data" column
_params["headers"] = ["data"]
_params["custom_parsers"] = {"txt": TXTParser}
_params["allow_html"] = True
else:
if format is None:
_, format = tabulator.helpers.detect_scheme_and_format(__url)
# Compressed sources: let tabulator detect the inner format itself.
if format in tabulator.config.SUPPORTED_COMPRESSION:
format = None
else:
try:
parser_cls = tabulator.helpers.import_attribute(tabulator.config.PARSERS[format])
except KeyError:
logging.error("Unknown format %r", format)
raise
# Forward only the options the selected parser class declares.
_params.update(
dict(x for x in __resource.items()
if x[0] in parser_cls.options))
# Forward the generic stream options present on the resource descriptor.
_params.update(
dict(x for x in __resource.items()
if x[0] in {'headers', 'scheme', 'encoding', 'sample_size', 'allow_html',
'force_strings', 'force_parse', 'skip_rows', 'compression',
'http_timeout'}))
if isinstance(_params.get('skip_rows'), int): # Backwards compatibility