Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _load_dag(dag_fin, opt_fields=None, out=None):
    """Build a GODag from an OBO file under REPO and report how long it took.

    Args:
        dag_fin: Path of the OBO file, joined onto REPO.
        opt_fields: Passed through as the second positional argument of GODag.
        out: Object with a ``write`` method that receives the timing message;
            ``sys.stdout`` is used when it is None.

    Returns:
        The GODag instance that was constructed.
    """
    start = timeit.default_timer()
    dag = GODag(os.path.join(REPO, dag_fin), opt_fields)
    elapsed = datetime.timedelta(seconds=(timeit.default_timer() - start))
    # Fall back to standard output only when no stream was supplied.
    stream = sys.stdout if out is None else out
    stream.write("Elapsed HMS for OBO DAG load: {}\n\n".format(str(elapsed)))
    return dag
def test_oboreader_equal_(dag_fin):
    """Verify that two ways of loading the same OBO file yield matching GODags.

    The file is loaded once as ``GODag(dag_fin, True)`` (the "alternate"
    reader) and once as ``GODag(dag_fin)`` (the "original" reader); the
    elapsed time of each load is written to stdout.

    Args:
        dag_fin: Path of the OBO file handed to GODag.

    Raises:
        Exception: If the two DAGs differ in size, or if any term differs in
            id, name, namespace, level, depth, is_obsolete, or in the number
            of parents or children.
    """
    sys.stdout.write("\n\nTEST GODag EQUALITY USING {} ...\n\n".format(dag_fin))

    # 1. Load with the alternate reader and report the time taken.
    started = timeit.default_timer()
    dag_alt = GODag(dag_fin, True)
    elapsed = datetime.timedelta(seconds=(timeit.default_timer() - started))
    sys.stdout.write("Alternate OBOReader Elapsed HMS: {}\n\n".format(str(elapsed)))

    # 2. Load with the original reader and report the time taken.
    started = timeit.default_timer()
    dag_orig = GODag(dag_fin)
    elapsed = datetime.timedelta(seconds=(timeit.default_timer() - started))
    sys.stdout.write("Original OBOReader Elapsed HMS: {}\n\n".format(str(elapsed)))

    # 3. Compare the two DAGs term by term.
    if len(dag_orig) != len(dag_alt):
        raise Exception("LENGTHES NOT THE SAME")

    # Attributes compared by value, in the order they are checked.
    value_attrs = ("id", "name", "namespace", "level", "depth", "is_obsolete")
    # Attributes for which only the number of entries is compared.
    sized_attrs = ("parents", "children")

    for goid, term_orig in dag_orig.items():
        term_alt = dag_alt[goid]
        for attr in value_attrs:
            if getattr(term_orig, attr) != getattr(term_alt, attr):
                raise Exception("{} MISMATCH".format(attr))
        # 3a. Check that lengths of arrays are equal
        for attr in sized_attrs:
            if len(getattr(term_orig, attr)) != len(getattr(term_alt, attr)):
                raise Exception("{} len MISMATCH".format(attr))
def load_dag(self, opt_fields=None):
    """Build a GODag from ``self.obo`` and print how long the load took.

    Args:
        opt_fields: Passed through as the second positional argument of
            GODag; also echoed in the timing message.

    Returns:
        The GODag instance that was constructed.
    """
    started = timeit.default_timer()
    dag = GODag(self.obo, opt_fields)
    elapsed = datetime.timedelta(seconds=(timeit.default_timer() - started))
    sys.stdout.write(
        "Elapsed HMS for OBO DAG load: {HMS} OPTIONAL_ATTR({O})\n".format(
            HMS=str(elapsed), O=opt_fields))
    return dag
def test_parents_ancestors():
    """Exercise get_go2parents/get_go2children on a small GO DAG.

    The DAG is loaded twice: first without optional attributes, then with the
    'relationship' attribute. Parent/child maps are built with no optional
    relationships and with the three "regulates" relationships, and the
    parents of GO:0019222 are asserted for both cases.
    """
    obo_path = os.path.join(REPO, 'tests/data/i126/viral_gene_silence.obo')

    # Pass 1: DAG loaded without optional attributes; the calls only need to
    # complete, since their results are replaced below.
    godag = GODag(obo_path)
    no_optional = set()  # Don't trace any optional relationships
    go2parents_isa = get_go2parents(godag, no_optional)
    go2children_isa = get_go2children(godag, no_optional)
    # TODO: Add more tests for only is_a

    # Pass 2: reload the DAG, this time with the 'relationship' attribute.
    godag = GODag(obo_path, optional_attrs={'relationship'})
    goids = {term.item_id for term in godag.values()}

    # Parents and children through "is_a" only.
    no_optional = set()  # Don't trace any optional relationships
    go2parents_isa = get_go2parents(godag, no_optional)
    go2children_isa = get_go2children(godag, no_optional)

    # Parents and children through "is_a" plus all "regulates" relationships.
    regulates = {'regulates', 'negatively_regulates', 'positively_regulates'}
    go2parents_reg = get_go2parents(godag, regulates)
    go2children_reg = get_go2children(godag, regulates)

    # Check the parents found for one term under both settings.
    goid = 'GO:0019222'  # regulation of metabolic process
    assert go2parents_isa[goid] == {'GO:0050789'}
    assert go2parents_reg[goid] == {'GO:0050789', 'GO:0008152'}
