Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_set_by_task_or_file(self):
    """Check that ``get_setting`` finds values from a file, the API and the task.

    Three settings are provided through three different routes and each one
    is then looked up with ``b2luigi.get_setting(..., task=task)``.
    """
    # Route 1: a ``settings.json`` file written to the current working directory.
    with open("settings.json", "w") as settings_file:
        json.dump({"my_setting": "my file value"}, settings_file)

    # Route 2: a value registered through ``b2luigi.set_setting``.
    b2luigi.set_setting("my_second_setting", "my value")

    # Route 3: a plain attribute stored on the task instance itself.
    task = b2luigi.Task()
    task.my_third_setting = "my task value"

    # (expected value, setting name) pairs, checked in the original order.
    expectations = (
        ("my file value", "my_setting"),
        ("my value", "my_second_setting"),
        ("my task value", "my_third_setting"),
    )
    for expected_value, setting_name in expectations:
        self.assertEqual(expected_value, b2luigi.get_setting(setting_name, task=task))
import b2luigi
import random
class MyNumberTask(b2luigi.Task):
    """Task that stores a single random number in ``output_file.txt``."""

    some_parameter = b2luigi.IntParameter()

    def output(self):
        """Register ``output_file.txt`` as the output of this task."""
        yield self.add_to_output("output_file.txt")

    def run(self):
        """Draw a random float and write it, newline-terminated, to the output file."""
        value = random.random()
        destination = self.get_output_file_name("output_file.txt")
        with open(destination, "w") as out:
            out.write(f"{value}\n")
if __name__ == "__main__":
    # Set the ``result_dir`` setting before any task is created.
    b2luigi.set_setting("result_dir", "results")

    # One task per parameter value 0..99, processed with 200 workers.
    number_tasks = [MyNumberTask(some_parameter=index) for index in range(100)]
    b2luigi.process(number_tasks, workers=200)
def test_requires(self):
    """Check the parameters of a task decorated with ``b2luigi.requires``.

    ``TaskB`` requires ``TaskA`` with ``some_parameter`` fixed to 3; the test
    asserts that ``TaskB`` then exposes only ``another_parameter`` and
    ``some_other_parameter`` as parameter names.
    """

    class TaskA(b2luigi.Task):
        some_parameter = b2luigi.IntParameter()
        some_other_parameter = b2luigi.IntParameter()

        def output(self):
            yield self.add_to_output("test.txt")

    @b2luigi.requires(TaskA, some_parameter=3)
    class TaskB(b2luigi.Task):
        another_parameter = b2luigi.IntParameter()

        def output(self):
            yield self.add_to_output("out.dat")

    dependent = TaskB(some_other_parameter=1, another_parameter=42)

    # ``some_parameter`` was fixed by the decorator and must not appear here.
    parameter_names = sorted(dependent.get_param_names())
    self.assertEqual(parameter_names, ["another_parameter", "some_other_parameter"])

    self.assertEqual(dependent.another_parameter, 42)
def test_many_dependencies(self):
    """Check input look-ups on a task that requires 100 ``TaskA`` clones.

    Every ``TaskA`` declares the output key ``file_a``, so the consuming task
    is expected to see 100 entries under that single key.
    """

    class TaskA(b2luigi.Task):
        some_parameter = b2luigi.IntParameter()

        def output(self):
            yield self.add_to_output("file_a")

    class TaskB(b2luigi.Task):
        def requires(self):
            for index in range(100):
                yield self.clone(TaskA, some_parameter=index)

    consumer = TaskB()

    # 100 targets and 100 file names for the shared key ...
    targets = consumer._get_input_targets("file_a")
    self.assertEqual(len(targets), 100)

    file_names = consumer.get_input_file_names("file_a")
    self.assertEqual(len(file_names), 100)

    # ... and that key is the only one present when no key is given.
    all_keys = consumer.get_input_file_names().keys()
    self.assertEqual(len(all_keys), 1)
