Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from matplotlib import pyplot as plt
from typing import List
from typing import Tuple
from typing import Union
from typing import Dict
import h5py
import tables
from storm_analysis.sa_library import datareader
import merlin
from merlin.core import analysistask
from merlin.data import dataorganization
from merlin.data import codebook
TaskOrName = Union[analysistask.AnalysisTask, str]
class DataSet(object):
def __init__(self, dataDirectoryName: str,
dataHome: str=None, analysisHome: str=None):
"""Create a dataset for the specified raw data.
Args:
dataDirectoryName: the relative directory to the raw data
dataHome: the base path to the data. The data is expected
to be in dataHome/dataDirectoryName. If dataHome
is not specified, DATA_HOME is read from the
.env file.
analysisHome: the base path for storing analysis results. Analysis
results for this DataSet will be stored in
spatialTree, count, idToNum = spatialfeature.construct_tree(
cells, spatialTree, count, idToNum)
graph = nx.Graph()
cells = self.segmentTask.get_feature_database()\
.read_features(fragmentIndex)
cells = spatialfeature.simple_clean_cells(cells)
graph = spatialfeature.construct_graph(graph, cells,
spatialTree, fragmentIndex,
allFOVs, fovBoxes)
self.dataSet.save_graph_as_gpickle(
graph, 'cleaned_cells', self, fragmentIndex)
class CombineCleanedBoundaries(analysistask.AnalysisTask):
"""
A task to construct a network graph where each cell is a node, and overlaps
are represented by edges. This graph is then refined to assign cells to the
fov they are closest to (in terms of centroid). This graph is then refined
to eliminate overlapping cells to leave a single cell occupying a given
position.
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
    """Initialize the task and load the cleaning task it depends on.

    The task named by the 'cleaning_task' parameter is loaded through
    the data set and stored on self.cleaningTask.

    Args:
        dataSet: forwarded unchanged to the parent initializer.
        parameters: forwarded unchanged to the parent initializer.
        analysisName: forwarded unchanged to the parent initializer.
    """
    super().__init__(dataSet, parameters, analysisName)

    cleaningTaskName = self.parameters['cleaning_task']
    self.cleaningTask = self.dataSet.load_analysis_task(cleaningTaskName)
def get_estimated_memory(self):
# TODO - refine estimate
import numpy as np
import cv2
from typing import Tuple
from merlin.core import analysistask
ExtentTuple = Tuple[float, float, float, float]
class GenerateMosaic(analysistask.AnalysisTask):
"""
An analysis task that generates mosaic images by compiling different
field of views.
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
super().__init__(dataSet, parameters, analysisName)
if 'microns_per_pixel' not in self.parameters:
self.parameters['microns_per_pixel'] = 3
if 'fov_crop_width' not in self.parameters:
self.parameters['fov_crop_width'] = 0
if 'separate_files' not in self.parameters:
self.parameters['separate_files'] = False
if 'draw_fov_labels' not in self.parameters:
def get_analysis_subdirectory(
        self, analysisTask: TaskOrName, subdirectory: str = None,
        create: bool = True) -> str:
    """Build the path of the directory belonging to an analysis task.

    Args:
        analysisTask: either an AnalysisTask instance, in which case its
            get_analysis_name() supplies the directory name, or a string
            that is used as the directory name directly.
        subdirectory: optional name of a directory nested inside the
            task's directory. When None, the path of the task's
            directory itself is returned.
        create: flag indicating if the directory should be created if it
            does not already exist.

    Returns:
        the path obtained by joining self.analysisPath, the analysis
        name and, when given, the subdirectory with os.sep.
    """
    taskName = (analysisTask.get_analysis_name()
                if isinstance(analysisTask, analysistask.AnalysisTask)
                else analysisTask)

    pathParts = [self.analysisPath, taskName]
    if subdirectory is not None:
        pathParts.append(subdirectory)
    targetPath = os.sep.join(pathParts)

    if create:
        # exist_ok makes repeated requests for the same directory harmless.
        os.makedirs(targetPath, exist_ok=True)

    return targetPath
def __init__(self, dataSet, parameters=None, analysisName=None):
    """Initialize the task with a default core count.

    After the parent initializer has run, coreCount is set to the value
    reported by multiprocessing.cpu_count().

    Args:
        dataSet: forwarded unchanged to the parent initializer.
        parameters: forwarded unchanged to the parent initializer.
        analysisName: forwarded unchanged to the parent initializer.
    """
    super().__init__(dataSet, parameters, analysisName)
    detectedCores = multiprocessing.cpu_count()
    self.coreCount = detectedCores
def set_core_count(self, coreCount):
    """Set the number of parallel processes this analysis task is
    allowed to use.

    Args:
        coreCount: the value to store on self.coreCount.
    """
    self.coreCount = coreCount
def is_parallel(self):
    """Determine if this analysis task uses multiple cores.

    Returns:
        True, unconditionally.
    """
    return True
class ParallelAnalysisTask(AnalysisTask):
# TODO - this can be restructured so that AnalysisTask is instead a subclass
# of ParallelAnalysisTask where fragment count is set to 1. This could
# help remove some of the redundant code
"""
An abstract class for analysis that can be run in multiple parts
independently. Subclasses should implement the analysis to perform in
the run_analysis() function
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
    """Initialize the task by delegating to the parent initializer.

    Args:
        dataSet: forwarded unchanged to the parent initializer.
        parameters: forwarded unchanged to the parent initializer.
        analysisName: forwarded unchanged to the parent initializer.
    """
    super().__init__(dataSet, parameters, analysisName)
