Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import pytest
import numpy as np
import json
import networkx as nx
from shapely import geometry
from merlin.util import spatialfeature
testCoords1 = [(1, 1), (1, 2), (2, 2), (2, 1), (1, 1)]
testCoords2 = [(0, 0), (2, 0), (2, 2), (0, 2), (0, 0)]
feature1 = spatialfeature.SpatialFeature([[geometry.Polygon(testCoords1)]], 0)
feature2 = spatialfeature.SpatialFeature([[geometry.Polygon(testCoords2)]], 0)
feature3 = spatialfeature.SpatialFeature([[geometry.Polygon(testCoords1)],
[geometry.Polygon(testCoords1)]],
0, zCoordinates=np.array([0, 0.5]))
feature4 = spatialfeature.SpatialFeature([[geometry.Polygon(testCoords1)],
[geometry.Polygon(testCoords2)]],
0, zCoordinates=np.array([0, 0.5]))
p1 = spatialfeature.SpatialFeature(
[[geometry.Polygon([(0, 0), (0, 1), (1, 1), (1, 0)])]], 0)
p2 = spatialfeature.SpatialFeature(
[[geometry.Polygon([(0, 0.5), (0, 1), (1, 1), (1, 0.5)])]], 0)
p3 = spatialfeature.SpatialFeature(
[[geometry.Polygon([(0, 0.5), (0, 1.5), (1, 1.5), (1, 0.5)])]], 0)
p4 = spatialfeature.SpatialFeature(
[[geometry.Polygon([(0, 1), (0, 2), (1, 2), (1, 1)])]], 0)
watershedIndex = self.dataSet.get_data_organization() \
.get_data_channel_index(self.parameters['watershed_channel_name'])
watershedImages = self._read_and_filter_image_stack(fragmentIndex,
watershedIndex, 5)
seeds = segmentation.separate_merged_seeds(
segmentation.extract_seeds(seedImages))
normalizedWatershed, watershedMask = segmentation\
.prepare_watershed_images(watershedImages)
seeds[np.invert(watershedMask)] = 0
watershedOutput = skiseg.watershed(
normalizedWatershed, measure.label(seeds), mask=watershedMask,
connectivity=np.ones((3, 3, 3)), watershed_line=True)
zPos = np.array(self.dataSet.get_data_organization().get_z_positions())
featureList = [spatialfeature.SpatialFeature.feature_from_label_matrix(
(watershedOutput == i), fragmentIndex,
globalTask.fov_to_global_transform(fragmentIndex), zPos)
for i in np.unique(watershedOutput) if i != 0]
featureDB = self.get_feature_database()
featureDB.write_features(featureList, fragmentIndex)
def _run_analysis(self, fragmentIndex) -> None:
allFOVs = np.array(self.dataSet.get_fovs())
fovBoxes = self.alignTask.get_fov_boxes()
fovIntersections = sorted([i for i, x in enumerate(fovBoxes) if
fovBoxes[fragmentIndex].intersects(x)])
intersectingFOVs = list(allFOVs[np.array(fovIntersections)])
spatialTree = rtree.index.Index()
count = 0
idToNum = dict()
for currentFOV in intersectingFOVs:
cells = self.segmentTask.get_feature_database()\
.read_features(currentFOV)
cells = spatialfeature.simple_clean_cells(cells)
spatialTree, count, idToNum = spatialfeature.construct_tree(
cells, spatialTree, count, idToNum)
graph = nx.Graph()
cells = self.segmentTask.get_feature_database()\
.read_features(fragmentIndex)
cells = spatialfeature.simple_clean_cells(cells)
graph = spatialfeature.construct_graph(graph, cells,
spatialTree, fragmentIndex,
allFOVs, fovBoxes)
self.dataSet.save_graph_as_gpickle(
graph, 'cleaned_cells', self, fragmentIndex)
spatialTree = rtree.index.Index()
count = 0
idToNum = dict()
for currentFOV in intersectingFOVs:
cells = self.segmentTask.get_feature_database()\
.read_features(currentFOV)
cells = spatialfeature.simple_clean_cells(cells)
spatialTree, count, idToNum = spatialfeature.construct_tree(
cells, spatialTree, count, idToNum)
graph = nx.Graph()
cells = self.segmentTask.get_feature_database()\
.read_features(fragmentIndex)
cells = spatialfeature.simple_clean_cells(cells)
graph = spatialfeature.construct_graph(graph, cells,
spatialTree, fragmentIndex,
allFOVs, fovBoxes)
self.dataSet.save_graph_as_gpickle(
graph, 'cleaned_cells', self, fragmentIndex)
def _run_analysis(self, fragmentIndex) -> None:
allFOVs = np.array(self.dataSet.get_fovs())
fovBoxes = self.alignTask.get_fov_boxes()
fovIntersections = sorted([i for i, x in enumerate(fovBoxes) if
fovBoxes[fragmentIndex].intersects(x)])
intersectingFOVs = list(allFOVs[np.array(fovIntersections)])
spatialTree = rtree.index.Index()
count = 0
idToNum = dict()
for currentFOV in intersectingFOVs:
cells = self.segmentTask.get_feature_database()\
.read_features(currentFOV)
cells = spatialfeature.simple_clean_cells(cells)
spatialTree, count, idToNum = spatialfeature.construct_tree(
cells, spatialTree, count, idToNum)
graph = nx.Graph()
cells = self.segmentTask.get_feature_database()\
.read_features(fragmentIndex)
cells = spatialfeature.simple_clean_cells(cells)
graph = spatialfeature.construct_graph(graph, cells,
spatialTree, fragmentIndex,
allFOVs, fovBoxes)
self.dataSet.save_graph_as_gpickle(
graph, 'cleaned_cells', self, fragmentIndex)
def _run_analysis(self):
allFOVs = self.dataSet.get_fovs()
graph = nx.Graph()
for currentFOV in allFOVs:
subGraph = self.cleaningTask.return_exported_data(currentFOV)
graph = nx.compose(graph, subGraph)
cleanedCells = spatialfeature.remove_overlapping_cells(graph)
self.dataSet.save_dataframe_to_csv(cleanedCells, 'all_cleaned_cells',
analysisTask=self)
def _run_analysis(self):
allFOVs = self.dataSet.get_fovs()
graph = nx.Graph()
for currentFOV in allFOVs:
subGraph = self.cleaningTask.return_exported_data(currentFOV)
graph = nx.compose(graph, subGraph)
cleanedCells = spatialfeature.remove_overlapping_cells(graph)
self.dataSet.save_dataframe_to_csv(cleanedCells, 'all_cleaned_cells',
analysisTask=self)
def _run_analysis(self, fragmentIndex) -> None:
allFOVs = np.array(self.dataSet.get_fovs())
fovBoxes = self.alignTask.get_fov_boxes()
fovIntersections = sorted([i for i, x in enumerate(fovBoxes) if
fovBoxes[fragmentIndex].intersects(x)])
intersectingFOVs = list(allFOVs[np.array(fovIntersections)])
spatialTree = rtree.index.Index()
count = 0
idToNum = dict()
for currentFOV in intersectingFOVs:
cells = self.segmentTask.get_feature_database()\
.read_features(currentFOV)
cells = spatialfeature.simple_clean_cells(cells)
spatialTree, count, idToNum = spatialfeature.construct_tree(
cells, spatialTree, count, idToNum)
graph = nx.Graph()
cells = self.segmentTask.get_feature_database()\
.read_features(fragmentIndex)
cells = spatialfeature.simple_clean_cells(cells)
graph = spatialfeature.construct_graph(graph, cells,
spatialTree, fragmentIndex,
allFOVs, fovBoxes)
self.dataSet.save_graph_as_gpickle(
graph, 'cleaned_cells', self, fragmentIndex)
spatialTree = rtree.index.Index()
count = 0
idToNum = dict()
for currentFOV in intersectingFOVs:
cells = self.segmentTask.get_feature_database()\
.read_features(currentFOV)
cells = spatialfeature.simple_clean_cells(cells)
spatialTree, count, idToNum = spatialfeature.construct_tree(
cells, spatialTree, count, idToNum)
graph = nx.Graph()
cells = self.segmentTask.get_feature_database()\
.read_features(fragmentIndex)
cells = spatialfeature.simple_clean_cells(cells)
graph = spatialfeature.construct_graph(graph, cells,
spatialTree, fragmentIndex,
allFOVs, fovBoxes)
self.dataSet.save_graph_as_gpickle(
graph, 'cleaned_cells', self, fragmentIndex)