Source code for emmaa.answer_queries
import logging
from datetime import datetime
from copy import deepcopy
from emmaa.model_tests import load_model_manager_from_s3
from emmaa.db import get_db
from emmaa.util import make_date_str, find_latest_s3_file, EMMAA_BUCKET_NAME, \
FORMATTED_TYPE_NAMES
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# In-process cache of model managers keyed by model name. It is read and
# filled by load_model_manager_from_cache so that a manager which was already
# loaded can be reused instead of being loaded from S3 again.
model_manager_cache = {}
class QueryManager(object):
    """Manager to run queries and interact with the database.

    Parameters
    ----------
    db : emmaa.db.EmmaaDatabaseManager
        An instance of a database manager to use. If not given, the
        'primary' database returned by ``get_db`` is used.
    model_managers : list[emmaa.model_tests.ModelManager]
        Optional list of ModelManagers to use for running queries. If not
        given, the methods will load ModelManager from S3 when needed.
    """
    def __init__(self, db=None, model_managers=None):
        self.db = db
        if db is None:
            self.db = get_db('primary')
        self.model_managers = model_managers if model_managers else []

    def answer_immediate_query(
            self, user_email, user_id, query, model_names, subscribe,
            bucket=EMMAA_BUCKET_NAME):
        """Answer a query immediately, reusing saved results when available.

        The query is first stored in the database. Results already saved
        for the query are then looked up, and a ModelManager is run only
        for the models that do not have a saved result yet; the new
        results are put into the database.

        Parameters
        ----------
        user_email : str
            Email of the user, stored together with the query.
        user_id : int
            Identifier of the user, stored together with the query.
        query : emmaa.queries.Query
            The query to answer. It has to provide ``get_type`` and
            ``get_hash_with_model``.
        model_names : list[str]
            Names of the models to answer the query for. The collection is
            iterated more than once, so it must not be a one-shot iterator.
        subscribe : bool
            Subscription flag stored together with the query.
        bucket : str
            The bucket passed on to ``ModelManager.answer_query``.

        Returns
        -------
        dict
            A dictionary with a single item mapping the query type to the
            list of query-model hashes (one per entry in `model_names`).
        """
        query_type = query.get_type()
        # Retrieve query-model hashes
        query_hashes = [
            query.get_hash_with_model(model) for model in model_names]
        # Store query in the database for future reference.
        self.db.put_queries(user_email, user_id, query, model_names, subscribe)
        # Check if the query has already been answered for any of given models
        # and retrieve the results from database.
        saved_results = self.db.get_results_from_query(query, model_names)
        if not saved_results:
            saved_results = []
        # The first element of each saved result is the model name.
        checked_models = {res[0] for res in saved_results}
        # Run queries mechanism for models for which result was not found.
        # If every model already has a saved result, nothing is run here.
        for model_name in model_names:
            if model_name in checked_models:
                continue
            mm = self.get_model_manager(model_name)
            response_list = mm.answer_query(query, bucket=bucket)
            # Only the model checker type and the response are stored; the
            # third element of each answer (the paths) is not needed here.
            results_to_store = [
                (query, mc_type, response)
                for mc_type, response, _paths in response_list]
            self.db.put_results(model_name, results_to_store)
        return {query_type: query_hashes}

    def answer_registered_queries(self, model_name, bucket=EMMAA_BUCKET_NAME):
        """Retrieve and answer registered queries.

        Retrieve queries registered in the database for a given model,
        answer them with the model manager and put the results into the
        database.

        Parameters
        ----------
        model_name : str
            The name of the model
        bucket : str
            The bucket to save the results to
        """
        model_manager = self.get_model_manager(model_name)
        queries = self.db.get_queries(model_name)
        logger.info(f'Found {len(queries)} queries for {model_name} model.')
        # Only do the following steps if there are queries for this model
        if queries:
            results = model_manager.answer_queries(queries, bucket=bucket)
            self.db.put_results(model_name, results)

    def get_registered_queries(self, user_email, query_type='path_property'):
        """Get formatted results to queries registered by user.

        Parameters
        ----------
        user_email : str
            Email of the user whose results are retrieved from the database.
        query_type : str
            The type of queries to retrieve and format results for.

        Returns
        -------
        dict
            Results formatted by ``format_results``.
        """
        results = self.db.get_results(user_email, query_type=query_type)
        return format_results(results, query_type)

    def retrieve_results_from_hashes(
            self, query_hashes, query_type='path_property', latest_order=1):
        """Retrieve results from a db given a list of query-model hashes.

        Parameters
        ----------
        query_hashes : list
            Query-model hashes to look the results up by.
        query_type : str
            The type of queries, used for formatting the results.
        latest_order : int
            Passed on to the database lookup; presumably selects which of
            the most recent results to return (1 being the latest) -
            confirm against the database manager.

        Returns
        -------
        dict
            Results formatted by ``format_results``.
        """
        results = self.db.get_results_from_hashes(
            query_hashes, latest_order=latest_order)
        return format_results(results, query_type)

    def get_model_manager(self, model_name):
        """Return the ModelManager for a model name.

        A manager passed to the constructor is preferred; if none of them
        belongs to the given model, the manager is obtained through
        ``load_model_manager_from_cache``.
        """
        # Try get model manager from class attributes or load from s3.
        for mm in self.model_managers:
            if mm.model.name == model_name:
                return mm
        return load_model_manager_from_cache(model_name)
