import logging
from inflection import underscore
from collections import OrderedDict as _o
import gilda
from indra.sources import trips
from indra.ontology.standardize import \
standardize_agent_name
from indra.statements.statements import *
from indra.assemblers.english.assembler import _assemble_agent_str, \
EnglishAssembler, statement_base_verb, statement_present_verb
from indra.explanation.model_checker.pysb import _add_activity_to_agent, \
_add_modification_to_agent
from bioagents.tra.tra import MolecularQuantity, TemporalPattern, TimeInterval
from .util import get_class_from_name
logger = logging.getLogger(__name__)
class Query(object):
    """Base class from which every query type derives."""

    @classmethod
    def _from_json(cls, json_dict):
        """Build a query object from its JSON representation.

        The value stored under 'type' is resolved to a class with
        get_class_from_name, and that class's own _from_json is used to
        construct the instance.
        """
        type_name = json_dict.get('type')
        target_cls = get_class_from_name(type_name, Query)
        return target_cls._from_json(json_dict)

    def matches(self, other):
        """Return True if this query and `other` have equal matches keys."""
        own_key = self.matches_key()
        other_key = other.matches_key()
        return own_key == other_key

    def matches_key(self):
        """Return the key identifying this query.

        The base class defines no key and returns None; the concrete query
        types in this module override this method.
        """
        return None

    def get_hash(self):
        """Return a hash of the matches key computed with make_hash."""
        return make_hash(self.matches_key(), 14)

    def get_hash_with_model(self, model_name):
        """Return a hash of the (matches key, model name) pair."""
        combined = (self.matches_key(), model_name)
        return make_hash(mk_str(combined), 14)

    def get_type(self):
        """Return the underscored form of this object's class name."""
        cls_name = type(self).__name__
        return underscore(cls_name)
class StructuralProperty(Query):
    """Placeholder query type; adds nothing beyond the Query base class."""
class PathProperty(Query):
    """Query asking for a mechanistic, causally consistent path that
    satisfies the query statement.

    Parameters
    ----------
    path_stmt : indra.statements.Statement
        A path to look for in the model represented as INDRA statement.
    entity_constraints : dict(list(indra.statements.Agent))
        Optional dictionary whose 'include' and 'exclude' keys hold lists
        of Agents to be included in or excluded from the path.
    relationship_constraints : dict(list(str))
        Optional dictionary whose 'include' and 'exclude' keys hold lists
        of Statement type names to include in or exclude from the path.
    """

    def __init__(self, path_stmt, entity_constraints=None,
                 relationship_constraints=None):
        self.path_stmt = path_stmt
        # A missing or empty constraint dictionary means "no constraints".
        ent_constr = entity_constraints or {}
        rel_constr = relationship_constraints or {}
        self.include_entities = ent_constr.get('include', [])
        self.exclude_entities = ent_constr.get('exclude', [])
        self.include_rels = rel_constr.get('include', [])
        self.exclude_rels = rel_constr.get('exclude', [])
        self.entities = self.get_entities()

    def to_json(self):
        """Return the JSON-serializable representation of this query."""
        json_dict = _o(type=self.get_type())
        json_dict['path'] = self.path_stmt.to_json()

        # Only non-empty constraint lists are written out.
        ent_json = {}
        if self.include_entities:
            ent_json['include'] = [
                agent.to_json() for agent in self.include_entities]
        if self.exclude_entities:
            ent_json['exclude'] = [
                agent.to_json() for agent in self.exclude_entities]
        json_dict['entity_constraints'] = ent_json

        rel_json = {}
        if self.include_rels:
            rel_json['include'] = [
                {'type': rel_name} for rel_name in self.include_rels]
        if self.exclude_rels:
            rel_json['exclude'] = [
                {'type': rel_name} for rel_name in self.exclude_rels]
        json_dict['relationship_constraints'] = rel_json
        return json_dict

    @classmethod
    def _from_json(cls, json_dict):
        """Build a PathProperty from the structure produced by to_json."""
        path_stmt = Statement._from_json(json_dict.get('path'))

        entity_constraints = None
        ent_json = json_dict.get('entity_constraints')
        if ent_json:
            entity_constraints = {
                kind: [Agent._from_json(agent_json)
                       for agent_json in agent_jsons]
                for kind, agent_jsons in ent_json.items()
            }

        relationship_constraints = None
        rel_json = json_dict.get('relationship_constraints')
        if rel_json:
            relationship_constraints = {
                kind: [entry['type'] for entry in entries]
                for kind, entries in rel_json.items()
            }

        return cls(path_stmt, entity_constraints, relationship_constraints)

    def get_entities(self):
        """Return entities from the path statement and the inclusion list."""
        return self.path_stmt.agent_list() + self.include_entities

    def matches_key(self):
        """Return a key built from the path statement and the constraints.

        Entities are ordered by their own matches keys and relation names
        alphabetically, so the order in which constraints were supplied
        does not affect the key.
        """
        def agent_key(agent):
            return agent.matches_key()

        key = self.path_stmt.matches_key()
        # NOTE(review): included and excluded constraints are appended
        # without any marker, so including X and excluding X produce the
        # same key. Left unchanged in case hashes of this key are stored
        # elsewhere - confirm before changing the format.
        pieces = [agent_key(agent) for agent in
                  sorted(self.include_entities or (), key=agent_key)]
        pieces.extend(agent_key(agent) for agent in
                      sorted(self.exclude_entities or (), key=agent_key))
        pieces.extend(sorted(self.include_rels or ()))
        pieces.extend(sorted(self.exclude_rels or ()))
        for piece in pieces:
            key += piece
        return mk_str(key)

    def __str__(self):
        segments = [f'PathPropertyQuery(stmt={str(self.path_stmt)}.']
        if self.include_entities:
            included = ', '.join(map(str, self.include_entities))
            segments.append(f' Include entities: {included}.')
        if self.exclude_entities:
            excluded = ', '.join(map(str, self.exclude_entities))
            segments.append(f' Exclude entities: {excluded}.')
        if self.include_rels:
            included_rels = ', '.join(self.include_rels)
            segments.append(f' Include relations: {included_rels}.')
        if self.exclude_rels:
            excluded_rels = ', '.join(self.exclude_rels)
            segments.append(f' Exclude relations: {excluded_rels}.')
        return ''.join(segments)

    def __repr__(self):
        return self.__str__()

    def to_english(self):
        """Return the path statement rendered by the EnglishAssembler."""
        assembler = EnglishAssembler([self.path_stmt])
        return assembler.make_model()
