Source code for materializationengine.blueprints.materialize.api

import datetime
import logging

from dynamicannotationdb.models import AnalysisTable, Base
from flask import abort, current_app, request
from flask_accepts import accepts
from flask_restx import Namespace, Resource, inputs, reqparse
from materializationengine.blueprints.reset_auth import reset_auth
from materializationengine.database import (
    create_session,
    dynamic_annotation_cache,
    sqlalchemy_cache,
)
from materializationengine.info_client import (
    get_aligned_volumes,
    get_datastack_info,
    get_relevant_datastack_info,
)
from dynamicannotationdb.models import AnalysisVersion
from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
from materializationengine.blueprints.materialize.schemas import BadRootsSchema
from middle_auth_client import auth_requires_admin, auth_requires_permission
from sqlalchemy import MetaData, Table
from sqlalchemy.engine.url import make_url
from sqlalchemy.exc import NoSuchTableError
from materializationengine.utils import check_write_permission


from materializationengine.blueprints.materialize.schemas import (
    VirtualVersionSchema,
    AnnotationIDListSchema,
)


__version__ = "4.19.2"


bulk_upload_parser = reqparse.RequestParser()
bulk_upload_parser.add_argument(
    "column_mapping", required=True, type=dict, location="json"
)
bulk_upload_parser.add_argument("project", required=True, type=str)
bulk_upload_parser.add_argument("file_path", required=True, type=str)
bulk_upload_parser.add_argument("schema", required=True, type=str)
bulk_upload_parser.add_argument("materialized_ts", type=float)


missing_chunk_parser = reqparse.RequestParser()
missing_chunk_parser.add_argument("chunks", required=True, type=list, location="json")
missing_chunk_parser.add_argument(
    "column_mapping", required=True, type=dict, location="json"
)
missing_chunk_parser.add_argument("project", required=True, type=str)
missing_chunk_parser.add_argument("file_path", required=True, type=str)
missing_chunk_parser.add_argument("schema", required=True, type=str)

get_roots_parser = reqparse.RequestParser()
get_roots_parser.add_argument("lookup_all_root_ids", default=False, type=inputs.boolean)

materialize_parser = reqparse.RequestParser()
materialize_parser.add_argument("days_to_expire", required=True, default=None, type=int)
materialize_parser.add_argument("merge_tables", required=True, type=inputs.boolean)

authorizations = {
    "apikey": {"type": "apiKey", "in": "query", "name": "middle_auth_token"}
}

mat_bp = Namespace(
    "Materialization Engine",
    authorizations=authorizations,
    description="Materialization Engine",
)