def format_results(results, query_type='path_property'):
    """Format db output to a standard json structure.

    Parameters
    ----------
    results : iterable
        Rows retrieved from the database. Each row is read by position as
        ``(model, query, mc_type, response_json, delta, date)``, where
        `query` provides ``get_hash_with_model`` and ``to_english``,
        `response_json` is a dictionary and `delta` is a collection of the
        `response_json` keys that are new (it may be None).
    query_type : str
        One of 'path_property', 'open_search_query',
        'simple_intervention_property' or 'dynamic_property'. Rows are
        only grouped, without a formatted answer, for any other value.

    Returns
    -------
    dict
        A dictionary keyed by query-model hash. Each value contains the
        'query', 'model' and 'date' items and, depending on the query
        type, either one ``[status, response]`` item per model type or
        'result' and 'image' items.

    Raises
    ------
    ValueError
        If a simple intervention result is not one of the expected
        values.
    """
    model_types = ['pysb', 'pybel', 'signed_graph', 'unsigned_graph']
    path_query_types = ['path_property', 'open_search_query']
    not_applicable = 'Query is not applicable for this model'
    formatted_results = {}
    for result in results:
        model = result[0]
        query = result[1]
        query_hash = query.get_hash_with_model(model)
        if query_hash not in formatted_results:
            formatted_results[query_hash] = {
                'query': query.to_english(),
                'model': model,
                'date': make_date_str(result[5])}
        entry = formatted_results[query_hash]
        mc_type = result[2]
        response = _collect_response(result[3], result[4])
        if query_type in path_query_types:
            if mc_type == '' and response == not_applicable:
                # Not applicable to the model as a whole, so the message
                # is reported for every model type.
                for mt in model_types:
                    entry[mt] = ['n_a', response]
            elif isinstance(response, str) and \
                    response == 'Statement type not handled':
                entry[mc_type] = ['n_a', response]
            elif isinstance(response, str) and \
                    not response == 'Path found but exceeds search depth':
                # Any other message means the query did not pass.
                entry[mc_type] = ['Fail', response]
            else:
                entry[mc_type] = ['Pass', response]
        elif query_type == 'simple_intervention_property':
            if response == not_applicable:
                entry['result'] = ['n_a', response]
            else:
                res = response[0]['result']
                if res == 'no_change':
                    action = 'did not change'
                elif res.endswith('increase'):
                    action = 'increased'
                elif res.endswith('decrease'):
                    action = 'decreased'
                else:
                    # Without this branch `action` would be unbound for
                    # the first row, or silently reused from a previous
                    # row for later ones.
                    raise ValueError(
                        f'Unexpected simple intervention result: {res!r}')
                if res.startswith('no'):
                    expl = f'No, the amount of target entity {action}.'
                    entry['result'] = ['Fail', expl]
                else:
                    expl = f'Yes, the amount of target entity {action}.'
                    entry['result'] = ['Pass', expl]
                entry['image'] = response[0]['fig_path']
        elif query_type == 'dynamic_property':
            if response == not_applicable:
                entry['result'] = ['n_a', response]
            else:
                res = int(response[0]['sat_rate'] * 100)
                expl = (f'Satisfaction rate is {res}% after '
                        f'{response[0]["num_sim"]} simulations.')
                # The query passes when more than half of the simulations
                # satisfy it.
                entry['result'] = ['Pass' if res > 50 else 'Fail', expl]
                entry['image'] = response[0]['fig_path']
    if query_type in path_query_types:
        # Loop through the results again to make sure all model types are there
        for entry in formatted_results.values():
            for mt in model_types:
                if mt not in entry:
                    entry[mt] = ['n_a', 'Model type not supported']
    return formatted_results


