Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _initialize_node_agents(self):
"""Initialize internal dicts containing node information."""
nodes = _get_dict_from_list('nodes', self.cx)
invalid_genes = []
for node in nodes:
id = node['@id']
cx_db_refs = self.get_aliases(node)
node_name = node['n']
up_id = cx_db_refs.get('UP')
if up_id:
db_refs = {'UP': up_id, 'TEXT': node_name}
hgnc_id = uniprot_client.get_hgnc_id(up_id)
if hgnc_id:
db_refs['HGNC'] = hgnc_id
gene_name = hgnc_client.get_hgnc_name(hgnc_id)
else:
gene_name = uniprot_client.get_gene_name(up_id)
agent = Agent(gene_name, db_refs=db_refs)
self._node_names[id] = gene_name
self._node_agents[id] = agent
continue
else:
self._node_names[id] = node_name
hgnc_id = hgnc_client.get_hgnc_id(node_name)
db_refs = {'TEXT': node_name}
if not hgnc_id:
if not self.require_grounding:
self._node_agents[id] = \
Agent(node_name, db_refs=db_refs)
invalid_genes.append(node_name)
else:
def _get_agent_grounding(agent):
"""Convert an agent to the corresponding PyBEL DSL object (to be filled
with variants later)."""
def _get_id(_agent, key):
    """Return a single identifier stored under `key` in an agent's db_refs.

    Parameters
    ----------
    _agent : object
        Any object exposing a dict-like ``db_refs`` attribute.
    key : str
        The db_refs namespace to look up (e.g. 'HGNC', 'UP', 'FPLX', 'PF').

    Returns
    -------
    The value stored under `key`. If that value is a list, its first
    element is returned. None is returned when `key` is absent or when
    the stored list is empty.
    """
    _id = _agent.db_refs.get(key)
    if isinstance(_id, list):
        # An empty list carries no identifier: report it as missing instead
        # of raising IndexError, so callers' `if <id>:` checks keep working.
        _id = _id[0] if _id else None
    return _id
hgnc_id = _get_id(agent, 'HGNC')
if hgnc_id:
hgnc_name = hgnc_client.get_hgnc_name(hgnc_id)
if not hgnc_name:
logger.warning('Agent %s with HGNC ID %s has no HGNC name.',
agent, hgnc_id)
return
return protein('HGNC', name=hgnc_name, identifier=hgnc_id)
uniprot_id = _get_id(agent, 'UP')
if uniprot_id:
return protein('UP', name=uniprot_id, identifier=uniprot_id)
fplx_id = _get_id(agent, 'FPLX')
if fplx_id:
return protein('FPLX', name=fplx_id, identifier=fplx_id)
pfam_id = _get_id(agent, 'PF')
if pfam_id:
# if we can't get a mnemonic, we assume it's not a UP ID
if uniprot_client.get_mnemonic(name, web_fallback=False):
up_id = name
# We next check if it's a mnemonic
else:
up_id_from_mnem = uniprot_client.get_id_from_mnemonic(name)
if up_id_from_mnem:
up_id = up_id_from_mnem
if not up_id:
logger.info('Couldn\'t get UP ID from %s' % name)
return name, None
db_refs = {'UP': up_id}
hgnc_id = uniprot_client.get_hgnc_id(up_id)
if hgnc_id:
db_refs['HGNC'] = hgnc_id
name = hgnc_client.get_hgnc_name(hgnc_id)
else:
name = uniprot_client.get_gene_name(up_id)
elif ns == 'FPLX':
db_refs = {'FPLX': name}
elif ns in ('GO', 'GOBP', 'GOCC'):
go_id = go_client.get_go_id_from_label(name)
if not go_id:
logger.info('Could not find GO ID for %s' % name)
return name, None
db_refs = {'GO': go_id}
name = go_client.get_go_label(go_id)
elif ns in ('MESHPP', 'MESHD', 'MESH'):
mesh_id, mesh_name = mesh_client.get_mesh_id_name(name)
if not mesh_id:
logger.info('Could not find MESH ID from %s' % name)
return name, None
def _get_agent_grounding(agent):
"""Convert an agent to the corresponding PyBEL DSL object (to be filled with variants later)."""
def _get_id(_agent, key):
    """Return a single identifier stored under `key` in an agent's db_refs.

    Parameters
    ----------
    _agent : object
        Any object exposing a dict-like ``db_refs`` attribute.
    key : str
        The db_refs namespace to look up (e.g. 'HGNC', 'UP', 'FPLX', 'PF').

    Returns
    -------
    The value stored under `key`. If that value is a list, its first
    element is returned. None is returned when `key` is absent or when
    the stored list is empty.
    """
    _id = _agent.db_refs.get(key)
    if isinstance(_id, list):
        # An empty list carries no identifier: report it as missing instead
        # of raising IndexError, so callers' `if <id>:` checks keep working.
        _id = _id[0] if _id else None
    return _id