class SimpleInterventionProperty(Query):
    """This type of query requires dynamic simulation of the model to observe
    the behavior under perturbation.

    Parameters
    ----------
    condition_entity : indra.statements.Agent
        The entity acting as the condition (the enzyme or subject when the
        query is created with from_stmt).
    target_entity : indra.statements.Agent
        The entity whose change is observed (the substrate or object when
        the query is created with from_stmt).
    direction : str
        'up' or 'dn'; to_english renders 'up' as "increases" and any other
        value as "decreases".
    """

    def __init__(self, condition_entity, target_entity, direction):
        self.condition_entity = condition_entity
        self.target_entity = target_entity
        self.direction = direction

    @classmethod
    def from_stmt(cls, stmt):
        """Create a query from an INDRA Statement.

        Parameters
        ----------
        stmt : indra.statements.Statement
            A Modification, RegulateActivity, RegulateAmount or Influence
            statement.

        Returns
        -------
        SimpleInterventionProperty or None
            The query, or None (after logging) if the statement type is not
            handled.
        """
        # Each branch determines the polarity and the condition/target
        # agents for one family of statement types.
        if isinstance(stmt, Modification):
            is_removal = isinstance(stmt, RemoveModification)
            direction = 'dn' if is_removal else 'up'
            # TODO use Modification's _get_mod_condition when
            # _add_modification_to_agent is refactored in INDRA
            condition_entity = stmt.enz
            mod_condition_name = modclass_to_modtype[stmt.__class__]
            if is_removal:
                mod_condition_name = modtype_to_inverse[mod_condition_name]
            # Add modification to substrate agent
            target_entity = _add_modification_to_agent(
                stmt.sub, mod_condition_name, stmt.residue, stmt.position)
        elif isinstance(stmt, RegulateActivity):
            direction = 'up' if stmt.is_activation else 'dn'
            condition_entity = stmt.subj
            # Add activity to object agent
            target_entity = _add_activity_to_agent(
                stmt.obj, stmt.obj_activity, stmt.is_activation)
        elif isinstance(stmt, RegulateAmount):
            direction = 'dn' if isinstance(stmt, DecreaseAmount) else 'up'
            condition_entity, target_entity = stmt.agent_list()
        elif isinstance(stmt, Influence):
            direction = 'dn' if stmt.overall_polarity() == -1 else 'up'
            condition_entity, target_entity = stmt.agent_list()
        else:
            logger.info('Statement type %s not handled',
                        stmt.__class__.__name__)
            return None
        return cls(condition_entity, target_entity, direction)

    def matches_key(self):
        """Return a key built from both entities' keys and the direction."""
        condition_key = self.condition_entity.matches_key()
        target_key = self.target_entity.matches_key()
        key = (condition_key, self.direction, target_key)
        return str(key)

    def to_json(self):
        """Return the JSON-serializable representation of this query."""
        json_dict = _o(type=self.get_type())
        json_dict['condition_entity'] = self.condition_entity.to_json()
        json_dict['target_entity'] = self.target_entity.to_json()
        json_dict['direction'] = self.direction
        return json_dict

    @classmethod
    def _from_json(cls, json_dict):
        """Build a query from the structure produced by to_json."""
        condition_entity = Agent._from_json(json_dict.get('condition_entity'))
        target_entity = Agent._from_json(json_dict.get('target_entity'))
        direction = json_dict.get('direction')
        return cls(condition_entity, target_entity, direction)

    def __str__(self):
        descr = (f'SimpleInterventionPropertyQuery'
                 f'(condition={self.condition_entity}, '
                 f'target={self.target_entity}, '
                 f'direction={self.direction})')
        return descr

    def __repr__(self):
        return str(self)

    def to_english(self):
        """Return a sentence of the form '<condition> increases <target>.'"""
        cond = _assemble_agent_str(self.condition_entity).agent_str
        target = _assemble_agent_str(self.target_entity).agent_str
        dir_verb = 'increases' if self.direction == 'up' else 'decreases'
        return f'{cond} {dir_verb} {target}.'