def _collect_response(response_json, delta):
    """Return the message string or the list of dicts of a stored response.

    A string value replaces the response with that message. Dictionary
    values are collected into a list; those whose key is in `delta` are
    copied and get their 'path' wrapped as ``('new', path)``.
    """
    # Rows without a delta (None) have no new entries to mark.
    new_keys = delta if delta is not None else ()
    response = []
    for key, value in response_json.items():
        if isinstance(value, str):
            response = value
        elif isinstance(value, dict):
            if key in new_keys:
                # Copy so that the stored response is not modified.
                marked = deepcopy(value)
                marked['path'] = ('new', marked['path'])
                response.append(marked)
            else:
                response.append(value)
    return response
def load_model_manager_from_cache(model_name, bucket=EMMAA_BUCKET_NAME):
    """Return a model manager, reusing the cached one while it is current.

    A cached manager is returned only if its date string is contained in
    the key of the latest model manager file found on S3. Otherwise the
    manager is loaded from S3 and stored in the module-level cache.

    Parameters
    ----------
    model_name : str
        Name of the model to get the manager for.
    bucket : str
        The S3 bucket to look for and load the model manager from.

    Returns
    -------
    emmaa.model_tests.ModelManager
        The cached or newly loaded model manager.
    """
    model_manager = model_manager_cache.get(model_name)
    if model_manager:
        latest_on_s3 = find_latest_s3_file(
            bucket, f'results/{model_name}/model_manager_', '.pkl')
        cached_date = model_manager.date_str
        logger.info(f'Found model manager cached on {cached_date} and '
                    f'latest file on S3 is {latest_on_s3}')
        # NOTE(review): find_latest_s3_file presumably returns None when no
        # matching file exists; guard the membership test so that case
        # falls through to a fresh load instead of raising TypeError.
        if latest_on_s3 is not None and cached_date in latest_on_s3:
            logger.info(f'Loaded model manager for {model_name} from cache.')
            return model_manager
    logger.info(f'Loading model manager for {model_name} from S3.')
    model_manager = load_model_manager_from_s3(
        model_name=model_name, bucket=bucket)
    model_manager_cache[model_name] = model_manager
    return model_manager
def answer_queries_from_s3(model_name, db=None, bucket=EMMAA_BUCKET_NAME):
    """Answer registered queries with model manager on s3.

    Parameters
    ----------
    model_name : str
        Name of EmmaaModel to answer queries for.
    db : Optional[emmaa.db.manager.EmmaaDatabaseManager]
        If given over-rides the default primary database.
    bucket : str
        The S3 bucket to load the model manager from. The same bucket is
        passed on when answering the registered queries.
    """
    mm = load_model_manager_from_s3(model_name=model_name, bucket=bucket)
    qm = QueryManager(db=db, model_managers=[mm])
    # Forward the bucket so that a non-default bucket is used consistently
    # for loading the manager and for answering the queries.
    qm.answer_registered_queries(model_name, bucket=bucket)