Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// Example script: read a CSV file, wrap the parsed rows in a scramjet DataStream
// and run one async step per row.
// NOTE(review): this snippet ends inside the `.do(...)` callback — the callback
// and the stream chain are never closed in the visible code.
var fs = require("fs");
var csv = require("fast-csv");
var stream1 = fs.createReadStream("files/testCsvFile.csv");
var {DataStream} = require("scramjet");
DataStream
// the following line will convert any stream to scramjet.DataStream
// NOTE(review): `stream2` is not declared anywhere in the visible code; the read
// stream created above is named `stream1` — confirm which one is intended.
.from(csv.fromStream(stream2, { headers: true }))
// the next lines control how many simultaneous operations are made
// I assumed 1, but if you're fine with 40 - go for it.
.setOptions({maxParallel: 1})
// the next line will call your async function and wait until it's completed
// and control the back-pressure of the stream
.do(async (data) => {
// NOTE(review): queryBuilder, schema, routine, parameters, request and winston
// are not defined in the visible code — presumably provided elsewhere.
const query = await queryBuilder({
schema,
routine,
parameters,
request
}); // here we prepare the query for calling the SP with parameters from data
// log the built query followed by the JSON-serialised row
winston.info(query + JSON.stringify(data));
// Async callback taking `measurements` and a station object; the station's
// `coords` and `station_id` (renamed to `stationId`) are destructured from it.
// NOTE(review): this snippet is cut off inside the `m` object literal below, so
// the rest of the callback (including the `catch`/`finally` for the `try`) is
// not visible here.
async (measurements, {coords, station_id: stationId}) => {
// Object.assign copies these keys onto `requestOptions` itself and returns that
// same object, so `requestOptions` is modified on every call.
const options = Object.assign(requestOptions, {
url: 'https://app.cpcbccr.com/caaqms/caaqms_viewdata_v2',
// body is the base64 encoding of a JSON string carrying the station id
body: Buffer.from(`{"site_id":"${stationId}"}`).toString('base64'),
resolveWithFullResponse: true
});
try {
const response = await rp(options);
// pull the site description and the table rows out of the JSON response body
const {siteInfo, tableData: {bodyContent}} = JSON.parse(response.body);
await (
DataStream
.from(bodyContent)
.each(async p => {
// normalise the parameter name: lower-case it and drop the first '.'
// (a string pattern in replace() only matches the first occurrence)
let parameter = p.parameters.toLowerCase().replace('.', '');
// 'ozone' is renamed to 'o3'
parameter = (parameter === 'ozone') ? 'o3' : parameter;
// Make sure we want the pollutant
if (!acceptableParameters.includes(parameter)) {
return;
}
// measurement record built for this row (the snippet ends before it is complete)
let m = {
averagingPeriod: {unit: 'hours', value: 0.25},
city: siteInfo.city,
location: siteInfo.siteName,
coordinates: coords,
attribution: [{
/**
 * Runs a single source through `fetchCorrectedMeasurementsFromSourceStream`
 * and resolves with the first item the resulting stream produces.
 *
 * @param {*} source - The source to process. If it is an `Error` instance it is
 *   re-thrown unchanged (the returned promise rejects with it).
 * @param {*} env - Not used by this function.
 * @returns {Promise<*>} The first element of the collected stream output
 *   (`undefined` when the output is empty).
 * @throws {Error} The `source` itself when it is an `Error`.
 */
export async function getCorrectedMeasurementsFromSource (source, env) {
  // An Error passed in place of a source is surfaced to the caller as-is.
  if (source instanceof Error) {
    throw source;
  }

  // Wrap the single source in a one-element stream and apply the correction step.
  const sourceStream = DataStream.from([source]);
  const correctedStream = sourceStream.use(
    fetchCorrectedMeasurementsFromSourceStream,
    {strict: true}
  );

  // Only the first collected item is returned.
  const collected = await correctedStream.toArray();
  const [firstResult] = collected;
  return firstResult;
}
/**
 * Builds a DataStream for `source` using `adapter`.
 *
 * When the adapter has a `fetchStream` member, the stream is created from it
 * (with `source` passed along to `DataStream.from`) and falls back to
 * `source.adapter` as its name. Otherwise `adapter.fetchData` is wrapped with
 * `promisify` and called with `source`; the stream is built from the resolved
 * `measurements` and named after the resolved `name`.
 *
 * @param {object} adapter - Object with `name` and either `fetchStream` or `fetchData`.
 * @param {object} source - Source description; `name` and `adapter` are read from it.
 * @returns {Promise<*>} The value returned by `DataStream.from`, with `name` assigned.
 */
async function getStreamFromAdapter (adapter, source) {
  log.info(`Getting stream for "${source.name}" from "${adapter.name}"`);

  // Streaming adapters: hand fetchStream straight to DataStream.from.
  if (adapter.fetchStream) {
    log.debug(`Fetching stream for "${source && source.name}" from adapter "${adapter.name}"`);
    const stream = DataStream.from(adapter.fetchStream, source);
    // Keep a name the stream already carries; otherwise use the source's adapter id.
    stream.name = stream.name || source.adapter;
    return stream;
  }

  // Non-streaming adapters: fetch everything first, then stream the measurements.
  log.debug(`Getting data for "${source && source.name}" from adapter "${adapter.name}"`);
  const fetchData = promisify(adapter.fetchData);
  const result = await fetchData(source);
  const stream = DataStream.from(result.measurements);
  stream.name = result.name;
  return stream;
}
/**
 * Resolves with a DataStream for the given source, produced by the given adapter.
 *
 * Adapters without a `fetchStream` member are queried through `fetchData`
 * (wrapped with `promisify`); the resolved `measurements` become the stream and
 * the resolved `name` becomes its name. Adapters with `fetchStream` have it
 * passed to `DataStream.from` together with `source`, and the stream's name
 * defaults to `source.adapter` when it has none.
 *
 * @param {object} adapter - Provides `name` plus `fetchStream` or `fetchData`.
 * @param {object} source - Provides `name` and `adapter`.
 * @returns {Promise<*>} Whatever `DataStream.from` returned, after `name` is set on it.
 */
async function getStreamFromAdapter (adapter, source) {
  log.info(`Getting stream for "${source.name}" from "${adapter.name}"`);

  let out;
  if (!adapter.fetchStream) {
    // No streaming entry point: load the data in one go and stream its measurements.
    log.debug(`Getting data for "${source && source.name}" from adapter "${adapter.name}"`);
    const payload = await promisify(adapter.fetchData)(source);
    out = DataStream.from(payload.measurements);
    out.name = payload.name;
  } else {
    // Streaming entry point available: let DataStream.from consume it directly.
    log.debug(`Fetching stream for "${source && source.name}" from adapter "${adapter.name}"`);
    out = DataStream.from(adapter.fetchStream, source);
    // Prefer a name already present on the stream over the source's adapter id.
    out.name = out.name || source.adapter;
  }

  return out;
}
export async function fetchStream (source) {
const requestOptions = {
method: 'POST',
headers: {
'accept-language': 'en-US,en',
'content-type': 'application/x-www-form-urlencoded',
accept: 'application/json'
},
form: false
};
const options = Object.assign(requestOptions, {
url: source.url,
body: Buffer.from('{"region":"landing_dashboard"}').toString('base64')
});
return DataStream
.from(
request(options)
.pipe(JSONStream.parse('map.station_list.*'))
)
.setOptions({maxParallel: 5})
.into(
(siteStream, site) => {
return siteStream.whenWrote({
station_id: site['station_id'],
station_name: site['station_name'],
coords: {latitude: Number(site['latitude']), longitude: Number(site['longitude'])}
});
},
new DataStream()
)
.into(