def test_semantic_similarity():
    """Check the per-term counts that TermCounts computes for the fig2a data.

    Loads the fig2a OBO file and its annotation file, builds a TermCounts
    object, and asserts the expected ``gocnts`` value for terms A through G.
    """
    godag = GODag(os.path.join(REPO, '../goatools/tests/data/yangRWC/fig2a.obo'))
    anno_path = os.path.join(REPO, '../goatools/tests/data/yangRWC/fig2a.anno')

    # Map each term name to its GO ID so terms can be addressed by name.
    name2go = {}
    for term in godag.values():
        name2go[term.name] = term.item_id

    assoc = _get_id2gos(anno_path, godag, name2go, NAME2NUM)
    tcntobj = TermCounts(godag, assoc)

    # N_v: Test accuracy of Python equivalent to Java: getNumberOfAnnotations
    # Number of unique genes annotated to a GO term PLUS genes annotated to a
    # descendant, listed in the order the checks are performed.
    expected_counts = (
        ('A', 100),
        ('B', 40),
        ('C', 50),
        ('D', 10),
        ('E', 10),
        ('F', 10),
        ('G', 30),
    )
    for name, expected in expected_counts:
        assert tcntobj.gocnts[name2go[name]] == expected, tcntobj.gocnts
lib.log.info("Compiling all annotations for each genome")
#get orthology into dictionary
orthoDict = {}
if len(args.input) > 1:
with open(orthologs, 'rU') as input:
for line in input:
line = line.replace('\n', '')
col = line.split('\t')
genes = col[-1].split(', ')
for i in genes:
orthoDict[i] = col[0]
#get GO associations into dictionary as well
with lib.suppress_stdout_stderr():
goLookup = obo_parser.GODag(os.path.join(FUNDB, 'go.obo'))
goDict = {}
go_errors = []
with open(os.path.join(go_folder, 'associations.txt'), 'rU') as input:
for line in input:
line = line.replace('\n', '')
col = line.split('\t')
gos = col[1].split(';')
goList = []
for i in gos:
try:
description = i+' '+goLookup[i].name
except KeyError:
go_errors.append(i)
#print '%s not found in go.obo, try to download updated go file' % i
description = i
goList.append(description)
:return:
'''
if type(study) == str and type(pop) == str:
# load the study and pop from the file
study, pop = GO._read_geneset(study, pop, compare=compare)
else:
# convert to the set
study = frozenset(study)
pop = set(pop)
methods = method.split(",")
if obo == 'go-basic.obo':
obo = os.path.dirname(os.path.realpath(__file__)) + "/obo/go.obo"
if not os.path.exists(obo):
print("obo file not found, start to download")
wget.download('http://purl.obolibrary.org/obo/go/go-basic.obo', obo)
obo_dag = GODag(obo)
propagate_counts = not no_propagate_counts
if type(assoc) == dict:
buf = ""
for k, v in assoc.items():
if not v: continue
line = ";".join([str(x) for x in v if x])
buf += "{}\t{}\n".format(k, line)
path = os.path.dirname(os.path.realpath(__file__)) + "/assoc"
with open(path, 'w') as fp:
fp.write(buf)
assoc = read_associations(path)
elif type(assoc) == defaultdict:
pass
else:
# if from a file
assoc = read_associations(assoc)
def main():
    """Load the GO DAG and configuration, then initialise DeeProtein helpers.

    Uses ``FLAGS.obo_file`` when it is set; otherwise downloads go.obo with
    wget. Reads the JSON config named by ``FLAGS.config_json``, switches its
    "gpu" entry from 'True' to "False" when ``FLAGS.gpu`` is not set, and
    builds a DeeProtein instance in inference mode.

    NOTE(review): the visible body ends right after the helpers are
    initialised, and ``GOdag`` is not used in the visible portion -- confirm
    against the full source.
    """
    obo_file = FLAGS.obo_file
    if not obo_file:
        # no GOdag file was specified -> download it.
        obo_file = wget.download('http://purl.obolibrary.org/obo/go.obo')
    GOdag = GODag(obo_file, optional_attrs=['relationship'])

    with open(FLAGS.config_json) as config_fobj:
        config_dict = json.load(config_fobj)

    # set the gpu context
    if not FLAGS.gpu and config_dict["gpu"] == 'True':
        config_dict["gpu"] = "False"

    opts = helpers.OptionHandler(config_dict)
    deeprotein = DeeProtein(opts, inference=True)

    with tf.Graph().as_default():
        deeprotein.initialize_helpers()
        # graph for inference:
if len(data):
filename = data_folder+'/go-basic.obo'
with open(filename, 'w+b') as obofile:
obofile.write(data)
LOGGER.debug('{0} downloaded ({1})'
.format(go_obo_url, sympath(filename)))
else:
LOGGER.warn('{0} download failed, reason unknown.'
.format(go_obo_url))
else:
go_obo = data_folder+'/go-basic.obo'
return obo_parser.GODag(go_obo)