Source code for emmaa.aws_lambda_functions.after_update

"""The AWS Lambda emmaa-after-update definition.

This file contains the function that will be run when Lambda is triggered. It
must be placed on s3, which can either be done manually (not recommended) or
by running:

$ python update_lambda.py after_update.py emmaa-after-update

in this directory.
"""

import boto3
import json


# AWS Batch client, created once at import time and used by submit_batch_job.
batch = boto3.client('batch')
# Default Batch job definition used when a caller does not pass `job_def`.
JOB_DEF = 'emmaa_jobdef'
# Batch job queue that every job in this module is submitted to.
QUEUE = 'emmaa-after-update'
# Value passed as `--project` to `indra.util.aws run_in_batch`.
PROJECT = 'aske'
# Git ref passed as `--branch` to scripts/git_and_run.sh; if None, the flag
# is omitted from the command.
BRANCH = 'origin/master'


def submit_batch_job(script_command, purpose, job_name, wait_for=None,
                     job_def=JOB_DEF):
    """Submit a single job to the AWS Batch queue and return its job ID.

    The job runs ``bash scripts/git_and_run.sh`` (with ``--branch BRANCH``
    when BRANCH is set) followed by `script_command`, wrapped in a call to
    ``python -m indra.util.aws run_in_batch``.

    Parameters
    ----------
    script_command : str
        The command to append to the git_and_run.sh invocation. It is
        concatenated as-is, so it must start with a space.
    purpose : str
        Value passed to run_in_batch as ``--purpose``.
    job_name : str
        Name given to the Batch job.
    wait_for : list of str, optional
        IDs of jobs this job depends on. If empty or None, the job is
        submitted without dependencies.
    job_def : str, optional
        Name of the Batch job definition to use. Defaults to JOB_DEF.

    Returns
    -------
    job_id : str
        The ``jobId`` field of the response returned by Batch.
    """
    print(f'Submitting job {job_name}')

    # Assemble the shell command executed inside the container.
    branch_flag = '' if BRANCH is None else f' --branch {BRANCH}'
    core_command = 'bash scripts/git_and_run.sh' + branch_flag + script_command
    print(core_command)

    submit_args = {
        'jobName': job_name,
        'jobQueue': QUEUE,
        'jobDefinition': job_def,
        'containerOverrides': {
            'command': ['python', '-m', 'indra.util.aws', 'run_in_batch',
                        '--project', PROJECT, '--purpose', purpose,
                        core_command]
        },
    }
    # Only declare dependencies when there is at least one job to wait for.
    if wait_for:
        submit_args['dependsOn'] = [
            {'jobId': dependency_id, 'type': 'SEQUENTIAL'}
            for dependency_id in wait_for
        ]

    response = batch.submit_job(**submit_args)
    job_id = response['jobId']
    print(f"Result from job submission: {job_id}")
    return job_id


def _extract_model_keys(records):
    """Return the s3 object keys found in `records`, skipping bad records."""
    model_keys = []
    for rec in records:
        try:
            model_keys.append(rec['s3']['object']['key'])
        except KeyError:
            # A record without an object key cannot be mapped to a model.
            # Skip it instead of falling through with an undefined (or
            # stale, from the previous record) key.
            print('Skipping record without an s3 object key')
    return model_keys


def _submit_model_jobs(s3, model_key):
    """Submit all follow-up batch jobs for one updated model manager key."""
    # The model name is the second '/'-separated component of the key and
    # the timestamp is what follows 'model_manager_' minus the last four
    # characters (presumably a file extension such as '.pkl' -- confirm).
    model_name = model_key.split('/')[1]
    now_str = model_key.split('model_manager_')[1][:-4]
    date = now_str[:10]

    # Store all stats jobs IDs; the notification job waits for all of them
    stats_job_ids = []

    # Submit model stats job
    model_stats_command = (' python scripts/run_model_stats_from_s3.py'
                           f' --model {model_name} --stats_mode model')
    model_stats_id = submit_batch_job(
        model_stats_command, 'update-emmaa-model-stats',
        f'{model_name}_model_stats_{now_str}')
    stats_job_ids.append(model_stats_id)

    # Find all test corpora for daily run
    config_key = f'models/{model_name}/config.json'
    obj = s3.get_object(Bucket='emmaa', Key=config_key)
    config = json.loads(obj['Body'].read().decode('utf8'))
    tests = config['test'].get('test_corpus', 'large_corpus_tests')
    if isinstance(tests, str):
        tests = [tests]

    # For each test run the test and test stats
    for test_corpus in tests:
        test_command = (' python scripts/run_model_tests_from_s3.py'
                        f' --model {model_name} --tests {test_corpus}')
        test_id = submit_batch_job(
            test_command, 'update-emmaa-results',
            f'{model_name}_{test_corpus}_tests_{now_str}')
        test_stats_command = (' python scripts/run_model_stats_from_s3.py'
                              f' --model {model_name} --stats_mode tests'
                              f' --tests {test_corpus}')
        # The stats job depends on the test job it summarizes
        test_stats_id = submit_batch_job(
            test_stats_command, 'update-emmaa-test-stats',
            f'{model_name}_{test_corpus}_stats_{now_str}', [test_id])
        stats_job_ids.append(test_stats_id)

    # Submit notification job once all stats jobs are done
    notify_command = (
        f' python scripts/model_notifications.py --model {model_name} '
        f'--test_corpora {" ".join(tests)} --date {date}')
    submit_batch_job(notify_command, 'model-notify',
                     f'{model_name}_notification_{now_str}',
                     stats_job_ids, job_def='emmaa-email-notifications')

    # Run queries
    query_command = (' python scripts/answer_queries_from_s3.py'
                     f' --model {model_name}')
    submit_batch_job(query_command, 'update-emmaa-queries',
                     f'{model_name}_queries_{now_str}')

    # Make tests if configured
    if config.get('make_tests', False):
        test_update_command = (' python scripts/model_to_tests.py'
                               f' --model {model_name}')
        submit_batch_job(test_update_command, 'update-emmaa-tests',
                         f'{model_name}_test_update_{now_str}')


def lambda_handler(event, context):
    """Submit model tests, model and test stats, and query batch jobs.

    This function is designed to be placed on AWS Lambda, taking the event
    and context arguments that are passed. Note that this function must always
    have the same parameters, even if any or all of them are unused, because
    we do not have control over what Lambda sends as parameters. Event
    parameter is used here to pass which model manager was updated.

    Lambda is configured to run this script when ModelManager object is
    updated.

    For every record that carries an s3 object key, the following batch jobs
    are submitted: a model stats job, one test job and one test stats job per
    configured test corpus, a notification job that waits for all stats jobs,
    a query job and, if the model config sets 'make_tests', a test update
    job. Records without an s3 object key are skipped.

    Parameters
    ----------
    event : dict
        A dictionary containing metadata regarding the triggering event. In
        this case, we are expecting 'Records', each of which contains a record
        of a file that was added (or changed) on s3, with the object key
        stored under ``record['s3']['object']['key']``.
    context : object
        This is an object containing potentially useful context provided by
        Lambda. It is not used by this function.

    Returns
    -------
    ret : str
        A fixed message reporting that the jobs were submitted.
    """
    model_keys = _extract_model_keys(event['Records'])
    if model_keys:
        # Only create the s3 client when there is something to process
        s3 = boto3.client('s3')
        for model_key in model_keys:
            _submit_model_jobs(s3, model_key)
    return 'All jobs sumbitted'