def run(self, fragmentIndex: int=None, overwrite=True) -> None:
"""Run the specified index of this analysis task.
return not self.dataSet.is_analysis_idle(self)
def get_analysis_name(self):
    """Get the name for this AnalysisTask.

    Returns:
        the value stored in self.analysisName
    """
    return self.analysisName
def is_parallel(self):
    """Determine if this analysis task uses multiple cores.

    Returns:
        False, unconditionally.
    """
    return False
class InternallyParallelAnalysisTask(AnalysisTask):

    """
    An abstract class for analysis that runs as a single part but may use
    several processes internally to speed up the work. Subclasses should
    implement the analysis to perform in the run_analysis() function.
    """

    def __init__(self, dataSet, parameters=None, analysisName=None):
        """Initialize the task with a default core count.

        After the parent initializer has run, coreCount is set to the
        value reported by multiprocessing.cpu_count().
        """
        super().__init__(dataSet, parameters, analysisName)
        detectedCores = multiprocessing.cpu_count()
        self.coreCount = detectedCores

    def set_core_count(self, coreCount):
        """Set the number of parallel processes this analysis task is
        allowed to use.

        Args:
            coreCount: the value to store on self.coreCount.
        """
        self.coreCount = coreCount
def _run_analysis(self, fragmentIndex):
    """Filter the decoded barcodes of one field of view and store them.

    Barcodes for the field of view given by fragmentIndex are read from
    the barcode database of the task named by the 'decode_task'
    parameter, filtered with the 'area_threshold', 'intensity_threshold'
    and 'distance_threshold' parameters, passed through
    remove_z_duplicate_barcodes, and written to this task's barcode
    database under the same field of view.

    Args:
        fragmentIndex: the field of view to process.
    """
    params = self.parameters
    sourceTask = self.dataSet.load_analysis_task(params['decode_task'])

    areaCutoff = params['area_threshold']
    intensityCutoff = params['intensity_threshold']
    distanceCutoff = params['distance_threshold']

    # The output database is obtained before the source database is
    # read, matching the original call order.
    outputDB = self.get_barcode_database()

    sourceDB = sourceTask.get_barcode_database()
    selected = sourceDB.get_filtered_barcodes(
        areaCutoff, intensityCutoff,
        distanceThreshold=distanceCutoff,
        fov=fragmentIndex)
    selected = self.remove_z_duplicate_barcodes(selected)

    outputDB.write_barcodes(selected, fov=fragmentIndex)
class GenerateAdaptiveThreshold(analysistask.AnalysisTask):

    """
    An analysis task that generates a three-dimensional mean intensity,
    area, minimum distance histogram for barcodes as they are decoded.
    """

    def __init__(self, dataSet, parameters=None, analysisName=None):
        """Initialize the task, filling in defaults for missing parameters.

        'tolerance' defaults to 0.001 when it is not supplied. The
        'decode_task' parameter has no default; a missing entry makes
        the lookup below fail during construction.
        """
        super().__init__(dataSet, parameters, analysisName)

        defaultParameters = {'tolerance': 0.001}
        for key, value in defaultParameters.items():
            if key not in self.parameters:
                self.parameters[key] = value

        # ensure decode_task is specified; the value itself is not needed
        _ = self.parameters['decode_task']

    def fragment_count(self):
        """Return the number of fovs reported by the data set."""
        fovs = self.dataSet.get_fovs()
        return len(fovs)
import numpy as np
import pandas as pd
from merlin.core import dataset
from merlin.util import spatialfeature
from merlin.core import analysistask
class CombineOutputs(analysistask.AnalysisTask):
    # TODO would this be easier if volume normalize, calculate counts, and
    # log_x_plus_1 were parameters specific to each task? could set this up
    # in the parameters up front with task: {name: x, param1: ...}

    """
    An analysis task that combines the outputs of various export tasks
    into a single file, using the output of the segment export task to
    align all outputs in the final file.
    """

    def __init__(self, dataSet, parameters=None, analysisName=None):
        """Initialize the task and check its required parameter.

        The 'segment_export_task' parameter has no default; a missing
        entry makes the lookup below fail during construction.
        """
        super().__init__(dataSet, parameters, analysisName)

        # ensure segment_export_task is specified; the value itself is
        # not needed here
        _ = self.parameters['segment_export_task']
sc.pp.regress_out(aData, self.parameters['regression_keywords'])
sc.pp.scale(aData, max_value=4)
if self.parameters['use_PCs']:
aData = self._select_significant_PCs(aData)
aData = self._compute_neighbors(aData, kValue)
clusterMin = self.parameters['cluster_min_size']
clusteringAlgorithm = self.parameters['clustering_algorithm']
self._cluster(aData, resolution, clusterMin=clusterMin,
clusteringAlgorithm=clusteringAlgorithm, i=i)
class ClusterStabilityAnalysis(analysistask.AnalysisTask):

    """
    A metaanalysis task that determines the stability of clusters based on
    the proportion of cells originally assigned to a given cluster that
    remain clustered when a random subset of the data is reclustered.
    """

    def __init__(self, metaDataSet, parameters=None, analysisName=None):
        """Initialize the task and keep a reference to the meta data set.

        metaDataSet is passed to the parent initializer in the data set
        position and is also stored on self.metaDataSet.
        """
        super().__init__(metaDataSet, parameters, analysisName)
        self.metaDataSet = metaDataSet

    def get_estimated_memory(self):
        """Return the fixed memory estimate for this task."""
        estimatedMemory = 10000
        return estimatedMemory

    def get_estimated_time(self):
        """Return the fixed time estimate for this task."""
        estimatedTime = 100
        return estimatedTime
from abc import abstractmethod
import numpy as np
from typing import Tuple
from typing import List
from shapely import geometry
from merlin.core import analysistask
class GlobalAlignment(analysistask.AnalysisTask):
"""
An abstract analysis task that determines the relative position of
different field of views relative to each other in order to construct
a global alignment.
"""
def __init__(self, dataSet, parameters=None, analysisName=None):
    """Initialize the task by delegating to the parent initializer.

    Args:
        dataSet: forwarded unchanged to the parent initializer.
        parameters: forwarded unchanged to the parent initializer.
        analysisName: forwarded unchanged to the parent initializer.
    """
    super().__init__(dataSet, parameters, analysisName)
@abstractmethod
def fov_coordinates_to_global(
self, fov: int, fovCoordinates: Tuple[float, float]) \
-> Tuple[float, float]:
"""Calculates the global coordinates based on the local coordinates
in the specified field of view.