class ComparativeInterventionProperty(Query):
    """Placeholder query type; adds nothing beyond the Query base class."""
class DynamicProperty(Query):
    """This type of query requires dynamic simulation of the model to check
    whether the queried temporal pattern is satisfied.

    Parameters
    ----------
    entity : indra.statements.Agent
        An entity to simulate the model for.
    pattern_type : str
        Type of temporal pattern. Accepted values: 'always_value', 'no_change',
        'eventual_value', 'sometime_value', 'sustained', 'transient'.
    quant_value : str or float
        Value of molecular quantity of entity of interest. Can be 'high' or
        'low' or a specific number.
    quant_type : str
        Type of molecular quantity of entity of interest. Default: qualitative.
    """

    def __init__(self, entity, pattern_type, quant_value=None,
                 quant_type='qualitative'):
        self.entity = entity
        self.pattern_type = pattern_type
        self.quant_value = quant_value
        self.quant_type = quant_type

    def _has_quant_value(self):
        """Return True if a quantity value was given.

        None and the empty string mean "no value". A numeric 0 is a real
        value, so a plain truthiness test must not be used here.
        """
        return self.quant_value is not None and self.quant_value != ''

    def get_temporal_pattern(self, time_limit=None):
        """Return TemporalPattern object created with query properties.

        Parameters
        ----------
        time_limit : int or float, optional
            If given (and non-zero), the pattern is restricted to the
            interval from 0 to `time_limit` seconds.
        """
        mq = None
        if self._has_quant_value():
            mq = MolecularQuantity(self.quant_type, self.quant_value)
        t = None
        if time_limit:
            t = TimeInterval(0, time_limit, 'second')
        return TemporalPattern(self.pattern_type, [self.entity], t, value=mq)

    def matches_key(self):
        """Return a key built from the entity key, pattern and quantity."""
        ent_matches_key = self.entity.matches_key()
        key = (ent_matches_key, self.pattern_type, self.quant_type,
               str(self.quant_value))
        return str(key)

    def to_json(self):
        """Return the JSON-serializable representation of this query."""
        json_dict = _o(type=self.get_type())
        json_dict['entity'] = self.entity.to_json()
        json_dict['pattern_type'] = self.pattern_type
        json_dict['quantity'] = {
            'type': self.quant_type,
            'value': self.quant_value,
        }
        return json_dict

    @classmethod
    def _from_json(cls, json_dict):
        """Build a query from the structure produced by to_json.

        A missing 'quantity' entry (or a missing 'type' inside it) falls
        back to the constructor defaults instead of raising.
        """
        entity = Agent._from_json(json_dict.get('entity'))
        pattern_type = json_dict.get('pattern_type')
        quant_json = json_dict.get('quantity') or {}
        quant_type = quant_json.get('type', 'qualitative')
        quant_value = quant_json.get('value')
        return cls(entity, pattern_type, quant_value, quant_type)

    def __str__(self):
        descr = (f'DynamicPropertyQuery(entity={self.entity}, '
                 f'pattern={self.pattern_type}, '
                 f'molecular quantity={(self.quant_type, self.quant_value)})')
        return descr

    def __repr__(self):
        return str(self)

    def to_english(self):
        """Return an English sentence describing the queried pattern."""
        agent = _assemble_agent_str(self.entity).agent_str
        # Slicing (rather than indexing) tolerates an empty agent string.
        agent = agent[:1].upper() + agent[1:]
        pattern_words = {
            'always_value': 'always',
            'eventual_value': 'eventually',
            'sometime_value': 'sometimes',
            'no_change': 'not changing',
        }
        # Pattern types without a dedicated phrase are used verbatim.
        pattern = pattern_words.get(self.pattern_type, self.pattern_type)
        if self._has_quant_value():
            return f'{agent} is {pattern} {self.quant_value}.'
        return f'{agent} is {pattern}.'
