from dynamicannotationdb.models import AnalysisTable, AnalysisVersion
from flask import abort, request
from flask_accepts import accepts
from flask_restx import Namespace, Resource, inputs, reqparse
from materializationengine.blueprints.client.schemas import (
ComplexQuerySchema,
SimpleQuerySchema,
)
from materializationengine.blueprints.client.utils import (
add_warnings_to_headers,
update_notice_text_warnings,
)
from materializationengine.blueprints.reset_auth import reset_auth
from materializationengine.database import dynamic_annotation_cache, sqlalchemy_cache
from materializationengine.info_client import (
get_aligned_volumes,
get_relevant_datastack_info,
)
from materializationengine.blueprints.client.common import (
handle_complex_query,
handle_simple_query,
validate_table_args,
get_analysis_version_and_table,
get_flat_model,
unhandled_exception as common_unhandled_exception,
)
from materializationengine.models import MaterializedMetadata
from materializationengine.schemas import AnalysisTableSchema, AnalysisVersionSchema
from middle_auth_client import auth_requires_permission
from materializationengine.blueprints.client.datastack import validate_datastack
__version__ = "4.19.2"
authorizations = {
"apikey": {"type": "apiKey", "in": "query", "name": "middle_auth_token"}
}
client_bp = Namespace(
"Materialization Client",
authorizations=authorizations,
description="Materialization Client",
)
[docs]@client_bp.errorhandler(Exception)
def unhandled_exception(e):
return common_unhandled_exception(e)
annotation_parser = reqparse.RequestParser()
annotation_parser.add_argument(
"annotation_ids", type=int, action="split", help="list of annotation ids"
)
annotation_parser.add_argument(
"pcg_table_name", type=str, help="name of pcg segmentation table"
)
query_parser = reqparse.RequestParser()
query_parser.add_argument(
"return_pyarrow",
type=inputs.boolean,
default=True,
required=False,
location="args",
help="whether to return query in pyarrow compatible binary format (faster), false returns json",
)
query_parser.add_argument(
"arrow_format",
type=inputs.boolean,
default=False,
required=False,
location="args",
help=("whether to convert dataframe to pyarrow ipc batch format"),
)
query_parser.add_argument(
"split_positions",
type=inputs.boolean,
default=False,
required=False,
location="args",
help="whether to return position columns as seperate x,y,z columns (faster)",
)
query_parser.add_argument(
"count",
type=inputs.boolean,
default=False,
required=False,
location="args",
help="whether to only return the count of a query",
)
query_parser.add_argument(
"random_sample",
type=inputs.positive,
default=None,
required=False,
location="args",
help="How many samples to randomly get using tablesample on annotation tables, useful for visualization of large tables does not work as a random sample of query",
)
[docs]def check_aligned_volume(aligned_volume):
aligned_volumes = get_aligned_volumes()
if aligned_volume not in aligned_volumes:
abort(400, f"aligned volume: {aligned_volume} not valid")
[docs]@client_bp.route("/datastack/<string:datastack_name>/versions")
class DatastackVersions(Resource):
[docs] @reset_auth
@auth_requires_permission("view", table_arg="datastack_name")
@client_bp.doc("datastack_versions", security="apikey")
def get(self, datastack_name: str):
"""get available versions
Args:
datastack_name (str): datastack name
Returns:
list(int): list of versions that are available
"""
aligned_volume_name, pcg_table_name = get_relevant_datastack_info(
datastack_name
)
session = sqlalchemy_cache.get(aligned_volume_name)
response = (
session.query(AnalysisVersion)
.filter(AnalysisVersion.datastack == datastack_name)
.filter(AnalysisVersion.valid == True)
.all()
)
versions = [av.version for av in response]
return versions, 200
[docs]@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>")
class DatastackVersion(Resource):
[docs] @reset_auth
@auth_requires_permission("view", table_arg="datastack_name")
@client_bp.doc("version metadata", security="apikey")
def get(self, datastack_name: str, version: int):
"""get version metadata
Args:
datastack_name (str): datastack name
version (int): version number
Returns:
dict: metadata dictionary for this version
"""
aligned_volume_name, pcg_table_name = get_relevant_datastack_info(
datastack_name
)
session = sqlalchemy_cache.get(aligned_volume_name)
response = (
session.query(AnalysisVersion)
.filter(AnalysisVersion.datastack == datastack_name)
.filter(AnalysisVersion.version == version)
.first()
)
if response is None:
return "No version found", 404
schema = AnalysisVersionSchema()
return schema.dump(response), 200
[docs]@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>/tables")
class FrozenTableVersions(Resource):
[docs] @reset_auth
@auth_requires_permission("view", table_arg="datastack_name")
@client_bp.doc("get_frozen_tables", security="apikey")
def get(self, datastack_name: str, version: int):
"""get frozen tables
Args:
datastack_name (str): datastack name
version (int): version number
Returns:
list(str): list of frozen tables in this version
"""
aligned_volume_name, pcg_table_name = get_relevant_datastack_info(
datastack_name
)
session = sqlalchemy_cache.get(aligned_volume_name)
av = (
session.query(AnalysisVersion)
.filter(AnalysisVersion.version == version)
.filter(AnalysisVersion.datastack == datastack_name)
.first()
)
if av is None:
abort(404, f"version {version} does not exist for {datastack_name} ")
response = (
session.query(AnalysisTable)
.filter(AnalysisTable.analysisversion_id == av.id)
.filter(AnalysisTable.valid == True)
.all()
)
if response is None:
return None, 404
return [r.table_name for r in response], 200
[docs]@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/count"
)
class FrozenTableCount(Resource):
[docs] @reset_auth
@auth_requires_permission("view", table_arg="datastack_name")
@validate_datastack
@client_bp.doc("simple_query", security="apikey")
def get(
self,
datastack_name: str,
version: int,
table_name: str,
target_datastack: str = None,
target_version: int = None,
):
"""get annotation count in table
Args:
datastack_name (str): datastack name of table
version (int): version of table
table_name (str): table name
Returns:
int: number of rows in this table
"""
aligned_volume_name, pcg_table_name = get_relevant_datastack_info(
datastack_name
)
validate_table_args([table_name], target_datastack, target_version)
mat_db_name = f"{datastack_name}__mat{version}"
session = sqlalchemy_cache.get(mat_db_name)
mat_row_count = (
session.query(MaterializedMetadata.row_count)
.filter(MaterializedMetadata.table_name == table_name)
.scalar()
)
return mat_row_count, 200
[docs]@client_bp.expect(query_parser)
@client_bp.route(
"/datastack/<string:datastack_name>/version/<int:version>/table/<string:table_name>/query"
)
class FrozenTableQuery(Resource):
[docs] @reset_auth
@auth_requires_permission("view", table_arg="datastack_name")
@validate_datastack
@client_bp.doc("simple_query", security="apikey")
@accepts("SimpleQuerySchema", schema=SimpleQuerySchema, api=client_bp)
def post(
self,
datastack_name: str,
version: int,
table_name: str,
target_datastack: str = None,
target_version: int = None,
):
"""endpoint for doing a query with filters
Args:
datastack_name (str): datastack name
version (int): version number
table_name (str): table name
Payload:
All values are optional. Limit has an upper bound set by the server.
Consult the schema of the table for column names and appropriate values
{
"filter_out_dict": {
"tablename":{
"column_name":[excluded,values]
}
},
"offset": 0,
"limit": 200000,
"select_columns": [
"column","names"
],
"filter_in_dict": {
"tablename":{
"column_name":[included,values]
}
},
"filter_equal_dict": {
"tablename":{
"column_name":value
}
"filter_spatial_dict": {
"tablename": {
"column_name": [[min_x, min_y, min_z], [max_x, max_y, max_z]]
}
}
Returns:
pyarrow.buffer: a series of bytes that can be deserialized using pyarrow.deserialize
"""
args = query_parser.parse_args()
data = request.parsed_obj
return handle_simple_query(
datastack_name,
version,
table_name,
target_datastack,
target_version,
args,
data,
False,
)
[docs]@client_bp.expect(query_parser)
@client_bp.route("/datastack/<string:datastack_name>/version/<int:version>/query")
class FrozenQuery(Resource):
[docs] @reset_auth
@auth_requires_permission("view", table_arg="datastack_name")
@validate_datastack
@client_bp.doc("complex_query", security="apikey")
@accepts("ComplexQuerySchema", schema=ComplexQuerySchema, api=client_bp)
def post(
self,
datastack_name: str,
version: int,
target_datastack: str = None,
target_version: int = None,
):
"""endpoint for doing a query with filters and joins
Args:
datastack_name (str): datastack name
version (int): version number
Payload:
All values are optional. Limit has an upper bound set by the server.
Consult the schema of the table for column names and appropriate values
{
"tables":[["table1", "table1_join_column"],
["table2", "table2_join_column"]],
"filter_out_dict": {
"tablename":{
"column_name":[excluded,values]
}
},
"offset": 0,
"limit": 200000,
"select_columns": [
"column","names"
],
"filter_in_dict": {
"tablename":{
"column_name":[included,values]
}
},
"filter_equal_dict": {
"tablename":{
"column_name":value
}
}
"filter_spatial_dict": {
"tablename":{
"column_name":[[min_x,min_y,minz], [max_x_max_y_max_z]]
}
}
}
Returns:
pyarrow.buffer: a series of bytes that can be deserialized using pyarrow.deserialize
"""
args = query_parser.parse_args()
data = request.parsed_obj
return handle_complex_query(
datastack_name, version, target_datastack, target_version, args, data, False
)