Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
let spammer = function(obj) {
let {type, exchange, binding, message, uri} = obj;
amqp.connect(uri, (err, conn) => {
conn.createChannel((err, ch) => {
ch.assertExchange(exchange, type, {durable: false});
ch.publish(exchange, binding, new Buffer.from(message));
console.log('.-. message sent', message)
});
// setTimeout(function() { conn.close(); process.exit(0) }, 500)
setTimeout(function() { conn.close() }, 1000)
})
}
function on_channel_open(err, ch) {
ch.assertQueue(q, {durable: false}, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.consume(q, function(msg) { // message callback
console.log(" [x] Received '%s'", msg.content.toString());
}, {noAck: true}, function(_consumeOk) { // consume callback
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}
conn.createChannel(on_channel_open);
}
amqp.connect(on_connect);
getConnection() {
const url = this._config.host;
const { hostname } = this._config;
let connection = this.connections[url];
// cache handling, if connection already opened, return it
if (connection && connection.conn) {
return Promise.resolve(connection.conn);
}
// prepare the connection internal object, and reset channel if connection has been closed
this.connections[url] = {
conn: null,
channel: null
};
connection = this.connections[url];
connection.conn = amqp.connect(url, {
clientProperties: {
hostname,
bunnymq: packageVersion,
startedAt: this.startedAt,
connectedAt: new Date().toISOString()
}
}).then((conn) => {
// on connection close, delete connection
conn.on('close', () => {
delete connection.conn;
});
conn.on('error', this._config.transport.error);
connection.conn = conn;
return conn;
}).catch((e) => {
connection.conn = null;
self._connection_callbacks.push(callback)
if (!first) {
return
}
let connection_options = {}
if (self._service_name) {
connection_options.clientProperties = {
connection_name: self._service_name
}
}
// So let's connect!
amqplib.connect(self._url, connection_options, (err, connection) => {
if (err) {
throw err
}
// Everything's go fine, so we'll set this global
// object to our new connection.
self._connection = connection
// Time to run the callbacks. Let's run them and
// take them out of the queue.
// Loop through and make everything happen!
while (self._connection_callbacks.length > 0) {
self._connection_callbacks[0]()
self._connection_callbacks.shift()
}
})
const isExpectedError = err => {
// IllegalOperationError happens when we are draining a broken channel; ignore
if (err instanceof amqplib.IllegalOperationError) {
return true;
}
// similarly, an error with this text is sent in some failure modes. See
// https://github.com/streadway/amqp/issues/409 for a request for a better
// way to recognize this
if (err.message.match(/no reply will be forthcoming/)) {
return true;
}
};
return this.withConnection(async conn => {
const method = confirmChannel ? 'createConfirmChannel' : 'createChannel';
const channel = await conn.amqp[method]();
// any errors on this channel will be handled as exceptions thrown within `fn`,
// so the events can be ignored
channel.on('error', () => {});
try {
return await fn(channel);
} finally {
try {
await channel.close();
} catch (err) {
if (!(err instanceof amqplib.IllegalOperationError)) {
// IllegalOperationError happens when we are closing a broken
// channel; any other error trying to close the channel suggests
// the connection is dead, so mark it failed.
conn.failed();
}
}
}
});
}
const amqp = require('amqplib');
const fs = require('fs');
const AmqpConnection = require('amqplib/lib/callback_model').CallbackModel;
const monad = require('./iomonad');
const log = require('../log')('rabbot.connection');
const info = require('../info');
const url = require('url');
const crypto = require('crypto');
const os = require('os');
/* log
* `rabbot.amqp-connection`
* `debug`
* when amqplib's `connection.close` promise is rejected
* `info`
* connection attempt
* connection success
* connection failure
* no reachable endpoints
const AmqpChannel = require('amqplib/lib/callback_model').Channel;
const monad = require('./iomonad.js');
const log = require('../log')('rabbot.channel');
/* log
* `rabbot.channel`
* `debug`
* when amqplib's `channel.close` promise is rejected
*/
function close (name, channel) {
if (channel.close) {
return channel.close()
.then(null, function (err) {
// since calling close on channel could reject the promise
// (see connection close's comment) this catches and logs it
// for debug level
private async createConnection(): Promise {
if (MQProvider.enabled) {
const { url } = MQConfigObject.load();
logger.log(`connecting to ${url}`);
const connection = await amqp.connect(url).catch(error => logger.error(`connect to mq error: ${r(error)}`));
if (connection == null) {
if (this._retryLimit < 1) {
process.exit(1);
}
setTimeout(
() =>
this.createConnection().catch(() => {
this._retryLimit -= 1;
logger.error(`reconnect(${10 - this._retryLimit}) to mq error, retry in 10s.`);
}),
10000,
);
return Promise.reject();
}
initialize() {
return amqp
.connect('amqp://localhost')
.then(conn => conn.createChannel())
.then(channel => {
this.channel = channel;
return channel.assertQueue('', {exclusive: true});
})
.then(q => {
this.replyQueue = q.queue;
return this._listenForResponses();
})
.catch(function(err) {
console.log(err);
})
;
}