How to use the luigi.TaskParameter class in luigi

To help you get started, we’ve selected a few luigi examples based on how it is commonly used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github constantinpape / cluster_tools / cluster_tools / connected_components / merge_assignments.py View on Github external
#

class MergeAssignmentsBase(luigi.Task):
    """ MergeAssignments base class
    """

    task_name = 'merge_assignments'
    src_file = os.path.abspath(__file__)
    allow_retry = False

    output_path = luigi.Parameter()
    output_key = luigi.Parameter()
    shape = luigi.ListParameter()
    number_of_labels = luigi.IntParameter()
    # task that is required before running this task
    dependency = luigi.TaskParameter()

    def requires(self):
        """Return the upstream task that has to finish before this one runs."""
        upstream_task = self.dependency
        return upstream_task

    def run_impl(self):
        shebang, block_shape, roi_begin, roi_end = self.global_config_values()
        self.init(shebang)

        block_list = vu.blocks_in_volume(self.shape, block_shape,
                                         roi_begin, roi_end)
        n_jobs = min(len(block_list), self.max_jobs)

        config = self.get_task_config()
        config.update({'output_path': self.output_path,
                       'output_key': self.output_key,
                       'tmp_folder': self.tmp_folder,
github constantinpape / cluster_tools / cluster_tools / postprocess / background_size_filter.py View on Github external
import cluster_tools.utils.function_utils as fu
from cluster_tools.cluster_tasks import SlurmTask, LocalTask, LSFTask


class BackgroundSizeFilterBase(luigi.Task):
    """ BackgroundSizeFilter base class

    Declares the input/output datasets and the upstream ``dependency`` task,
    and provides ``_parse_log`` to recover the results path that the
    dependency reports at the end of its log.
    """

    task_name = 'background_size_filter'
    src_file = os.path.abspath(__file__)

    input_path = luigi.Parameter()
    input_key = luigi.Parameter()
    output_path = luigi.Parameter()
    output_key = luigi.Parameter()
    # upstream task that must complete before this one
    dependency = luigi.TaskParameter()

    def requires(self):
        return self.dependency

    def _parse_log(self, log_path):
        """Extract the results path reported at the end of the dependency's log.

        Reads the last three lines of the log (via ``fu.tail``) and drops the
        first two whitespace-separated tokens of each line (presumably a
        date/time prefix -- TODO confirm against the log format). The first of
        those lines must read ``saving results to <path>``.

        Args:
            log_path: currently ignored; the log is always read from
                ``self.input().path``. NOTE(review): the argument is kept only
                for call-site compatibility -- confirm whether callers expect
                it to be honoured.

        Returns:
            The path parsed from the log line. It is verified to exist.

        Raises:
            RuntimeError: if the log tail is empty, does not contain the
                expected line, or reports a path that does not exist.
        """
        # NOTE(review): overriding the argument preserves the original behaviour
        log_path = self.input().path
        lines = [' '.join(ll.split()[2:]) for ll in fu.tail(log_path, 3)]
        # an empty log would otherwise fail with an unhelpful IndexError
        if not lines or not lines[0].startswith("saving results to"):
            raise RuntimeError("Could not parse log file.")
        path = lines[0].split()[-1]
        # explicit check instead of `assert`, which is stripped under `python -O`
        if not os.path.exists(path):
            raise RuntimeError("Path %s parsed from log file does not exist." % path)
        return path
github constantinpape / cluster_tools / cluster_tools / node_labels / merge_node_labels.py View on Github external
"""

    task_name = 'merge_node_labels'
    src_file = os.path.abspath(__file__)
    allow_retry = False

    input_path = luigi.Parameter()
    input_key = luigi.Parameter()
    output_path = luigi.Parameter()
    output_key = luigi.Parameter()
    max_overlap = luigi.BoolParameter(default=True)
    ignore_label = luigi.IntParameter(default=None)
    serialize_counts = luigi.BoolParameter(default=False)
    prefix = luigi.Parameter(default='')
    #
    dependency = luigi.TaskParameter()

    def requires(self):
        """Hand luigi the task passed in as ``dependency``; it runs first."""
        upstream_task = self.dependency
        return upstream_task

    def run_impl(self):
        # get the global config and init configs
        shebang = self.global_config_values()[0]
        self.init(shebang)

        # load the task config
        config = self.get_task_config()

        with vu.file_reader(self.input_path, 'r') as f:
            number_of_labels = int(f[self.input_key].attrs['maxId']) + 1

        node_shape = (number_of_labels,)
github constantinpape / cluster_tools / cluster_tools / morphology / region_centers.py View on Github external
"""

    task_name = 'region_centers'
    src_file = os.path.abspath(__file__)
    allow_retry = False

    input_path = luigi.Parameter()
    input_key = luigi.Parameter()
    morphology_path = luigi.Parameter()
    morphology_key = luigi.Parameter()
    output_path = luigi.Parameter()
    output_key = luigi.Parameter()
    ignore_label = luigi.Parameter(default=None)
    resolution = luigi.ListParameter(default=[1, 1, 1])
    #
    dependency = luigi.TaskParameter()

    def requires(self):
        """Hand luigi the task passed in as ``dependency``; it runs first."""
        upstream_task = self.dependency
        return upstream_task

    def run_impl(self):
        # get the global config and init configs
        shebang = self.global_config_values()[0]
        self.init(shebang)

        # load the task config
        config = self.get_task_config()
        number_of_labels = int(vu.file_reader(self.input_path,
                                              'r')[self.input_key].attrs['maxId']) + 1
        # TODO should be a parameter
        id_chunks = 2000
        block_list = vu.blocks_in_volume([number_of_labels], [id_chunks])
github constantinpape / cluster_tools / cluster_tools / features / merge_region_features.py View on Github external
class MergeRegionFeaturesBase(luigi.Task):
    """ Merge edge feature base class
    """

    task_name = 'merge_region_features'
    src_file = os.path.abspath(__file__)
    # retry is too complecated for now ...
    allow_retry = False

    # input and output volumes
    output_path = luigi.Parameter()
    output_key = luigi.Parameter()
    number_of_labels = luigi.IntParameter()
    dependency = luigi.TaskParameter()
    prefix = luigi.Parameter(default='')

    def requires(self):
        """Return the upstream task stored in ``dependency``."""
        upstream_task = self.dependency
        return upstream_task

    def run_impl(self):
        # get the global config and init configs
        shebang, block_shape, roi_begin, roi_end = self.global_config_values()
        self.init(shebang)

        # load the task config
        config = self.get_task_config()
        chunk_size = min(10000, self.number_of_labels)

        # temporary output dataset
        tmp_path = os.path.join(self.tmp_folder, 'region_features_tmp.n5')
github constantinpape / cluster_tools / deprecated / production / components / node_assignment.py View on Github external
from concurrent import futures

import vigra
import nifty
import z5py
import luigi


# TODO multiple threads for ufd ?!
class NodeAssignmentTask(luigi.Task):
    path = luigi.Parameter()
    out_key = luigi.Parameter()
    config_path = luigi.Parameter()
    max_jobs = luigi.IntParameter()
    tmp_folder = luigi.Parameter()
    dependency = luigi.TaskParameter()
    # FIXME default does not work; this still needs to be specified
    time_estimate = luigi.IntParameter(default=10)
    run_local = luigi.BoolParameter(default=False)

    def requires(self):
        """Return the upstream task stored in ``dependency``."""
        upstream_task = self.dependency
        return upstream_task

    def run(self):
        from .. import util

        # copy the script to the temp folder and replace the shebang
        file_dir = os.path.dirname(os.path.abspath(__file__))
        script_path = os.path.join(self.tmp_folder, 'node_assignment.py')
        util.copy_and_replace(os.path.join(file_dir, 'node_assignment.py'), script_path)

        # find the number of blocks
github constantinpape / cluster_tools / cluster_tools / write / write.py View on Github external
task_name = 'write'
    src_file = os.path.abspath(__file__)

    # path and key to input and output datasets
    input_path = luigi.Parameter()
    input_key = luigi.Parameter()
    output_path = luigi.Parameter()
    output_key = luigi.Parameter()
    # path to the node assignments
    # the key is optional, because the assignment can either be a
    # dense assignment table stored as n5 dataset
    # or a sparse table stored as pickled python map
    assignment_path = luigi.Parameter()
    assignment_key = luigi.Parameter(default=None)
    # the task we depend on
    dependency = luigi.TaskParameter(default=DummyTask())
    # we may have different write tasks,
    # so we need an identifier to keep them apart
    identifier = luigi.Parameter()
    offset_path = luigi.Parameter(default='')

    def requires(self):
        """Return the task this write task depends on (``dependency``)."""
        upstream_task = self.dependency
        return upstream_task

    @staticmethod
    def default_task_config():
        """Return the common LocalTask defaults extended with write-specific keys.

        Adds ``chunks`` (None) and ``allow_empty_assignments`` (False).
        """
        # start from the defaults shared by the task implementations
        config = LocalTask.default_task_config()
        config['chunks'] = None
        config['allow_empty_assignments'] = False
        return config

    def clean_up_for_retry(self, block_list, prefix):
github constantinpape / cluster_tools / deprecated / production / components / merge_blocks.py View on Github external
# TODO more clean up (job config files)
# TODO computation with rois
class MergeTask(luigi.Task):
    """Run all block-merge jobs.

    The parameters give the data location (``path``/``out_key``), the job
    setup (``config_path``, ``max_jobs``, ``tmp_folder``) and the upstream
    ``dependency`` that must finish first.
    """

    path = luigi.Parameter()
    out_key = luigi.Parameter()
    config_path = luigi.Parameter()
    # upper bound on the number of jobs running in parallel
    max_jobs = luigi.IntParameter()
    tmp_folder = luigi.Parameter()
    dependency = luigi.TaskParameter()
    # FIXME the default is not picked up; callers still have to pass a value
    time_estimate = luigi.IntParameter(default=10)
    run_local = luigi.BoolParameter(default=False)

    def requires(self):
        """Return the upstream task that has to complete first."""
        upstream_task = self.dependency
        return upstream_task

    def _prepare_jobs(self, n_jobs, n_blocks, block_shape):
        """Write one JSON config per job into ``tmp_folder``.

        Block ids ``0 .. n_blocks - 1`` are dealt out round-robin: job ``j``
        receives ``j, j + n_jobs, j + 2 * n_jobs, ...``. Each file
        ``merge_blocks_config_job<j>.json`` stores ``block_shape`` and that
        job's ``block_list``.
        """
        for job_idx in range(n_jobs):
            # stepping a range gives the same ids as slicing the full id list
            blocks_for_job = list(range(job_idx, n_blocks, n_jobs))
            job_config = {'block_shape': block_shape,
                          'block_list': blocks_for_job}
            file_name = 'merge_blocks_config_job%i.json' % job_idx
            job_config_file = os.path.join(self.tmp_folder, file_name)
            with open(job_config_file, 'w') as handle:
                json.dump(job_config, handle)
github constantinpape / cluster_tools / cluster_tools / features / block_edge_features.py View on Github external
class BlockEdgeFeaturesBase(luigi.Task):
    """ Block edge feature base class

    Declares the input, label, graph and output locations plus the upstream
    ``dependency``. ``default_task_config`` layers edge-feature settings on
    top of the common LocalTask defaults.
    """

    task_name = 'block_edge_features'
    src_file = os.path.abspath(__file__)

    # input and output volumes
    input_path = luigi.Parameter()
    input_key = luigi.Parameter()
    labels_path = luigi.Parameter()
    labels_key = luigi.Parameter()
    graph_path = luigi.Parameter()
    output_path = luigi.Parameter()
    dependency = luigi.TaskParameter()

    def requires(self):
        """Return the upstream task that has to complete first."""
        upstream_task = self.dependency
        return upstream_task

    @staticmethod
    def default_task_config():
        """Return the LocalTask defaults extended with feature-specific keys."""
        config = LocalTask.default_task_config()
        feature_defaults = {
            'offsets': None,
            'filters': None,
            'sigmas': None,
            'halo': [0, 0, 0],
            'apply_in_2d': False,
            'channel_agglomeration': 'mean',
        }
        config.update(feature_defaults)
        return config

    def clean_up_for_retry(self, block_list):
        """Delegate retry clean-up for ``block_list`` to the next class in the MRO."""
        super().clean_up_for_retry(block_list)
        # TODO outputs of failed blocks are not removed yet, although they may be corrupted
github constantinpape / cluster_tools / deprecated / production / blockwise_multicut / graph / initial_subgraph.py View on Github external
import nifty
import nifty.distributed as ndist


class InitialSubgraphTask(luigi.Task):
    """
    Compute initial sub-graphs
    """

    path = luigi.Parameter()
    ws_key = luigi.Parameter()
    out_path = luigi.Parameter()
    max_jobs = luigi.Parameter()
    config_path = luigi.Parameter()
    tmp_folder = luigi.Parameter()
    dependency = luigi.TaskParameter()
    # FIXME default does not work; this still needs to be specified
    time_estimate = luigi.IntParameter(default=10)
    run_local = luigi.BoolParameter(default=False)

    def requires(self):
        """Return the upstream task stored in ``dependency``."""
        upstream_task = self.dependency
        return upstream_task

    def _prepare_jobs(self, n_jobs, block_list, block_shape):
        """Write one JSON config file per job into ``self.tmp_folder``.

        ``block_list`` is split round-robin: job ``job_id`` receives
        ``block_list[job_id::n_jobs]``. Each config is written to
        ``initial_subgraph_config_job<job_id>.json`` and contains
        ``block_shape`` and that job's ``block_list``.

        Args:
            n_jobs: number of job config files to write.
            block_list: sliceable sequence of block ids (presumably a list of
                ints -- confirm against the caller); it must be JSON-serializable.
            block_shape: stored verbatim in every config; must be JSON-serializable.
        """
        for job_id in range(n_jobs):
            # round-robin assignment: every n_jobs-th block goes to this job
            block_jobs = block_list[job_id::n_jobs]
            job_config = {'block_shape': block_shape,
                          'block_list': block_jobs}
            config_path = os.path.join(self.tmp_folder, 'initial_subgraph_config_job%i.json' % job_id)
            with open(config_path, 'w') as f:
                json.dump(job_config, f)