hgnc_id = _get_id(agent, 'HGNC')
if hgnc_id:
hgnc_name = hgnc_client.get_hgnc_name(hgnc_id)
if not hgnc_name:
logger.warning('Agent %s with HGNC ID %s has no HGNC name.',
agent, hgnc_id)
return
return protein('HGNC', name=hgnc_name, identifier=hgnc_id)
uniprot_id = _get_id(agent, 'UP')
if uniprot_id:
return protein('UP', name=uniprot_id, identifier=uniprot_id)
fplx_id = _get_id(agent, 'FPLX')
if fplx_id:
return protein('FPLX', name=fplx_id, identifier=fplx_id)
pfam_id = _get_id(agent, 'PF')
if pfam_id:
# Extract key information from the lines.
prot_name = line['Protein Name']
prot_id = line['Protein HMS LINCS ID']
# Get available db-refs.
db_refs = {}
if prot_id:
db_refs.update(self._lc.get_protein_refs(prot_id))
# Since the resource only gives us an UP ID (not HGNC), we
# try to get that and standardize the name to the gene name
up_id = db_refs.get('UP')
if up_id:
hgnc_id = uniprot_client.get_hgnc_id(up_id)
if hgnc_id:
db_refs['HGNC'] = hgnc_id
prot_name = hgnc_client.get_hgnc_name(hgnc_id)
else:
gene_name = uniprot_client.get_gene_name(up_id)
if gene_name:
prot_name = gene_name
# In some cases lines are missing protein information in which
# case we return None
else:
return None
# Create the agent.
return Agent(prot_name, db_refs=db_refs)
password=ndex_cred['password'],
require_grounding=False)
# Add grounding entries for ungrounded nodes/noncanonical gene names
gnd_map_ext = {'PALB2_wt_eto': {'HGNC': 'PALB2'},
'PALB2_wt': {'HGNC': 'PALB2'},
'CSDA': {'HGNC': 'YBX3'},
'COBRA1': {'HGNC': 'NELFB'},
'SRPR': {'HGNC': 'SRPRA'},
'TOMM70A': {'HGNC': 'TOMM70'}
}
gm.default_grounding_map.update(gnd_map_ext)
gmapper = gm.GroundingMapper(gm.default_grounding_map)
ncp_stmts = gmapper.map_agents(ncp.statements)
gene_names = [hgnc_client.get_hgnc_name(ag.db_refs['HGNC'])
for stmt in ncp_stmts for ag in stmt.agent_list()]
"""
# Get PMIDs for reading
entrez_pmids = get_pmids(gene_names)
network_pmids = ncp.get_pmids()
pmids = list(set(entrez_pmids + network_pmids))
save_pmids_for_reading(pmids, 'pmids.txt')
"""
# Build the model
prior_stmts = build_prior(gene_names, 'palb2_prior')
reach_stmts = ac.load_statements('reach_stmts.pkl')
stmts = ncp_stmts + reach_stmts + prior_stmts
stmts = run_assembly(stmts, 'unfiltered_assembled_stmts.pkl')
agents = self._get_complex_agents(id)
# Return the first agent with the remaining agents as a bound
# condition
agent = agents[0]
agent.bound_conditions = \
[BoundCondition(a, True) for a in agents[1:]]
return agent
else:
gnd_type = _type_db_map[(ent_type, database)]
if gnd_type == 'UP':
up_id = id
db_refs = {'UP': up_id}
hgnc_id = uniprot_client.get_hgnc_id(up_id)
if hgnc_id:
db_refs['HGNC'] = hgnc_id
name = hgnc_client.get_hgnc_name(hgnc_id)
else:
name = uniprot_client.get_gene_name(up_id)
# Map SIGNOR protein families to FamPlex families
elif ent_type == 'proteinfamily':
db_refs = {database: id} # Keep the SIGNOR family ID in db_refs
key = (database, id)
# Use SIGNOR name unless we have a mapping in FamPlex
name = ent_name
famplex_id = famplex_map.get(key)
if famplex_id is None:
logger.info('Could not find %s in FamPlex map' %
str(key))
else:
db_refs['FPLX'] = famplex_id
name = famplex_id
# Other possible groundings are PUBCHEM, SIGNOR, etc.
refs['HGNC'] = hgnc_id
elif id_dict['source'] == 'UniProt':
refs['UP'] = id_dict['idString']
hgnc_id = uniprot_client.get_hgnc_id(id_dict['idString'])
if hgnc_id:
# Check to see if we have a conflict with an HGNC id
# found from the Entrez id. If so, overwrite with this
# one, in which we have greater faith.
if 'HGNC' in refs.keys() and refs['HGNC'] != hgnc_id:
msg = ('Inferred HGNC:%s from UP:%s does not'
' match HGNC:%s from EGID:%s') % \
(refs['HGNC'], refs['UP'], hgnc_id,
refs['EGID'])
logger.info(msg)
refs['HGNC'] = hgnc_id
name = hgnc_client.get_hgnc_name(hgnc_id)
else:
gene_name = uniprot_client.get_gene_name(id_dict['idString'])
if gene_name is not None:
name = gene_name
elif id_dict['source'] in ('Tax', 'NCBI'):
refs['TAX'] = id_dict['idString']
elif id_dict['source'] == 'CHEBI':
refs['CHEBI'] = 'CHEBI:%s' % id_dict['idString']
# These we take as is
elif id_dict['source'] in ('MESH', 'OMIM', 'CTD'):
refs[id_dict['source']] = id_dict['idString']
# Handle mutations
elif id_dict['source'] == 'Unk' and \
id_dict['entityType'] == 'ProteinMutation':
# {'idString': 'p|SUB|Y|268|A', 'source': 'Unk',
# 'tool': 'PubTator', 'entityType': 'ProteinMutation'}
elif ns == 'SFAM':
db_refs = {'SFAM': name}
indra_name = bel_to_indra.get(name)
if indra_name is None:
logger.info('Could not find mapping for BEL/SFAM family: '
'%s (%s)' % (name, node_data))
else:
db_refs['FPLX'] = indra_name
name = indra_name
# Map Entrez genes to HGNC/UP
elif ns in ('EGID', 'ENTREZ', 'NCBIGENE'):
hgnc_id = hgnc_client.get_hgnc_from_entrez(name)
db_refs = {'EGID': name}
if hgnc_id is not None:
db_refs['HGNC'] = hgnc_id
name = hgnc_client.get_hgnc_name(hgnc_id)
up_id = hgnc_client.get_uniprot_id(hgnc_id)
if up_id:
db_refs['UP'] = up_id
else:
logger.info('HGNC entity %s with HGNC ID %s has no '
'corresponding Uniprot ID.',
name, hgnc_id)
mirbase_id = mirbase_client.get_mirbase_id_from_hgnc_id(hgnc_id)
if mirbase_id:
db_refs['MIRBASE'] = mirbase_id
else:
logger.info('Could not map EGID%s to HGNC.' % name)
name = 'E%s' % name
elif ns == 'MIRBASE':
mirbase_id = mirbase_client.get_mirbase_id_from_mirbase_name(name)
if not mirbase_id: