How to use the clevercsv.wrappers.stream_csv function in clevercsv

To help you get started, we’ve selected a few clevercsv examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github alan-turing-institute / CleverCSV / tests / test_unit / test_wrappers.py View on Github external
def _stream_test(self, table, dialect):
        tmpfname = self._write_tmpfile(table, dialect)
        exp = [list(map(str, r)) for r in table]
        try:
            out = wrappers.stream_csv(tmpfname)
            self.assertTrue(isinstance(out, types.GeneratorType))
            self.assertEqual(exp, list(out))
        finally:
            os.unlink(tmpfname)
github alan-turing-institute / CleverCSV / tests / test_unit / test_wrappers.py View on Github external
def _stream_test(self, table, dialect):
        tmpfname = self._write_tmpfile(table, dialect)
        exp = [list(map(str, r)) for r in table]
        try:
            out = wrappers.stream_csv(tmpfname)
            self.assertTrue(isinstance(out, types.GeneratorType))
            self.assertEqual(exp, list(out))
        finally:
            os.unlink(tmpfname)
github alan-turing-institute / CleverCSV / tests / test_unit / test_wrappers.py View on Github external
def _stream_test_rows(self, rows, expected):
        contents = "\n".join(rows)
        tmpfd, tmpfname = tempfile.mkstemp(prefix="ccsv_", suffix=".csv")
        tmpid = os.fdopen(tmpfd, "w")
        tmpid.write(contents)
        tmpid.close()

        try:
            out = wrappers.stream_csv(tmpfname)
            self.assertTrue(isinstance(out, types.GeneratorType))
            self.assertEqual(expected, list(out))
        finally:
            os.unlink(tmpfname)
github alan-turing-institute / CleverCSV / tests / test_unit / test_wrappers.py View on Github external
def _stream_test_rows(self, rows, expected):
        contents = "\n".join(rows)
        tmpfd, tmpfname = tempfile.mkstemp(prefix="ccsv_", suffix=".csv")
        tmpid = os.fdopen(tmpfd, "w")
        tmpid.write(contents)
        tmpid.close()

        try:
            out = wrappers.stream_csv(tmpfname)
            self.assertTrue(isinstance(out, types.GeneratorType))
            self.assertEqual(expected, list(out))
        finally:
            os.unlink(tmpfname)