Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"use strict";
const { Kafka } = require("kafkajs");
const PERFORMANCE_TEST = true;
const serviceLogger = () => ({ label, log }) => {
if (!PERFORMANCE_TEST) console.log(label + " namespace:" + log.message, log);
};
// Create the client with the broker list
const kafka = new Kafka({
clientId: "test",
brokers: ["192.168.2.124:9092"],
logLevel: 5,
logCreator: serviceLogger
});
const consumers = [
kafka.consumer({ groupId: "g1" + Date.now() }),
kafka.consumer({ groupId: "g2" + Date.now() }),
kafka.consumer({ groupId: "g3" + Date.now() })
];
const producer = kafka.producer();
let receipts = 0;
let emits = 0;
this.serviceLogger = kafkalogLevel => ({ namespace, level, label, log }) => {
switch(level) {
case logLevel.ERROR:
case logLevel.NOTHING:
return this.logger.error("KAFKAJS: " + namespace + log.message, log);
case logLevel.WARN:
return this.logger.warn("KAFKAJS: " + namespace + log.message, log);
case logLevel.INFO:
return this.logger.info("KAFKAJS: " + namespace + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("KAFKAJS: " + namespace + log.message, log);
}
}
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: 5, //logLevel.DEBUG,
logCreator: this.serviceLogger
})
// Map kafka-node log to service logger
let serviceLogger = (name) => {
return {
debug: this.logger.debug,
info: this.logger.info,
warn: this.logger.warn,
error: this.logger.error
}
}
kafkaLogging.setLoggerProvider(serviceLogger);
case logLevel.INFO:
return this.logger.info("namespace:" + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("namespace:" + log.message, log);
}
};
this.defaults = {
connectionTimeout: 1000,
retry: {
initialRetryTime: 100,
retries: 8
}
};
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: 5, //logLevel.DEBUG,
logCreator: this.serviceLogger,
ssl: this.settings.ssl || null, // refer to kafkajs documentation
sasl: this.settings.sasl || null, // refer to kafkajs documentation
connectionTimeout: this.settings.connectionTimeout || this.defaults.connectionTimeout,
retry: this.settings.retry || this.defaults.retry
});
this.defaultTopic = this.settings.topic || "events";
},
case logLevel.INFO:
return this.logger.info("KAFKAJS: " + namespace + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("KAFKAJS: " + namespace + log.message, log);
}
};
this.defaults = {
connectionTimeout: 1000,
retry: {
initialRetryTime: 100,
retries: 8
}
};
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: 5, //logLevel.DEBUG,
logCreator: this.serviceLogger,
ssl: this.settings.ssl || null, // refer to kafkajs documentation
sasl: this.settings.sasl || null, // refer to kafkajs documentation
connectionTimeout: this.settings.connectionTimeout || this.defaults.connectionTimeout,
retry: this.settings.retry || this.defaults.retry
});
this.topics = {
events: this.settings.topics ? this.settings.topics.events || "events" : "events"
};
this.publisher = this.settings.publisher || "flow.publisher";
const { Kafka, logLevel } = require('kafkajs') // eslint-disable-line @typescript-eslint/no-var-requires
type JestFn = (_: any) => void
const kafka = new Kafka({
brokers: ['localhost:9092'],
clientId: 'dockest_example',
logLevel: logLevel.NOTHING,
retry: {
initialRetryTime: 2500,
retries: 10,
},
})
const createConsumer = (
mockConsumptionCallback: JestFn,
): { consumer: any; startConsuming: () => Promise; stopConsuming: () => Promise } => {
const consumer = kafka.consumer({ groupId: 'dockest_group_1' })
const startConsuming = async () => {
await consumer.connect()
this.serviceLogger = () => ({ level, log }) => {
switch(level) {
case logLevel.ERROR:
case logLevel.NOTHING:
return this.logger.error("namespace:" + log.message, log);
case logLevel.WARN:
return this.logger.warn("namespace:" + log.message, log);
case logLevel.INFO:
return this.logger.info("namespace:" + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("namespace:" + log.message, log);
}
};
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: logLevel.DEBUG,
logCreator: this.serviceLogger,
connectionTimeout: this.settings.connectionTimeout || 1000
});
this.topics = {
events: this.settings.topics ? this.settings.topics.events || "events" : "events"
};
},
case logLevel.NOTHING:
return this.logger.error("namespace:" + log.message, log);
case logLevel.WARN:
return this.logger.warn("namespace:" + log.message, log);
case logLevel.INFO:
return this.logger.info("namespace:" + log.message, log);
case logLevel.DEBUG:
return this.logger.debug("namespace:" + log.message, log);
}
};
// Create the client with the broker list
this.kafka = new Kafka({
clientId: this.clientId,
brokers: this.brokers,
logLevel: logLevel.DEBUG,
logCreator: this.serviceLogger,
connectionTimeout: this.settings.connectionTimeout || 1000
});
this.topics = {
events: this.settings.topics ? this.settings.topics.events || "events" : "events"
};
},
const { Kafka, logLevel } = require('kafkajs') // eslint-disable-line @typescript-eslint/no-var-requires
type JestFn = (_: any) => void
const kafka = new Kafka({
brokers: ['localhost:9092'],
clientId: 'dockest_example',
logLevel: logLevel.NOTHING,
retry: {
initialRetryTime: 2500,
retries: 10,
},
})
const createConsumer = (
mockConsumptionCallback: JestFn,
): { consumer: any; startConsuming: () => Promise; stopConsuming: () => Promise } => {
const consumer = kafka.consumer({ groupId: 'dockest_group_1' })
const startConsuming = async () => {
await consumer.connect()
await consumer.subscribe({ topic: 'dockesttopic' })
await consumer.run({
eachMessage: async ({
subscriptions = [{
id: "step.one" + timestamp ,
event: "test.emit",
emit: {
topic: "next",
event: "test.emit.received"
}
}];
service = broker.createService(Subscriber, Object.assign({ settings: { brokers: ["192.168.2.124:9092"], subscriptions: subscriptions } }));
await broker.start();
let content = {
meta: opts.meta,
event: "test.emit",
payload: { msg: "say hello to the world" },
};
producers.forEach(producer => { producer.fail = true; });
mock.__emittedEventReset();
try {
await mock.__emit("events", 10, content);
} catch (err) {
expect(err.message).toBe("simulated fail of producer.send");
}
let emittedEvent = mock.__emittedEvent();
expect(emittedEvent).toEqual(null);
});
init() {
const brokers = [...process.env.KAFKA_BROKERS_ENDPOINT.split(',')];
this.kafka = new Kafka({
clientId: process.env.KAFKA_CLIENT_IDENTIFIER,
brokers,
});
this.subscribes.forEach(subscribe => {
if (subscribe.listener && subscribe.name) {
this.addListener(subscribe.listener, subscribe.name);
}
});
}