class OpenSearchQuery(Query):
    """This type of query requires doing an open ended breadth-first search
    to find paths satisfying the query.

    Parameters
    ----------
    entity : indra.statements.Agent
        The known entity of the query; it becomes the subject or the object
        of the path statement depending on `entity_role`.
    stmt_type : str
        Name of statement type.
    entity_role : str
        What role entity should play in statement (subject or object).
    terminal_ns : list[str]
        Force a path to terminate when any of the namespaces in this list
        are encountered and only yield paths that terminate at these
        namespaces. Stored lowercased.

    Attributes
    ----------
    path_stmt : indra.statements.Statement
        An INDRA statement having its subject or object set to None to
        represent open search query.

    Raises
    ------
    ValueError
        If `entity_role` is neither 'subject' nor 'object'.
    """

    def __init__(self, entity, stmt_type, entity_role, terminal_ns=None):
        self.entity = entity
        self.stmt_type = stmt_type
        self.entity_role = entity_role
        self.terminal_ns = [ns.lower() for ns in terminal_ns] if terminal_ns \
            else None
        self.path_stmt = self.make_stmt()

    def make_stmt(self):
        """Return the statement with the entity in its role and the other
        argument set to None.

        Raises
        ------
        ValueError
            If `entity_role` is neither 'subject' nor 'object'.
        """
        stmt_type = self.stmt_type
        if self.entity_role == 'subject':
            subj, obj = self.entity, None
            # With the entity as subject, amount regulation types are
            # replaced by the corresponding activity regulation types.
            if stmt_type == 'IncreaseAmount':
                stmt_type = 'Activation'
            elif stmt_type == 'DecreaseAmount':
                stmt_type = 'Inhibition'
        elif self.entity_role == 'object':
            subj, obj = None, self.entity
        else:
            # Previously this fell through and failed later with an
            # UnboundLocalError; fail with an explicit message instead.
            raise ValueError(
                f"entity_role must be 'subject' or 'object', "
                f"got {self.entity_role!r}")
        stmt_class = get_statement_by_name(stmt_type)
        return stmt_class(subj, obj)

    def get_sign(self, mc_type):
        """Return the sign (0 or 1) for the given model checker type.

        The sign is 0 for an unsigned graph or when the entity is the
        object; otherwise 0 for activation / non-decreasing amount and 1
        for inhibition / DecreaseAmount.

        Raises
        ------
        ValueError
            If the path statement is of a type with no defined sign.
        """
        if mc_type == 'unsigned_graph' or self.entity_role == 'object':
            sign = 0
        elif isinstance(self.path_stmt, RegulateActivity):
            sign = 0 if self.path_stmt.is_activation else 1
        elif isinstance(self.path_stmt, RegulateAmount):
            sign = 1 if isinstance(self.path_stmt, DecreaseAmount) else 0
        else:
            raise ValueError('Could not determine sign')
        return sign

    def matches_key(self):
        """Return a key built from the entity key, statement type, role
        and terminal namespaces (in their stored order)."""
        key = self.entity.matches_key()
        key += self.stmt_type
        key += self.entity_role
        if self.terminal_ns:
            for ns in self.terminal_ns:
                key += ns
        return mk_str(key)

    def to_json(self):
        """Return the JSON-serializable representation of this query."""
        json_dict = _o(type=self.get_type())
        json_dict['entity'] = self.entity.to_json()
        json_dict['stmt_type'] = self.stmt_type
        json_dict['entity_role'] = self.entity_role
        json_dict['terminal_ns'] = self.terminal_ns
        return json_dict

    @classmethod
    def _from_json(cls, json_dict):
        """Build a query from the structure produced by to_json."""
        entity = Agent._from_json(json_dict.get('entity'))
        stmt_type = json_dict.get('stmt_type')
        entity_role = json_dict.get('entity_role')
        terminal_ns = json_dict.get('terminal_ns')
        return cls(entity, stmt_type, entity_role, terminal_ns)

    def __str__(self):
        parts = [f'OpenSearchQuery(stmt={self.path_stmt}.']
        if self.terminal_ns:
            parts.append(f' Terminal namespace={self.terminal_ns}')
        return ''.join(parts)

    def __repr__(self):
        return str(self)

    def to_english(self):
        """Return the query phrased as an English question.

        Raises
        ------
        ValueError
            If `entity_role` is neither 'subject' nor 'object'.
        """
        agent = _assemble_agent_str(self.entity).agent_str
        if self.entity_role == 'subject':
            verb = statement_base_verb(self.stmt_type.lower())
            verb = verb[0].lower() + verb[1:]
            sentence = f'What does {agent} {verb}?'
        elif self.entity_role == 'object':
            verb = statement_present_verb(self.stmt_type.lower())
            verb = verb[0].lower() + verb[1:]
            sentence = f'What {verb} {agent}?'
        else:
            # Only reachable if entity_role was changed after construction.
            raise ValueError(
                f"entity_role must be 'subject' or 'object', "
                f"got {self.entity_role!r}")
        sentence = sentence[0].upper() + sentence[1:]
        if self.terminal_ns:
            sentence += f' ({", ".join(self.terminal_ns).upper()})'
        return sentence

    def get_entities(self):
        """Return a one-element list holding the query entity."""
        return [self.entity]