def test_with_task(self):
    """Check the output path of a task with a hashed list parameter.

    The list contains items that are awkward in a path (spaces, a slash); the
    test asserts that the output file name ends with a fixed hashed directory.
    """

    class MyTask(b2luigi.Task):
        my_parameter = b2luigi.ListParameter(hashed=True)

        def output(self):
            yield self.add_to_output("test.txt")

        def run(self):
            with open(self.get_output_file_name("test.txt"), "w") as out:
                out.write("test")

    task = MyTask(my_parameter=["Some", "strange", "items", "with", "bad / signs"])

    expected_suffix = "results/my_parameter=hashed_08928069d368e4a0f8ac02a0193e443b/test.txt"
    output_path = task.get_output_file_name("test.txt")
    self.assertTrue(output_path.endswith(expected_suffix))
import b2luigi
import random
class MyNumberTask(b2luigi.Task):
    """Produce one random number per ``some_parameter`` value."""

    some_parameter = b2luigi.Parameter()

    def output(self):
        """Yield the single output of this task, ``output_file.txt``."""
        yield self.add_to_output("output_file.txt")

    def run(self):
        """Write a freshly drawn random float followed by a newline."""
        drawn = random.random()
        with open(self.get_output_file_name("output_file.txt"), "w") as handle:
            line = f"{drawn}\n"
            handle.write(line)
class MyAverageTask(b2luigi.Task):
    """Task that depends on 100 ``MyNumberTask`` instances."""

    def requires(self):
        """Yield one ``MyNumberTask`` clone for each parameter value 0..99."""
        yield from (
            self.clone(MyNumberTask, some_parameter=number)
            for number in range(100)
        )
class MyNumberTask(b2luigi.Task):
    """Write a random number to ``output_file.txt``."""

    some_parameter = b2luigi.Parameter()

    def output(self):
        """Declare the ``output_file.txt`` output."""
        yield self.add_to_output("output_file.txt")

    def run(self):
        """Generate the random number and persist it as one text line."""
        sample = random.random()
        output_path = self.get_output_file_name("output_file.txt")
        with open(output_path, "w") as sink:
            sink.write(f"{sample}\n")
class MyAverageTask(b2luigi.Task):
    """Average the numbers written by 100 ``MyNumberTask`` instances.

    The result is stored in the ``average.txt`` output of this task.
    """

    def requires(self):
        """Yield one ``MyNumberTask`` clone for each parameter value 0..99."""
        for i in range(100):
            yield self.clone(MyNumberTask, some_parameter=i)

    def output(self):
        """Declare ``average.txt`` as the output of this task."""
        yield self.add_to_output("average.txt")

    def run(self):
        """Read every input number, compute the mean and write it out."""
        # Build the mean
        summed_numbers = 0
        counter = 0
        for input_file in self.get_input_file_names("output_file.txt"):
            with open(input_file, "r") as f:
                summed_numbers += float(f.read())
            counter += 1

        average = summed_numbers / counter

        # Without this write the declared ``average.txt`` output is never
        # produced and the accumulated sum above is thrown away.
        with open(self.get_output_file_name("average.txt"), "w") as f:
            f.write(f"{average}\n")
import b2luigi
import random
import os
class MyNumberTask(b2luigi.Task):
    """Write a random number to ``output_file.txt``, failing for parameter 3.

    ``output`` and ``run`` are each defined exactly once; earlier duplicate
    definitions in the class body were shadowed and therefore dead code.
    """

    some_parameter = b2luigi.IntParameter()

    # Resource request stored on the task as ``htcondor_settings``.
    htcondor_settings = {
        "request_cpus": 1,
        "request_memory": "100 MB",
    }

    def output(self):
        """Declare ``output_file.txt`` as the output of this task."""
        yield self.add_to_output("output_file.txt")

    def run(self):
        """Draw a random float and write it to the output file.

        Raises:
            ValueError: if ``some_parameter`` equals 3.
        """
        print("I am now starting a task")
        random_number = random.random()

        # NOTE(review): presumably a deliberate failure to demonstrate how a
        # failing job is handled - confirm before removing.
        if self.some_parameter == 3:
            raise ValueError

        with open(self.get_output_file_name("output_file.txt"), "w") as f:
            f.write(f"{random_number}\n")
class MyAverageTask(b2luigi.Task):
htcondor_settings = {
"request_cpus": 1,
"request_memory": "200 MB"
}
def requires(self):
for i in range(10):
yield self.clone(MyNumberTask, some_parameter=i)
def output(self):
yield self.add_to_output("average.txt")
def run(self):
print("I am now starting the average task")
# Build the mean