Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_conf_abs(self):
    """Check that a --conf temp file is recorded and its settings applied.

    Writes ``debug=True`` into a named temporary file, passes the file's
    name via ``--conf`` and expects ``options.conf`` to equal that name and
    ``options.debug`` to be truthy.
    """
    with tempfile.NamedTemporaryFile() as conf_file:
        conf_path = conf_file.name
        with self.mock_option('conf', conf_path):
            with self.mock_option('debug', False):
                # Flush so the setting is on disk before the command reads it.
                conf_file.write('debug=True\n'.encode('utf-8'))
                conf_file.flush()
                FlowerCommand().apply_options(
                    'flower', argv=['--conf=%s' % conf_path])
                self.assertEqual(conf_path, options.conf)
                self.assertTrue(options.debug)
def test_empty_conf(self):
    """Check that ``--conf=/dev/null`` is accepted and stored in options.conf."""
    expected_conf = '/dev/null'
    with self.mock_option('conf', None):
        FlowerCommand().apply_options('flower', argv=['--conf=/dev/null'])
        self.assertEqual(expected_conf, options.conf)
def test_port(self):
    """Check that ``--port=123`` leaves options.port equal to the integer 123."""
    with self.mock_option('port', 5555):
        cmd = FlowerCommand()
        cli_args = ['--port=123']
        cmd.apply_options('flower', argv=cli_args)
        self.assertEqual(123, options.port)
def test_conf_relative(self):
    """Check that a config file given by bare file name is still applied.

    The temporary file is created in the current directory (``dir='.'``) and
    only its base name is passed to ``--conf``; ``options.debug`` is expected
    to become truthy from the file's ``debug=True`` line.
    """
    with tempfile.NamedTemporaryFile(dir='.') as conf_file:
        with self.mock_option('conf', conf_file.name):
            with self.mock_option('debug', False):
                # Flush so the setting is on disk before the command reads it.
                conf_file.write('debug=True\n'.encode('utf-8'))
                conf_file.flush()
                cmd = FlowerCommand()
                relative_name = os.path.basename(conf_file.name)
                cmd.apply_options(
                    'flower', argv=['--conf=%s' % relative_name])
                self.assertTrue(options.debug)
def test_default_option(self):
    """Check that with no arguments options.conf is ``flowerconfig.py``."""
    FlowerCommand().apply_options('flower', argv=[])
    expected_conf = 'flowerconfig.py'
    self.assertEqual(expected_conf, options.conf)
def test_address(self):
    """Check that ``--address=foo`` leaves options.address equal to ``'foo'``."""
    with self.mock_option('address', '127.0.0.1'):
        cmd = FlowerCommand()
        cli_args = ['--address=foo']
        cmd.apply_options('flower', argv=cli_args)
        self.assertEqual('foo', options.address)
def main():
    """Run the Flower command from the command line.

    Builds a ``FlowerCommand`` and calls ``execute_from_commandline()`` on
    it. If that call raises an ``Exception``, the result of
    ``bugreport(app=flower.app)`` is printed to stderr and the exception is
    re-raised unchanged.
    """
    import sys

    # Construct the command outside the ``try``: were construction to fail
    # inside it, the handler below would reference an unbound ``flower`` and
    # bury the real error under an UnboundLocalError.
    flower = FlowerCommand()
    try:
        flower.execute_from_commandline()
    except Exception:
        print(bugreport(app=flower.app), file=sys.stderr)
        raise
def early_version(self, argv):
    """Print ``__version__`` to ``self.stdout`` if ``--version`` is in argv.

    The parent class's ``early_version`` is always invoked afterwards with
    the same ``argv``.
    """
    version_requested = '--version' in argv
    if version_requested:
        print(__version__, file=self.stdout)
    parent = super(FlowerCommand, self)
    parent.early_version(argv)
def flower():
    """Start flower.

    Refuses to run unless ``pkconfig.channel_in('dev')`` is true. Otherwise
    runs the Flower command inside ``pkio.save_chdir`` on the ``flower``
    entry under ``_run_dir()``, bound to ``cfg.ip`` and using the
    ``sirepo.celery_tasks`` app.

    Raises:
        AssertionError: if the channel check fails.
    """
    # Explicit raise instead of a bare ``assert`` so the guard is not
    # stripped when Python runs with -O; AssertionError is kept so the
    # exception type seen by callers is unchanged.
    if not pkconfig.channel_in('dev'):
        raise AssertionError('flower may only be started in the dev channel')
    # NOTE(review): ensure(dir=True) presumably creates the directory if it
    # is missing (py.path-style API) -- confirm.
    run_dir = _run_dir().join('flower').ensure(dir=True)
    with pkio.save_chdir(run_dir):
        command.FlowerCommand().execute_from_commandline([
            'flower',
            '--address=' + cfg.ip,
            '--app=sirepo.celery_tasks',
            '--no-color',
            '--persistent',
        ])