[docs]def check_aligned_volume(aligned_volume): aligned_volumes = get_aligned_volumes() if aligned_volume not in aligned_volumes: abort(400, f"aligned volume: {aligned_volume} not valid")
[docs]@mat_bp.route("/celery/test/<int:iterator_length>") class TestWorkflowResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("Test workflow pattern", security="apikey") def post(self, iterator_length: int = 50): """Test workflow Args: iterator_length (int): Number of parallel tasks to run. Default = 50 """ from materializationengine.workflows.dummy_workflow import start_test_workflow status = start_test_workflow.s(iterator_length).apply_async() return 200
[docs]@mat_bp.route("/workflow/status/active") class ActiveTasksResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("Get current actively running tasks", security="apikey") def get(self): """Get running tasks from celery""" from materializationengine.celery_worker import get_activate_tasks return get_activate_tasks()
[docs]@mat_bp.route("/workflow/status/locks") class LockedTasksResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("Get locked tasks", security="apikey") def get(self): """Get locked tasks from redis""" from materializationengine.celery_worker import inspect_locked_tasks ltdict = inspect_locked_tasks(release_locks=False) return {str(k): v for k, v in ltdict.items()}
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("Unlock locked tasks", security="apikey") def put(self): "Unlock locked tasks" from materializationengine.celery_worker import inspect_locked_tasks ltdict = inspect_locked_tasks(release_locks=True) return {str(k): v for k, v in ltdict.items()}
[docs]@mat_bp.route("/celery/status/queue") class QueueResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("Get task queue size", security="apikey") def get(self): """Get queued tasks for celery workers""" from materializationengine.celery_worker import get_celery_queue_items status = get_celery_queue_items("process") return status
[docs]@mat_bp.route("/celery/status/info") class CeleryResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("Get celery worker status", security="apikey") def get(self): """Get celery worker info""" from materializationengine.celery_worker import get_celery_worker_status status = get_celery_worker_status() return status
[docs]@mat_bp.route("/materialize/run/ingest_annotations/datastack/<string:datastack_name>") class ProcessNewAnnotationsResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("process new annotations workflow", security="apikey") def post(self, datastack_name: str): """Process newly added annotations and lookup segmentation data Args: datastack_name (str): name of datastack from infoservice """ from materializationengine.workflows.ingest_new_annotations import ( process_new_annotations_workflow, ) datastack_info = get_datastack_info(datastack_name) process_new_annotations_workflow.s(datastack_info).apply_async() return 200
[docs]@mat_bp.route( "/materialize/run/lookup_svid/datastack/<string:datastack_name>/<string:table_name>" ) class ProcessNewSVIDResource(Resource):
[docs] @reset_auth @auth_requires_permission("edit", table_arg="datastack_name") @mat_bp.doc("process new svids workflow", security="apikey") @accepts("AnnotationIDList", schema=AnnotationIDListSchema, api=mat_bp) def post(self, datastack_name: str, table_name: str): """Process newly added annotations and lookup supervoxel data Args: datastack_name (str): name of datastack from infoservice table_name (str): name of table """ from materializationengine.workflows.ingest_new_annotations import ( ingest_table_svids, ) if datastack_name not in current_app.config["DATASTACKS"]: abort(404, f"datastack {datastack_name} not configured for materialization") annotation_ids = request.parsed_obj.get("annotation_ids", None) datastack_info = get_datastack_info(datastack_name) info = ingest_table_svids.s( datastack_info, table_name, annotation_ids ).apply_async() return 200
[docs]@mat_bp.route( "/materialize/run/ingest_annotations/datastack/<string:datastack_name>/<string:table_name>" ) class ProcessNewAnnotationsTableResource(Resource):
[docs] @reset_auth @auth_requires_permission("edit", table_arg="datastack_name") @mat_bp.doc("process new annotations workflow", security="apikey") def post(self, datastack_name: str, table_name: str): """Process newly added annotations and lookup segmentation data Args: datastack_name (str): name of datastack from infoservice table_name (str): name of table """ from materializationengine.workflows.ingest_new_annotations import ( process_new_annotations_workflow, ) datastack_info = get_datastack_info(datastack_name) db = dynamic_annotation_cache.get_db(datastack_info["aligned_volume"]["name"]) check_write_permission(db, table_name) process_new_annotations_workflow.s( datastack_info, table_name=table_name ).apply_async() return 200
[docs]@mat_bp.route("/materialize/run/dense_lookup_root_ids/datastack/<string:datastack_name>") class LookupDenseMissingRootIdsResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("Find all null root ids and lookup new roots", security="apikey") def post(self, datastack_name: str): """Run workflow to lookup missing root ids and insert into database across all tables in the database. Args: datastack_name (str): name of datastack from infoservice """ from materializationengine.workflows.ingest_new_annotations import ( process_dense_missing_roots_workflow, ) datastack_info = get_datastack_info(datastack_name) process_dense_missing_roots_workflow.s(datastack_info).apply_async() return 200
[docs]@mat_bp.route("/materialize/run/sparse_lookup_root_ids/datastack/<string:datastack_name>/table/<string:table_name>") class LookupSparseMissingRootIdsResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("Find null root ids in table and lookup new root ids", security="apikey") def post(self, datastack_name: str, table_name: str): """Finds null root ids in a given table and lookups new root ids using last updated time stamp. Args: datastack_name (str): name of datastack from infoservice table_name (str): name of table """ from materializationengine.workflows.ingest_new_annotations import ( process_sparse_missing_roots_workflow, ) datastack_info = get_datastack_info(datastack_name) process_sparse_missing_roots_workflow.s(datastack_info, table_name).apply_async() return 200
[docs]@mat_bp.route( "/materialize/run/remove_bad_root_ids/datastack/<string:datastack_name>/table/<string:table_name>" ) class SetBadRootsToNullResource(Resource):
[docs] @reset_auth @auth_requires_admin @accepts("BadRootsSchema", schema=BadRootsSchema, api=mat_bp) @mat_bp.doc("set bad roots to None", security="apikey") def post(self, datastack_name: str, table_name: str): """Run workflow to lookup missing root ids and insert into database Args: datastack_name (str): name of datastack from infoservice """ from materializationengine.workflows.ingest_new_annotations import ( fix_root_id_workflow, ) data = request.parsed_obj bad_roots_ids = data["bad_roots"] datastack_info = get_datastack_info(datastack_name) fix_root_id_workflow.s(datastack_info, table_name, bad_roots_ids).apply_async() return 200
[docs]@mat_bp.route("/materialize/run/complete_workflow/datastack/<string:datastack_name>") class CompleteWorkflowResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.expect(materialize_parser) @mat_bp.doc( "ingest segmentations > update roots and freeze materialization", security="apikey", ) def post(self, datastack_name: str): """Create versioned materialization, finds missing segmentations and updates roots Args: datastack_name (str): name of datastack from infoservice """ from materializationengine.workflows.complete_workflow import ( run_complete_workflow, ) datastack_info = get_datastack_info(datastack_name) args = materialize_parser.parse_args() days_to_expire = args["days_to_expire"] merge_tables = args["merge_tables"] datastack_info["database_expires"] = days_to_expire datastack_info["merge_tables"] = merge_tables run_complete_workflow.s( datastack_info, days_to_expire, merge_tables ).apply_async() return 200
[docs]@mat_bp.route("/materialize/run/create_frozen/datastack/<string:datastack_name>") class CreateFrozenMaterializationResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.expect(materialize_parser) @mat_bp.doc("create frozen materialization", security="apikey") def post(self, datastack_name: str): """Create a new frozen (versioned) materialization Args: datastack_name (str): name of datastack from infoservice """ from materializationengine.workflows.create_frozen_database import ( create_versioned_materialization_workflow, ) args = materialize_parser.parse_args() days_to_expire = args["days_to_expire"] merge_tables = args["merge_tables"] datastack_info = get_datastack_info(datastack_name) create_versioned_materialization_workflow.s( datastack_info, days_to_expire, merge_tables ).apply_async() return 200
[docs]@mat_bp.route("/materialize/run/update_roots/datastack/<string:datastack_name>") class UpdateExpiredRootIdsResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.expect(get_roots_parser) @mat_bp.doc("Update expired root ids", security="apikey") def post(self, datastack_name: str): """Lookup root ids Args: datastack_name (str): name of datastack from infoservice """ from materializationengine.workflows.update_root_ids import ( expired_root_id_workflow, ) datastack_info = get_datastack_info(datastack_name) args = get_roots_parser.parse_args() datastack_info["lookup_all_root_ids"] = args["lookup_all_root_ids"] expired_root_id_workflow.s(datastack_info).apply_async() return 200
[docs]@mat_bp.route("/materialize/run/update_database/datastack/<string:datastack_name>") class UpdateLiveDatabaseResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.expect(get_roots_parser) @mat_bp.doc("Ingest new annotations and update expired root ids", security="apikey") def post(self, datastack_name: str): """Ingest new annotations and update expired root ids Args: datastack_name (str): name of datastack from infoservice """ from materializationengine.workflows.update_database_workflow import ( update_database_workflow, ) datastack_info = get_datastack_info(datastack_name) args = get_roots_parser.parse_args() datastack_info["lookup_all_root_ids"] = args["lookup_all_root_ids"] update_database_workflow.s(datastack_info).apply_async() return 200
[docs]@mat_bp.expect(bulk_upload_parser) @mat_bp.route( "/bulk_upload/upload/<string:datastack_name>/<string:table_name>/<string:segmentation_source>/<string:description>" ) class BulkUploadResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("bulk upload", security="apikey") def post( self, datastack_name: str, table_name: str, segmentation_source: str, description: str, ): """Run bulk upload from npy files Args: column_mappings (dict): dict mapping file names to column names in database project (str): bucket project path file_path (str): bucket project path schema (str): type of schema from emannotationschemas datastack_name (str): name of datastack from infoservice table_name (str): name of table in database to create segmentation_source (str): source of segmentation data description (str): text field added to annotation metadata table for reference """ from materializationengine.workflows.bulk_upload import gcs_bulk_upload_workflow args = bulk_upload_parser.parse_args() bulk_upload_info = get_datastack_info(datastack_name) bulk_upload_info.update( { "column_mapping": args["column_mapping"], "project": args["project"], "file_path": args["file_path"], "schema": args["schema"], "datastack": datastack_name, "description": description, "annotation_table_name": table_name, "segmentation_source": segmentation_source, "materialized_ts": args["materialized_ts"], } ) gcs_bulk_upload_workflow.s(bulk_upload_info).apply_async() return f"Datastack upload info : {bulk_upload_info}", 200
[docs]@mat_bp.expect(missing_chunk_parser) @mat_bp.route( "/bulk_upload/missing_chunks/<string:datastack_name>/<string:table_name>/<string:segmentation_source>/<string:description>" ) class InsertMissingChunks(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("insert missing chunks", security="apikey") def post( self, datastack_name: str, table_name: str, segmentation_source: str, description: str, ): """Insert missing chunks of data into database Args: chunks (list): list mapping file names to column names in database datastack_name (str): name of datastack from infoservice table_name (str): name of table in database to create segmentation_source (str): source of segmentation data description (str): text field added to annotation metadata table for reference """ from materializationengine.workflows.bulk_upload import gcs_insert_missing_data args = missing_chunk_parser.parse_args() bulk_upload_info = get_datastack_info(datastack_name) bulk_upload_info.update( { "chunks": args["chunks"], "column_mapping": args["column_mapping"], "project": args["project"], "file_path": args["file_path"], "schema": args["schema"], "datastack": datastack_name, "description": description, "annotation_table_name": table_name, "segmentation_source": segmentation_source, } ) gcs_insert_missing_data.s(bulk_upload_info).apply_async() return f"Uploading : {datastack_name}", 200
[docs]@mat_bp.route("/aligned_volume/<aligned_volume_name>") class DatasetResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("get_aligned_volume_versions", security="apikey") def get(self, aligned_volume_name: str): db = dynamic_annotation_cache.get_db(aligned_volume_name) response = db.database.cached_session.query( AnalysisVersion.datastack ).distinct() aligned_volumes = [r._asdict() for r in response] return aligned_volumes
[docs]@mat_bp.route("/aligned_volumes/<aligned_volume_name>") class VersionResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("get_analysis_versions", security="apikey") def get(self, aligned_volume_name): check_aligned_volume(aligned_volume_name) session = sqlalchemy_cache.get(aligned_volume_name) response = ( session.query(AnalysisVersion) .filter(AnalysisVersion.datastack == aligned_volume_name) .all() ) schema = AnalysisVersionSchema(many=True) versions, error = schema.dump(response) logging.info(versions) if versions: return versions, 200 else: logging.error(error) return abort(404)
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("setup new aligned volume database", security="apikey") def post(self, aligned_volume_name: str): """Create an aligned volume database Args: aligned_volume_name (str): name of aligned_volume from infoservice """ check_aligned_volume(aligned_volume_name) aligned_vol_db = dynamic_annotation_cache.get_db(aligned_volume_name) base = Base base.metadata.bind = aligned_vol_db.database.engine base.metadata.create_all() return 200
[docs]@mat_bp.route("/aligned_volumes/<aligned_volume_name>/version/<version>") class TableResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("get_all_tables", security="apikey") def get(self, aligned_volume_name, version): check_aligned_volume(aligned_volume_name) session = sqlalchemy_cache.get(aligned_volume_name) response = ( session.query(AnalysisTable) .filter(AnalysisTable.analysisversion) .filter(AnalysisVersion.version == version) .filter(AnalysisVersion.datastack == aligned_volume_name) .all() ) schema = AnalysisTableSchema(many=True) tables, error = schema.dump(response) if tables: return tables, 200 else: logging.error(error) return abort(404)
[docs]@mat_bp.route( "/aligned_volumes/<string:aligned_volume_name>/version/<int:version>/tablename/<string:tablename>" ) class AnnotationResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("get_top_materialized_annotations", security="apikey") def get(self, aligned_volume_name: str, version: int, tablename: str): check_aligned_volume(aligned_volume_name) SQL_URI_CONFIG = current_app.config["SQLALCHEMY_DATABASE_URI"] sql_base_uri = SQL_URI_CONFIG.rpartition("/")[0] sql_uri = make_url(f"{sql_base_uri}/{aligned_volume_name}") session, engine = create_session(sql_uri) metadata = MetaData() try: annotation_table = Table( tablename, metadata, autoload=True, autoload_with=engine ) except NoSuchTableError as e: logging.error(f"No table exists {e}") return abort(404) response = session.query(annotation_table).limit(10).all() annotations = [r._asdict() for r in response] return (annotations, 200) if annotations else abort(404)
[docs]@mat_bp.route("/materialize/run/create_virtual/datastack") class CreateVirtualPublicVersionResource(Resource):
[docs] @reset_auth @auth_requires_admin @mat_bp.doc("create virtual materialization", security="apikey") @accepts("VirtualVersionSchema", schema=VirtualVersionSchema, api=mat_bp) def post(self): """Create a virtual version from an existing frozen version. Args: datastack_name (str): name of datastack version (int): version to make virtual copy """ data = request.parsed_obj datastack_name = data.get("datastack_name") target_version = data.get("target_version") tables_to_include = data.get("tables_to_include") virtual_version_name = data.get("virtual_version_name") aligned_volume, pcg_table_name = get_relevant_datastack_info(datastack_name) if not tables_to_include: return abort(400, "No tables included") session = sqlalchemy_cache.get(aligned_volume) analysis_version = ( session.query(AnalysisVersion) .filter(AnalysisVersion.version == target_version) .filter(AnalysisVersion.datastack == datastack_name) .one() ) if not analysis_version.valid: return abort(404, f"Version {target_version} is not a valid version") included_tables = ( session.query(AnalysisTable) .filter(AnalysisTable.analysisversion_id == analysis_version.id) .filter(AnalysisTable.table_name.in_(tables_to_include)) .all() ) if not included_tables: return abort( 404, f"No tables {tables_to_include} found in target version {target_version}", ) virtual_datastack_name = f"{virtual_version_name}" time_to_expire = analysis_version.expires_on - datetime.datetime.utcnow() if time_to_expire.days < 1000: expiration_timestamp = str( analysis_version.expires_on + datetime.timedelta(days=36525) ) else: expiration_timestamp = analysis_version.expires_on virtual_analysis_version = AnalysisVersion( datastack=virtual_datastack_name, time_stamp=analysis_version.time_stamp, version=analysis_version.version, valid=True, expires_on=expiration_timestamp, parent_version=analysis_version.id, status="AVAILABLE", ) session.add(virtual_analysis_version) session.flush() for table in included_tables: table = AnalysisTable( aligned_volume=aligned_volume, schema=table.schema, table_name=table.table_name, valid=True, created=table.created, analysisversion_id=virtual_analysis_version.id, ) session.add(table) analysis_version.expires_on = expiration_timestamp try: session.commit() except Exception as e: session.rollback() logging.exception(f"SQL Error: {e}") raise e finally: session.close() return f"{virtual_datastack_name} created", 200