const fs = require('fs');
const csv = require('csv');

module.exports = (pushItemToQueue, emailConcurrency, inputFile) => {
const parser = csv.parse();
const transformerOptions = {
parallel: emailConcurrency, // Needs to obey concurrency rules based on SMTP limitations
consume: true
};
const transformer = csv.transform((row, callback) => {
// Async flow with SMTP relay server obeys concurrency rules with this stream
pushItemToQueue(callback(null, row));
}, transformerOptions);
transformer.on('error', (err) => {
// Catch & throw errs
throw err;
});
// Create a read stream, pipe it through the parser and then the transformer for the async work, and finally drain the output so it can be garbage-collected.
fs.createReadStream(`${__dirname}${inputFile}`)
.pipe(parser)
.pipe(transformer)
.on('data', function () {
// Do nothing with each row here: draining the stream keeps data flowing and lets every chunk be
// garbage-collected once it has been queued, instead of piling up in a buffer.
});
};
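The exported function above streams the file and hands each row to the queue without ever buffering the whole file. A minimal caller might look like the sketch below; the module path, queue, and concurrency limit are illustrative, not taken from the original code.

// Hypothetical usage of the loader defined above; './load-rows' and the queue are assumed names.
const loadRowsIntoQueue = require('./load-rows');

const queue = [];
const pushItemToQueue = (item) => queue.push(item);

// Allow at most five rows in flight at once so the SMTP relay's limits are respected.
loadRowsIntoQueue(pushItemToQueue, 5, '/recipients.csv');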
// Check whether any module failed; abort before writing the CSV if it did
let isError = false;
data.forEach((module) => {
if (module.isError) {
isError = true;
console.error(`module: '${module.name}@${module.version}'. ${module.errorMessage}`, module);
}
});
if (isError) {
process.exit(-1);
}
csv.stringify(data, {
header: true,
columns: {
'_': '', // placeholder
name: 'name',
groupId: 'groupId', // not a field we have
artifactId: 'artifactId', // not a field we have
version: 'version',
repository: 'url',
licenses: 'license',
'in distribution': 'in distribution', // not a field we have
checked: 'checked', // not a field we have
text: 'text', // not a field we have
copyright: 'notice',
comment: 'comment' // placeholder
}
}, (error, csvText) => {
return _cleanUpCSVFile(userCSV, () => {
callback(validator.getFirstError());
});
});
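In the stringify call above, the columns object maps record properties (keys) to the column headers that appear in the output (values). A self-contained sketch of that mapping, with made-up data:

// Standalone illustration of csv.stringify with an object-style columns mapping; the records are invented.
const csv = require('csv');

const rows = [
  { name: 'left-pad', version: '1.3.0', repository: 'https://github.com/stevemao/left-pad', licenses: 'WTFPL' },
];

csv.stringify(rows, {
  header: true,
  columns: { name: 'name', version: 'version', repository: 'url', licenses: 'license' },
}, (error, csvText) => {
  if (error) throw error;
  process.stdout.write(csvText); // name,version,url,license\nleft-pad,1.3.0,https://...,WTFPL
});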
// Create a new context object on the request tenant
const adminCtx = new Context(tenant, ctx.user());
// Will contain an entry for each user in the CSV file
const data = [];
// The CSV module works with streams, so get a readable stream to the uploaded CSV file
const input = fs.createReadStream(userCSV.path);
// Pipe the stream to a CSV parser and keep track of the user records
const parser = csv.parse({ trim: true });
input.pipe(parser);
parser.on('readable', () => {
let user = parser.read();
while (user) {
data.push(user);
user = parser.read();
}
});
parser
.on('finish', () => {
// If the CSV parse was successful, we call the callback to prevent the request from timing out
// whilst the users are being loaded
PrincipalsEmitter.emit('preCSVUserImport');
callback();
});
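The readable/finish pattern above is the standard way to collect every record before acting on the whole set. A self-contained sketch of the same pattern, feeding the parser an in-memory string instead of an uploaded file:

// Minimal, standalone illustration: collect rows on 'readable', act on them on 'finish'.
const csv = require('csv');

const records = [];
const parser = csv.parse({ trim: true });

parser.on('readable', () => {
  let record = parser.read();
  while (record) {
    records.push(record);
    record = parser.read();
  }
});
parser.on('error', (err) => console.error(err.message));
parser.on('finish', () => console.log(`parsed ${records.length} rows`, records));

parser.write('alice, alice@example.com\nbob, bob@example.com\n');
parser.end();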
function arrayToCsv(csvArray)
{
csv.stringify(csvArray, (err, output) =>
{
writeFile(outputFileName, output, "utf8").then(() =>
{
// eslint-disable-next-line no-console
console.log(`${outputFileName} is created`);
}).catch((error) =>
{
console.error(error);
});
});
}
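arrayToCsv above depends on a promise-returning writeFile and an outputFileName from the surrounding scope. Assuming those come from fs/promises and a local constant (both assumptions, not shown in the excerpt), a call could look like this:

// Assumed setup for the helper above; fs/promises and the output file name are guesses about the omitted scope.
const { writeFile } = require('fs/promises');
const csv = require('csv');
const outputFileName = 'report.csv';

arrayToCsv([
  ['id', 'name'],
  [1, 'Ada'],
  [2, 'Grace'],
]);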
// Read the CSV, convert it to JSON and tidy it up so that the output is consistent.
var path = require('path');
var _ = require('underscore');
var csv = require('csv');
var canonicalJSON = require('canonical-json');
var fs = require('fs');
var output = [];
// read in the CSV
var csvFile = path.join( __dirname, 'currencies.csv' );
var input = fs.createReadStream(csvFile);
var parser = csv.parse({"columns": true});
parser.on('readable', function () {
var record = null;
while ((record = parser.read()) !== null) {
// convert decimals to a number
record.decimals = parseInt(record.decimals, 10);
output.push(record);
}
});
parser.on('finish', function(){
// sort by code
output = _.sortBy(output, function (i) { return i.code;} );
// print out the results to stdout as canonical JSON (the exact output step is assumed from the requires above)
process.stdout.write(canonicalJSON(output) + '\n');
});
// Kick off the parse by piping the CSV file into the parser.
input.pipe(parser);
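Because the parser above is created with columns: true, each record is a plain object keyed by the header row, which is why record.decimals can be read by name (and arrives as a string, hence the parseInt). A tiny illustration with invented data:

// columns: true turns each data row into an object keyed by the header line; the CSV text here is made up.
const csv = require('csv');

csv.parse('code,decimals\nUSD,2\nJPY,0\n', { columns: true }, (err, records) => {
  if (err) throw err;
  console.log(records); // [ { code: 'USD', decimals: '2' }, { code: 'JPY', decimals: '0' } ]
});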
// Assumed imports for this excerpt: Node's Readable stream, lodash type checks and the csv module.
const {Readable} = require('stream')
const isFunction = require('lodash/isFunction')
const isArray = require('lodash/isArray')
const csv = require('csv')

async function createRowStream(source, encoding, parserOptions) {
const parser = csv.parse({ltrim: true, relax_column_count: true, ...parserOptions})
let stream
// Stream factory
if (isFunction(source)) {
stream = source()
// Node stream
} else if (source.readable) {
stream = source
// Inline source
} else if (isArray(source)) {
stream = new Readable({objectMode: true})
for (const row of source) stream.push(row)
stream.push(null)
}
// Assumption: any source other than an inline array is raw CSV text and still has to go through the parser.
return isArray(source) ? stream : stream.pipe(parser)
}
return conn.tx(t => new Promise(resolve => {
let rejected = false;
const error = null;
const MIN_INSERT_BUFFER_SIZE = 1000;
let modelsToInsert = [];
const pendingInserts = [];
const parser = csv.parse();
function insertQueuedModels() {
const insert = bulkInsertModels(modelsToInsert, t);
pendingInserts.push(insert);
modelsToInsert = [];
}
// Defines how the parser behaves when it has new data (models to be inserted)
parser.on('readable', () => {
let row;
// We can only get the next row once so we check that it isn't null at the same time that we assign it
while ((row = parser.read()) !== null) { // tslint:disable-line no-conditional-assignment
if (!rejected) {
modelsToInsert.push(mapRowToModel(row));
// Flush a batch to the database once enough rows have accumulated (assumed use of MIN_INSERT_BUFFER_SIZE).
if (modelsToInsert.length >= MIN_INSERT_BUFFER_SIZE) {
insertQueuedModels();
}
}
}
});
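The readable handler above only buffers and batches rows; a parser wired this way also needs to flush the last partial batch and settle the surrounding promise once parsing ends. A hedged sketch of that step, reusing the names defined above (the completion logic is assumed, not taken from the original):

// Sketch only: flush the remaining models and resolve the transaction once the parser finishes.
parser.on('finish', () => {
  if (!rejected) {
    insertQueuedModels(); // push whatever is still sitting in the buffer
    resolve(Promise.all(pendingInserts)); // settle once every batched insert has completed
  }
});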
res.on('end', () => {
// parse csv
CSV.parse(csv, (err, result) => {
val = result;
callback(val, meter_id);
});
});
});
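The end handler above assumes the response body has already been accumulated into the csv string by an earlier data handler. A self-contained sketch of that accumulate-then-parse pattern (the URL is a placeholder):

// Standalone sketch: collect an HTTP response body, then parse it as CSV once the stream ends.
const https = require('https');
const CSV = require('csv');

https.get('https://example.com/readings.csv', (res) => {
  let body = '';
  res.on('data', (chunk) => { body += chunk; });
  res.on('end', () => {
    CSV.parse(body, (err, result) => {
      if (err) return console.error(err);
      console.log(result); // one array per CSV row
    });
  });
});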
const csvFile = fs.createWriteStream(csvPath) // assumed: csvPath is defined alongside tsvPath in the omitted code above
const tsvFile = fs.createWriteStream(tsvPath)
const columns = [
'Label',
'Name',
'Network',
'Program',
'Air Date',
'Air Time',
'Duration',
'Archive ID',
'URL',
'Model ID',
]
// Set up the CSV Pipeline
const csvStringifier = csv.stringify({
header: true,
columns,
})
csvStringifier.on('readable', () => {
let data = null
// eslint-disable-next-line no-cond-assign
while (data = csvStringifier.read()) {
csvFile.write(data)
}
})
// Set up the TSV Pipeline
const tsvStringifier = csv.stringify({
header: true,
columns,
delimiter: '\t',
})
// Mirror the CSV pipeline above, writing tab-delimited rows to the TSV file.
tsvStringifier.on('readable', () => {
let data = null
// eslint-disable-next-line no-cond-assign
while (data = tsvStringifier.read()) {
tsvFile.write(data)
}
})
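Because the stringifiers are ordinary readable streams, the manual read() loops above can be replaced by pipe() when no per-chunk handling is needed; this is an alternative wiring, not an addition to the code above.

// Equivalent, more compact wiring: let Node handle back-pressure instead of looping on 'readable'.
csvStringifier.pipe(csvFile)
tsvStringifier.pipe(tsvFile)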
var TimingTransformer = function() {
var parseOptions = {
delimiter: ' ', // use one space to delimit columns
auto_parse: true, // automatically cast parsed values to native types (newer csv-parse releases call this "cast")
columns: ['start_sample', 'end_sample', 'phoneme', 'word', 'word_boundary'],
};
var parser = parse(parseOptions);
var transformer = transform(gen_transform_function());
var combinedStream = combine(parser, transformer);
return combinedStream;
};
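TimingTransformer returns a combined duplex stream, so it is used by piping raw timing text through it; parse, transform and combine are presumably csv-parse, stream-transform and a stream combiner, and gen_transform_function is defined elsewhere in the original file. A hypothetical usage sketch (file name and downstream handling are illustrative only):

// Hypothetical usage; 'utterance.timing' is an invented input file.
var fs = require('fs');

fs.createReadStream('utterance.timing')
  .pipe(TimingTransformer())
  .on('data', function (record) {
    console.log(record); // one transformed timing row at a time
  });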