# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from __future__ import unicode_literals

import itertools
from io import BytesIO

import six
import unicodecsv

from rows.plugins.utils import (
    create_table,
    ipartition,
    serialize,
)
from rows.utils import Source
# Module-level Sniffer instance, reusable for dialect detection.
sniffer = unicodecsv.Sniffer()
# Some CSV files have more than 128kB of data in a cell, so we force this value
# to be greater (16MB).
# TODO: check if it impacts in memory usage.
# TODO: may add option to change it by passing a parameter to import/export.
# NOTE: this changes the limit process-wide for every unicodecsv reader.
unicodecsv.field_size_limit(16777216)
def fix_dialect(dialect):
    """Patch common mis-detections made by the CSV Sniffer, in place."""
    # Quotes must be escapable somehow: when no escapechar was detected
    # and doublequoting is off, fall back to doublequoting.
    if dialect.escapechar is None and not dialect.doublequote:
        dialect.doublequote = True
    # Python csv's Sniffer seems to detect a wrong quotechar when
    # quoting is minimal
    if dialect.quotechar == "'" and dialect.quoting == unicodecsv.QUOTE_MINIMAL:
        dialect.quotechar = '"'
def csv_data(csv_path, skip_header=True):
    """Pass in the path to a CSV file, returns a CSV Reader object.

    The dialect (delimiter, quoting, ...) is auto-detected by sniffing
    the first 1kB of the file.

    Args:
        csv_path: filesystem path of the CSV file to read.
        skip_header: when True (default), consume the first row so that
            iteration starts at the data rows.

    Returns:
        A reader object yielding one list of cell values per row.
        NOTE: the underlying file object stays open for the reader's
        lifetime and is only closed when garbage-collected.
    """
    csv_file = open(csv_path, 'r')
    # Determine the CSV dialect from a sample of the file.
    dialect = unicodecsv.Sniffer().sniff(csv_file.read(1024))
    csv_file.seek(0)
    data = unicodecsv.reader(csv_file, dialect)
    if skip_header:
        # `reader.next()` is Python 2 only; the builtin next() works on
        # both Python 2 and 3.
        next(data)
    return data
def _csv_data_from_file(csv_file, preview_limit=10):
try:
dialect = unicodecsv.Sniffer().sniff(csv_file.read(1024))
csv_file.seek(0)
csv_reader = unicodecsv.reader(csv_file, dialect)
csv_values = itertools.islice(csv_reader, preview_limit)
csv_values = zip(*csv_values)
return {'success': True, 'data': csv_values}
except unicodecsv.Error as exc:
return {'success': False, 'error': exc.message}
except UnicodeDecodeError as exc:
return {'success': False, 'error': exc}
break
detector.close()
encoding = detector.result['encoding']
logger.info("Detected encoding: {} ({} confidence)".format(detector.result['encoding'],
detector.result['confidence']))
if delimiter is None:
try: #Python 3
with open(infile, 'r', errors='ignore') as csvfile:
# dialect = csv.Sniffer().sniff(csvfile.read(1024), delimiters=";,$\t")
dialect = csv.Sniffer().sniff(csvfile.readline()) #read only the header instead of the entire file to determine delimiter
csvfile.seek(0)
except TypeError: #Python 2
with open(infile, 'r') as csvfile:
# dialect = csv.Sniffer().sniff(csvfile.read(1024), delimiters=";,$\t")
dialect = csv.Sniffer().sniff(csvfile.readline()) #read only the header instead of the entire file to determine delimiter
csvfile.seek(0)
logger.info("Detected dialect: {} (delimiter: '{}')".format(dialect, dialect.delimiter))
delimiter = dialect.delimiter
logger.info("Delimiter is: {}".format(delimiter))
if base.endswith('/'):
base = base[:-1]
metadata = {
u"@id": iribaker.to_iri(u"{}/{}".format(base, url)),
u"@context": [u"https://raw.githubusercontent.com/CLARIAH/COW/master/csvw.json",
{u"@language": u"en",
u"@base": u"{}/".format(base)},
get_namespaces(base)],
def clean_csv_file(self):
csv_file = self.cleaned_data['csv_file']
# Universal newlines
# Ugly hack - but works for now
csv_string = '\n'.join(csv_file.read().splitlines())
csv_file = StringIO.StringIO(csv_string)
# TODO: Use chardet
# Ref: https://github.com/dokterbob/django-newsletter/blob/master/newsletter/admin_forms.py#L86
sniffer = csv.Sniffer()
# Python's CSV code eats only UTF-8
csv_file = codecs.EncodedFile(csv_file, self.charset)
try:
if self.dialect:
# Override dialect, don't autodetect
dialect = self.dialect
else:
# Sniff dialect
dialect = sniffer.sniff(
csv_string,
delimiters=self.delimiters
)
# Sniff for a header