Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
module.exports = (pushItemToQueue, emailConcurrency, inputFile) => {
const parser = csv.parse();
const transformerOptions = {
parallel: emailConcurrency, // Needs to obey concurrency rules based on SMTP limitations
consume:true
};
const transformer = csv.transform((row, callback) => {
// Async flow with SMTP relay server obeys concurrency rules with this stream
pushItemToQueue(callback(null, row));
}, transformerOptions);
transformer.on('error', (err) => {
// Catch & throw errs
throw err;
});
// Create read stream, parse then pipe to transformer to perform async operations. Finally, release data for garbage collection.
fs.createReadStream(`${__dirname}${inputFile}`)
.pipe(parser)
.pipe(transformer)
.on('data', function() {
// Do nothing with the data. Allow chunk to evaporate in write stream to prevent buffer overflow.
/**
 * Builds the timing pipeline: a parser configured for space-delimited rows
 * with five named columns, joined via combine() to a transformer created
 * from gen_transform_function().
 *
 * @returns {*} the value returned by combine(parser, transformer)
 */
var TimingTransformer = function() {
  // Stage 1: parser for single-space-delimited rows with fixed column names.
  const parseStage = parse({
    delimiter: ' ', // columns are separated by exactly one space
    auto_parse: true, // let the parser coerce values to native types
    columns: ['start_sample', 'end_sample', 'phoneme', 'word', 'word_boundary'],
  });

  // Stage 2: transformer driven by the generated transform function.
  const transformStage = transform(gen_transform_function());

  // Join both stages and hand the result back to the caller.
  return combine(parseStage, transformStage);
};
const source = this._source
// Prepare unique checks
let uniqueFieldsCache = {}
if (cast) {
if (this.schema) {
uniqueFieldsCache = createUniqueFieldsCache(this.schema)
}
}
// Get row stream
const rowStream = await createRowStream(source, this._encoding, this._parserOptions)
// Get table row stream
let rowNumber = 0
const tableRowStream = rowStream.pipe(csv.transform(row => {
rowNumber += 1
// Get headers
if (rowNumber === this._headersRow) {
this._headers = row
return
}
// Check headers
if (cast) {
if (this.schema && this.headers) {
if (!isEqual(this.headers, this.schema.fieldNames)) {
const error = new TableSchemaError(
'The column header names do not match the field names in the schema')
error.rowNumber = rowNumber
error.headerNames = this.headers
data: loaded / parsed
});
}, UPDATE_INTERVAL);
// Finishes the run: cancels the updateInterval timer when one is set, then
// posts a null message, which serves as the EOF marker.
const end = () => {
  const timerId = updateInterval;
  if (timerId) {
    clearInterval(timerId);
  }
  self.postMessage(null); // null payload = EOF
};
queue.drain = end;
fileReaderStream(message.data.file)
.pipe(csv.parse())
.pipe(csv.transform(record => {
if (record[0].length < 11) {
record[0] = pad(11, record[0], '0');
} else if (record[0].length < 15 && record[0].length > 11) {
record[0] = pad(15, record[0], '0');
}
if (!(CPF.isValid(record[0]) || CNPJ.isValid(record[0]))) {
return;
}
const birthday = moment(record[1], ['DD/MM/YYYY',
'YYYY-MM-DD',
'YY-MM-DD',
'DD/MM/YY'
]);
'-passenger.start.postcodes',
'-passenger.end.postcodes',
'-driver.start.literal',
'-driver.end.literal',
'-passenger.start.literal',
'-passenger.end.literal',
'-driver.start.aom',
'-driver.end.aom',
'-passenger.start.aom',
'-passenger.end.aom',
'-aom',
];
journeyService
.findCursor(req.query)
.pipe(csv.transform(transformer))
.pipe(csv.stringify({ header: true }))
.pipe(res);
} else {
// JSON response
const response = await journeyService.find(req.query);
response.data = response.data.map(filterOps);
res.json(response);
}
} catch (e) {
next(e);
}
});
// Parse the raw CSV text into header-keyed rows, strip <b>/</b> markup from
// every cell, decode HTML entities, and pass `callback` to csv.transform as
// its completion callback.
csv.parse(data.toString(), { columns: true }, function(err, rows) {
  if (err) {
    // Actually invoke the callback with the parse error and stop here.
    // (Previously `callback.bind(this, err)` only created a bound function
    // and never called it, so the error was dropped and the transform ran
    // on undefined data.)
    return callback.call(this, err);
  }
  csv.transform(rows, function(row) {
    for (const key of Object.keys(row)) {
      // Remove bold tags before decoding entities in the cell text.
      const stripped = row[key].replace(/<b>|<\/b>/g, '');
      row[key] = entities.decode(stripped, 2);
    }
    return row;
  }, callback.bind(this));
});
}
#!/usr/bin/env node
const fs = require('fs')
const argv = require('optimist').argv
const csv = require('csv')
fs.createReadStream(argv._[0])
.pipe(csv.parse({columns: true, relax_column_count: true, delimiter: '\t'}))
.pipe(csv.transform((record) => {
let [Fee] = record.Fee.split(' ')
let [GFee] = record['Gas Fee'].split(' ')
GFee = Number(GFee)
Fee = Number(Fee)
if (Number.isNaN(Fee)) {
Fee = 0
}
if (!Number.isNaN(GFee)) {
Fee += GFee
}
let Buy, CurB, Sell, CurS
if (record.Type === 'Buy') {
[Buy, CurB] = record.Amount.split(' ');
[Sell, CurS] = record.Total.split(' ')
} else if (record.Type === 'Sell') {
talkgroup_filters['tag-interop'] = [1952, 2592, 2656, 2672, 9936, 9968, 9984, 10032, 19248, 33616, 33648, 35536, 35568, 37456, 37488, 37648, 37680, 59952, 59968, 59984, 60000];
talkgroup_filters['tag-law-dispatch'] = [ 16624 ];
talkgroup_filters['tag-paratransit'] = [ 35664 ];
talkgroup_filters['tag-parks'] = [ 35248 ];
talkgroup_filters['tag-parking'] = [ 34800,34608 ];
talkgroup_filters['tag-public-works'] = [ 37328,37200,37040 ];
talkgroup_filters['tag-public-health'] = [ 34480,34448,34416,33584 ];
talkgroup_filters['tag-security'] = [37232,35440,35408,35152,34864,34832,34192,34128,33840];
talkgroup_filters['tag-st-e'] = [ 34384,34368,34352,34320,34288 ];
talkgroup_filters['tag-transportation'] = [ 40080,35632,35600,34576,34512 ];
talkgroup_filters['tag-water'] = [ 35088,35056,35024 ];*/
fs.createReadStream('ChanList.csv').pipe(csv.parse({
columns: ['Num', 'Hex', 'Mode', 'Alpha', 'Description', 'Tag', 'Group', 'Priority']
})).pipe(csv.transform(function (row) {
channels[row.Num] = {
alpha: row.Alpha,
desc: row.Description,
tag: row.Tag,
group: row.Group
};
var tg_array = new Array();
tg_array.push(parseInt(row.Num));
talkgroup_filters['tg-' + row.Num] = tg_array;
var tag_key = 'group-' + row.Group.toLowerCase();
if (!(tag_key in talkgroup_filters)) {
talkgroup_filters[tag_key] = new Array();
}
talkgroup_filters[tag_key].push(parseInt(row.Num));
// Check out a client from the pool and pipe the file at `filepath` through
// csv.parse -> csv.transform(transform) -> csv.stringify into the stream
// returned by client.query(copyFrom(query)). `cb` is called once that stream
// ends or errors; the client is released via done() in both cases.
pool.connect((connectError, client, done) => {
  if (connectError) {
    // No client was checked out, so there is nothing to stream into.
    // Report the failure instead of dereferencing an undefined client.
    return cb(connectError);
  }

  const pgStream = client
    .query(copyFrom(query))
    .on("end", () => {
      done();
      return cb();
    })
    .on("error", (streamError) => {
      done();
      return cb(streamError);
    });

  // NOTE(review): .pipe() does not forward errors from upstream stages, so a
  // failure in the file read, parser, transformer or stringifier is not
  // reported through `cb` here — confirm whether that needs handling.
  fs.createReadStream(filepath, { encoding: "utf8" })
    .pipe(csv.parse())
    .pipe(csv.transform(transform))
    .pipe(csv.stringify())
    .pipe(pgStream);
});
},