# This is the general method to get a grounding agent from text but it doesn't
# handle agent state which is required for dynamic queries
def get_agent_from_gilda(ag_name):
    """Return an INDRA Agent object by grounding its entity text with Gilda.

    The first match returned by Gilda supplies the grounding; the agent's
    name is then standardized.

    Raises
    ------
    GroundingError
        If Gilda returns no matches for `ag_name`.
    """
    scored_matches = gilda.ground(ag_name)
    if not scored_matches:
        raise GroundingError(
            f"Could not find grounding for {ag_name} with Gilda.")
    top_term = scored_matches[0].term
    db_refs = {'TEXT': ag_name, top_term.db: top_term.id}
    agent = Agent(ag_name, db_refs=db_refs)
    standardize_agent_name(agent, standardize_refs=True)
    return agent
# This is the method that dynamical queries use to represent agents with
# state
def get_agent_from_trips(ag_text, service_host='http://34.230.33.149:8002/cgi/'):
    """Return an INDRA Agent object by grounding its entity text with TRIPS.

    Parameters
    ----------
    ag_text : str
        The entity text to process.
    service_host : str
        Address of the TRIPS service passed on to trips.process_text.

    Returns
    -------
    indra.statements.Agent
        The first agent extracted by TRIPS.

    Raises
    ------
    GroundingError
        If TRIPS yields no processor or no agents for `ag_text`.
    """
    tp = trips.process_text(ag_text, service_host=service_host)
    # NOTE(review): process_text appears to return None when no usable
    # output was obtained - confirm against the INDRA TRIPS API. Treat that
    # like "no agents" so callers see GroundingError, not AttributeError.
    agent_list = tp.get_agents() if tp is not None else []
    if not agent_list:
        raise GroundingError(
            f"Could not find grounding for {ag_text} with TRIPS.")
    return agent_list[0]
def get_agent_from_text(ag_text):
    """
    Return an INDRA Agent object by grounding its entity text with either
    Gilda or TRIPS.

    Gilda is tried first; TRIPS is used only if Gilda raises GroundingError.

    Raises
    ------
    GroundingError
        If neither Gilda nor TRIPS can ground `ag_text`.
    """
    try:
        agent = get_agent_from_gilda(ag_text)
    except GroundingError:
        # Gilda had no grounding; fall back to TRIPS.
        try:
            agent = get_agent_from_trips(ag_text)
        except GroundingError:
            raise GroundingError(f'Could not find grounding for {ag_text}.')
        else:
            logger.info('Got agent from TRIPS')
    else:
        logger.info('Got agent from Gilda')
    return agent
class GroundingError(Exception):
    """Raised by the grounding helpers in this module when no grounding
    can be found for an entity text."""