Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
delimiter_ = delimiter if not delimiter is None else dialect.delimiter
quotechar_ = quotechar if not quotechar is None else dialect.quotechar
escapechar_ = escapechar if not escapechar is None else dialect.escapechar
parser = Parser(
data,
delimiter=delimiter_,
quotechar=quotechar_,
escapechar=escapechar_,
field_limit=field_size_limit(),
)
try:
for row in parser:
yield row
except ParserError as e:
raise Error(str(e))
def _quote_in_quoted_field(self, u):
    """Process the character that follows a quote seen inside a quoted field.

    ``u`` is the current character. What happens depends on its value:

    * the quote character: it is added to the field and the parser goes
      back to ``State.IN_QUOTED_FIELD``;
    * the delimiter: the current field is saved and the parser moves to
      ``State.START_FIELD``;
    * carriage return, line feed or NUL: the current field is saved; NUL
      moves the parser to ``State.START_RECORD``, a line break to
      ``State.EAT_CRNL``;
    * anything else: in non-strict mode it is added to the field and the
      parser moves to ``State.IN_FIELD``; in strict mode ``Error`` is
      raised.

    Returns 0 whenever no error is raised.
    """
    if u == self.dialect.quotechar:
        self.parse_add_char(u)
        self.state = State.IN_QUOTED_FIELD
    elif u == self.dialect.delimiter:
        self.parse_save_field()
        self.state = State.START_FIELD
    elif u in ("\r", "\n", "\0"):
        self.parse_save_field()
        self.state = State.START_RECORD if u == "\0" else State.EAT_CRNL
    elif not self.dialect.strict:
        self.parse_add_char(u)
        self.state = State.IN_FIELD
    else:
        # Use %s rather than %c: %c raises TypeError for anything that is
        # not exactly one character (or an int), which would hide the Error
        # we actually want to report. For single characters the resulting
        # message is identical.
        raise Error(
            "'%s' expected after '%s'"
            % (self.dialect.delimiter, self.dialect.quotechar)
        )
    return 0
if line is None:
if self.field_len != 0 or self.state == State.IN_QUOTED_FIELD:
if self.dialect.strict:
raise Error("unexpected end of data")
self.parse_save_field(trailing=True)
break
return None
if not unicode_check(line):
raise Error(
"iterator should return strings, not %.200s "
"(did you open the file in text mode?)"
% type(line).__name__
)
for u, v in pairwise_none(line):
if u == "\0":
raise Error("line contains NULL byte")
self.parse_process_char(u, v)
self.parse_process_char("\0", None)
if self.state == State.START_RECORD:
break
return self.fields
def parse_add_char(self, u):
    """Append ``u`` to the field that is currently being built.

    Raises ``Error`` when ``self.field_len`` has already reached
    ``_FIELD_SIZE_LIMIT``. Returns 0 on success.
    """
    if self.field_len >= _FIELD_SIZE_LIMIT:
        message = "field larger than field limit (%d)" % _FIELD_SIZE_LIMIT
        raise Error(message)

    # The buffer is created lazily on the first character of a field.
    if self.field is None:
        self.field = []

    self.field.append(u)
    self.field_len = self.field_len + 1
    return 0
def _eat_crnl(self, u):
    """Handle character ``u`` while skipping over a line terminator.

    Carriage return and line feed are swallowed without changing state.
    NUL switches the parser to ``State.START_RECORD``. Any other character
    raises ``Error``. Returns 0 whenever no error is raised.
    """
    if u in ("\r", "\n"):
        # Still inside the line terminator: nothing to record.
        return 0
    if u != "\0":
        raise Error("new-line character seen in unquoted field.")
    self.state = State.START_RECORD
    return 0
def _in_quoted_field(self, u, v):
    """Process character ``u`` while the parser is inside a quoted field.

    ``u`` is the current character. ``v`` is only compared against the
    quote character; presumably it is the character that follows ``u``
    (TODO confirm against the caller).

    * NUL: ignored, the state is left unchanged;
    * the escape character: the parser moves to
      ``State.ESCAPE_IN_QUOTED_FIELD``;
    * the quote character with ``v`` also being the quote character:
      ``self.dialect.doublequote`` is set to True and the parser moves to
      ``State.QUOTE_IN_QUOTED_FIELD``;
    * the quote character otherwise: ``Error`` is raised in strict mode;
      in non-strict mode the quote is added to the field and the parser
      moves to ``State.IN_FIELD``;
    * anything else: added to the field.

    Returns 0 whenever no error is raised.
    """
    if u == "\0":
        pass
    elif u == self.dialect.escapechar:
        self.state = State.ESCAPE_IN_QUOTED_FIELD
    elif u == self.dialect.quotechar:
        if v == self.dialect.quotechar:
            # Note: this mutates the dialect object itself.
            self.dialect.doublequote = True
            self.state = State.QUOTE_IN_QUOTED_FIELD
        elif self.dialect.strict:
            # NOTE(review): in strict mode this raises for every quote that
            # is not followed by a second quote -- confirm this is intended.
            #
            # Use %s rather than %c: %c raises TypeError for anything that
            # is not exactly one character (or an int), which would hide
            # the Error we actually want to report. For single characters
            # the resulting message is identical.
            raise Error(
                "'%s' expected after '%s'"
                % (self.dialect.delimiter, self.dialect.quotechar)
            )
        else:
            self.parse_add_char(u)
            self.state = State.IN_FIELD
    else:
        self.parse_add_char(u)
    return 0
def parse_iternext(self):
    """Consume input until one record is complete and return its fields.

    The parser is reset first, then items are pulled from
    ``self.input_iter`` and fed to ``parse_process_char`` one character
    pair at a time, followed by a NUL marker at the end of every item.
    Parsing stops as soon as the state is ``State.START_RECORD`` after an
    item.

    Returns ``self.fields``, or ``None`` when the input is exhausted and
    there is no pending field.

    Raises ``Error`` when: the input ends with a pending field (or inside
    a quoted field) in strict mode; an item fails ``unicode_check``; or an
    item contains a NUL character.
    """
    self.reset()
    while True:
        chunk = next(self.input_iter, None)

        if chunk is None:
            # Input exhausted: decide whether a last field is still pending.
            pending = (
                self.field_len != 0 or self.state == State.IN_QUOTED_FIELD
            )
            if not pending:
                return None
            if self.dialect.strict:
                raise Error("unexpected end of data")
            self.parse_save_field(trailing=True)
            break

        if not unicode_check(chunk):
            raise Error(
                "iterator should return strings, not %.200s "
                "(did you open the file in text mode?)"
                % type(chunk).__name__
            )

        for current, upcoming in pairwise_none(chunk):
            # NUL is reserved as the internal end-of-item marker below, so
            # it is rejected when it shows up in the data itself.
            if current == "\0":
                raise Error("line contains NULL byte")
            self.parse_process_char(current, upcoming)

        # Signal the end of this item to the state machine.
        self.parse_process_char("\0", None)
        if self.state == State.START_RECORD:
            break

    return self.fields
def writerows(self, rows):
    """Write all of ``rows`` through the wrapped writer.

    Delegates to ``self._writer.writerows``. A ``csv.Error`` raised by the
    wrapped writer is re-raised as ``Error`` with the same message; the
    original exception is kept as the explicit cause so the traceback shows
    where the failure came from.
    """
    try:
        self._writer.writerows(rows)
    except csv.Error as exc:
        raise Error(str(exc)) from exc