Materialization Engine Core

The following modules include the Flask and Celery apps as well as helper methods that support the materialization workflows.

Submodules

materializationengine.admin module

materializationengine.admin.setup_admin(app, db)[source]

materializationengine.app module

class materializationengine.app.AEEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]

Bases: JSONEncoder

default(obj)[source]

Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).

For example, to support arbitrary iterators, you could implement default like this:

def default(self, o):
    try:
        iterable = iter(o)
    except TypeError:
        pass
    else:
        return list(iterable)
    # Let the base class default method raise the TypeError
    return JSONEncoder.default(self, o)
materializationengine.app.create_app(config_name: Optional[str] = None)[source]

materializationengine.celery_init module

materializationengine.celery_worker module

materializationengine.celery_worker.celery_loggers(logger, *args, **kwargs)[source]

Ensure the Celery banner appears in the log output. https://www.distributedpython.com/2018/10/01/celery-docker-startup/

materializationengine.celery_worker.create_celery(app=None)[source]
materializationengine.celery_worker.days_till_next_month(date)[source]

Function to pick out the same weekday in the next month. So if you pass the first Wednesday of January, you get the first Wednesday of February.

Parameters:

date (datetime.datetime) – a timepoint

Returns:

same day next month (in the sense of same # of weekday, e.g. first Wednesday)

Return type:

datetime.datetime

materializationengine.celery_worker.get_activate_tasks()[source]
materializationengine.celery_worker.get_celery_queue_items(queue_name: str)[source]
materializationengine.celery_worker.get_celery_worker_status()[source]
materializationengine.celery_worker.inspect_locked_tasks(release_locks: bool = False)[source]
materializationengine.celery_worker.setup_periodic_tasks(sender, **kwargs)[source]

materializationengine.chunkedgraph_gateway module

class materializationengine.chunkedgraph_gateway.ChunkedGraphGateway(token_file=None, server_address='http://pychunkedgraph-service/', global_server_address='https://global.daf-apis.com')[source]

Bases: object

get_client(table_id: str)[source]
init_pcg(table_id: str)[source]

materializationengine.config module

class materializationengine.config.BaseConfig[source]

Bases: object

ANNO_ENDPOINT = 'None/annotation/'
AUTH_SERVICE_NAMESPACE = 'datastack'
AUTH_TOKEN = ''
AUTH_URI = 'https://global.daf-apis.com/auth'
BASE_DIR = '/home/docs/checkouts/readthedocs.org/user_builds/materializationengine/checkouts/latest/materializationengine'
BEAT_SCHEDULES = [{'name': 'Materialized Database Daily (2 Days)', 'minute': 10, 'hour': 8, 'day_of_week': [0, 2, 4, 6], 'task': 'run_daily_periodic_materialization'}, {'name': 'Materialized Database Daily (2 Days) (Wednesdays)', 'minute': 10, 'hour': 8, 'day_of_week': 3, 'day_of_month': '8-14,22-31', 'task': 'run_daily_periodic_materialization'}, {'name': 'Materialized Database Weekly (7 Days)', 'minute': 10, 'hour': 8, 'day_of_week': [1, 5], 'task': 'run_weekly_periodic_materialization'}, {'name': 'Long Term Support Materialized Database (30 days)', 'minute': 10, 'hour': 8, 'day_of_week': 3, 'day_of_month': '1-7,15-21', 'task': 'run_lts_periodic_materialization'}, {'name': 'Remove Expired Databases (Midnight)', 'minute': 0, 'hour': 8, 'task': 'remove_expired_databases'}, {'name': 'Update Live Database', 'minute': 0, 'hour': '0-1,17-23', 'day_of_week': '1-5', 'task': 'run_periodic_database_update'}]
CELERY_BROKER_URL = 'memory://'
CELERY_RESULT_BACKEND = 'redis://'
CELERY_WORKER_IP = '127.0.0.1'
DATASTACKS = ['minnie65_phase3_v1']
DAYS_TO_EXPIRE = 7
DB_CONNECTION_MAX_OVERFLOW = 5
DB_CONNECTION_POOL_SIZE = 5
ENV = 'base'
GLOBAL_SERVER_URL = 'https://global.daf-apis.com'
HOME = '/home/docs'
INFOSERVICE_ENDPOINT = 'https://global.daf-apis.com/info'
INFO_API_VERSION = 2
LOCAL_SERVER_URL = None
LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
LOGGING_LEVEL = 10
LOGGING_LOCATION = '/home/docs/.materializationengine/bookshelf.log'
LTS_DAYS_TO_EXPIRE = 30
MASTER_NAME = None
MATERIALIZATION_ROW_CHUNK_SIZE = 500
MAX_DATABASES = 2
MERGE_TABLES = True
MIN_DATABASES = 2
QUERY_LIMIT_SIZE = 200000
QUEUES_TO_THROTTLE = ['process']
QUEUE_LENGTH_LIMIT = 10000
RATELIMIT_STORAGE_URI = 'memory://'
REDIS_URL = 'redis://'
SCHEMA_SERVICE_ENDPOINT = 'https://global.daf-apis.com/schema/'
SEGMENTATION_ENDPOINT = 'https://global.daf-apis.com/segmentation'
SQLALCHEMY_DATABASE_URI = 'sqlite://'
SQLALCHEMY_TRACK_MODIFICATIONS = False
TESTING = False
THROTTLE_QUEUES = True
class materializationengine.config.DevConfig[source]

Bases: BaseConfig

CELERY_BROKER_URL = 'redis://:$None@$None:$None/0'
CELERY_RESULT_BACKEND = 'redis://:$None@$None:$None/0'
ENV = 'development'
REDIS_HOST = None
REDIS_PASSWORD = None
REDIS_PORT = None
REDIS_URL = 'redis://:$None@$None:$None/0'
SQLALCHEMY_DATABASE_URI = 'postgres://postgres:materialize@db:5432/materialize'
USE_SENTINEL = False
class materializationengine.config.ProductionConfig[source]

Bases: BaseConfig

CELERY_BROKER = None
CELERY_RESULT_BACKEND = None
ENV = 'production'
LOGGING_LEVEL = 20
REDIS_URL = None
class materializationengine.config.TestConfig[source]

Bases: BaseConfig

CELERY_BROKER_URL = 'memory://'
CELERY_RESULT_BACKEND = 'redis://'
ENV = 'testing'
MATERIALIZATION_ROW_CHUNK_SIZE = 2
SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:postgres@localhost:5432/test_aligned_volume'
SQLALCHEMY_TRACK_MODIFICATIONS = False
TESTING = True
materializationengine.config.configure_app(app)[source]

materializationengine.database module

class materializationengine.database.DynamicMaterializationCache[source]

Bases: object

get_db(database: str) DynamicAnnotationInterface[source]
invalidate_cache()[source]
class materializationengine.database.SqlAlchemyCache[source]

Bases: object

get(aligned_volume: str)[source]
get_engine(aligned_volume: str)[source]
invalidate_cache()[source]
materializationengine.database.create_session(sql_uri: Optional[str] = None)[source]
materializationengine.database.get_sql_url_params(sql_url)[source]
materializationengine.database.ping_connection(session)[source]
materializationengine.database.reflect_tables(sql_base, database_name)[source]

materializationengine.errors module

exception materializationengine.errors.AlignedVolumeNotFoundException[source]

Bases: MaterializationEngineException

error raised when an aligned_volume is not found

exception materializationengine.errors.AnnotationParseFailure[source]

Bases: MaterializeAnnotationException

exception materializationengine.errors.DataStackNotFoundException[source]

Bases: MaterializationEngineException

error raised when a datastack is not found

exception materializationengine.errors.IndexMatchError[source]

Bases: KeyError

exception materializationengine.errors.MaterializationEngineException[source]

Bases: Exception

generic error in materialization engine

exception materializationengine.errors.MaterializeAnnotationException[source]

Bases: Exception

exception materializationengine.errors.RootIDNotFoundException[source]

Bases: MaterializeAnnotationException

exception materializationengine.errors.TaskFailure[source]

Bases: Exception

exception materializationengine.errors.TaskNotFound(task_name: str, task_dict: dict)[source]

Bases: KeyError

Exception raised when periodic task is not found in the periodic task dict.

exception materializationengine.errors.WrongModelType[source]

Bases: KeyError

materializationengine.index_manager module

class materializationengine.index_manager.IndexCache[source]

Bases: object

add_indices_sql_commands(table_name: str, model, engine)[source]

Add missing indices by comparing reflected table and model indices. Will add missing indices from model to table.

Parameters:
  • table_name (str) – target table to add missing indices to

  • engine (SQLAlchemy Engine instance) – supplied SQLAlchemy engine

Returns:

list of indices added to table

Return type:

str

drop_table_indices(table_name: str, engine)[source]

Generate SQL command to drop all indices and constraints on target table.

Parameters:
  • table_name (str) – target table to drop constraints and indices

  • engine (SQLAlchemy Engine instance) – supplied SQLAlchemy engine

Returns:

True if all constraints and indices are dropped

Return type:

bool

get_index_from_model(table_name, model, engine)[source]

Generate index mapping, primary key and foreign key(s) from supplied SQLAlchemy model. Returns an index map.

Parameters:

model (SqlAlchemy Model) – database model to reflect indices

Returns:

Index map

Return type:

dict

get_table_indices(table_name: str, engine: sqlalchemy.engine)[source]
Reflect current indices, primary key(s) and foreign keys on given target table using SQLAlchemy inspector method.

Parameters:
  • table_name (str) – target table to reflect

  • engine (SQLAlchemy Engine instance) – supplied SQLAlchemy engine

Returns:

Map of reflected indices on given table.

Return type:

dict

materializationengine.info_client module

materializationengine.info_client.get_aligned_volume(aligned_volume)[source]
materializationengine.info_client.get_aligned_volumes()[source]
materializationengine.info_client.get_datastack_info(datastack_name)[source]
materializationengine.info_client.get_datastacks()[source]
materializationengine.info_client.get_relevant_datastack_info(datastack_name)[source]

materializationengine.models module

class materializationengine.models.MaterializedMetadata(**kwargs)[source]

Bases: Base

id
materialized_timestamp
row_count
schema
table_name

materializationengine.schemas module

class materializationengine.schemas.AnalysisTableSchema(*args, **kwargs)[source]

Bases: SQLAlchemyAutoSchema

class Meta[source]

Bases: object

load_instance = True
model

alias of AnalysisTable

opts = <marshmallow_sqlalchemy.schema.SQLAlchemyAutoSchemaOpts object>
class materializationengine.schemas.AnalysisVersionSchema(*args, **kwargs)[source]

Bases: SQLAlchemyAutoSchema

class Meta[source]

Bases: object

load_instance = True
model

alias of AnalysisVersion

opts = <marshmallow_sqlalchemy.schema.SQLAlchemyAutoSchemaOpts object>
class materializationengine.schemas.CeleryBeatSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)[source]

Bases: Schema

opts = <marshmallow.schema.SchemaOpts object>
class materializationengine.schemas.CronField(*, default: ~typing.Any = <marshmallow.missing>, missing: ~typing.Any = <marshmallow.missing>, data_key: ~typing.Optional[str] = None, attribute: ~typing.Optional[str] = None, validate: ~typing.Optional[~typing.Union[~typing.Callable[[~typing.Any], ~typing.Any], ~typing.Iterable[~typing.Callable[[~typing.Any], ~typing.Any]]]] = None, required: bool = False, allow_none: ~typing.Optional[bool] = None, load_only: bool = False, dump_only: bool = False, error_messages: ~typing.Optional[~typing.Dict[str, str]] = None, **metadata)[source]

Bases: Field

class materializationengine.schemas.VersionErrorTableSchema(*args, **kwargs)[source]

Bases: SQLAlchemyAutoSchema

class Meta[source]

Bases: object

load_instance = True
model

alias of VersionErrorTable

opts = <marshmallow_sqlalchemy.schema.SQLAlchemyAutoSchemaOpts object>

materializationengine.shared_tasks module

(task)materializationengine.shared_tasks.add_index(database: dict, command: str)

Add an index or a constraint to a table.

Parameters:
  • database (dict) – datastack info for the aligned_volume derived from the infoservice

  • command (str) – sql command to create an index or constraint

Raises:

self.retry – retries task when an error creating an index occurs

Returns:

String of SQL command

Return type:

str

materializationengine.shared_tasks.check_if_task_is_running(task_name: str, worker_name_prefix: str) bool[source]

Check if a task is running under a worker with a specified prefix name. If the task is found to be in an active state then return True.

Parameters:
  • task_name (str) – name of task to check if it is running

  • worker_name_prefix (str) – prefix of celery worker, used to check specific queue.

Returns:

True if task_name is running else False

Return type:

bool

materializationengine.shared_tasks.chunk_ids(mat_metadata, model, chunk_size: int)[source]
(task)materializationengine.shared_tasks.collect_data(*args, **kwargs)

Proxy that evaluates object once.

Proxy will evaluate the object each time, while the promise will only evaluate it once.

materializationengine.shared_tasks.create_chunks(data_list: List, chunk_size: int) Generator[source]

Create chunks from list with fixed size

Parameters:
  • data_list (List) – list to chunk

  • chunk_size (int) – size of chunk

Yields:

List – generator of chunks

(task)materializationengine.shared_tasks.fin(*args, **kwargs)

Proxy that evaluates object once.

Proxy will evaluate the object each time, while the promise will only evaluate it once.

materializationengine.shared_tasks.generate_chunked_model_ids(mat_metadata: dict, use_segmentation_model=False) List[List][source]

Creates list of chunks with start:end index for chunking queries for materialization.

Parameters:

mat_metadata (dict) – Materialization metadata

Returns:

list of lists containing start and end indices

Return type:

List[List]

materializationengine.shared_tasks.get_materialization_info(datastack_info: dict, analysis_version: Optional[int] = None, materialization_time_stamp: Optional[utcnow] = None, skip_table: bool = False, row_size: int = 1000000, table_name: Optional[str] = None, skip_row_count: bool = False) List[dict][source]

Initialize materialization by an aligned volume name. Iterates through all tables in an aligned volume database and gathers metadata for each table. The list of tables is passed to workers for materialization.

Parameters:
  • datastack_info (dict) – Datastack info

  • analysis_version (int, optional) – Analysis version to use for frozen materialization. Defaults to None.

  • skip_table (bool, optional) – Triggers row count for skipping tables larger than row_size arg. Defaults to False.

  • row_size (int, optional) – Row size number to check. Defaults to 1_000_000.

Returns:

list of materialization metadata dicts, one for each table

Return type:

List[dict]

materializationengine.shared_tasks.monitor_task_states(task_ids: List, polling_rate: int = 0.2)[source]
materializationengine.shared_tasks.monitor_workflow_state(workflow: AsyncResult, polling_rate: int = 0.2)[source]
materializationengine.shared_tasks.query_id_range(column, start_id: int, end_id: int)[source]
(task)materializationengine.shared_tasks.update_metadata(mat_metadata: dict)

Update ‘last_updated’ column in the segmentation metadata table for a given segmentation table.

Parameters:

mat_metadata (dict) – materialization metadata

Returns:

description of table that was updated

Return type:

str

(task)materializationengine.shared_tasks.workflow_complete(workflow_name)

Proxy that evaluates object once.

Proxy will evaluate the object each time, while the promise will only evaluate it once.

(task)materializationengine.shared_tasks.workflow_failed(request, exc, traceback, mat_info, *args, **kwargs)

Proxy that evaluates object once.

Proxy will evaluate the object each time, while the promise will only evaluate it once.

materializationengine.task_router module

class materializationengine.task_router.TaskRouter[source]

Bases: object

route_for_task(task, *args, **kwargs)[source]

materializationengine.upsert module

materializationengine.upsert.chunk_rows(data, chunksize: Optional[int] = None)[source]
materializationengine.upsert.create_sql_rows(session, data_dict: dict, model)[source]

Yields a dictionary if the record’s id already exists, a row object otherwise.

TODO: strip unneeded if else statements

materializationengine.upsert.upsert(session, data, model, chunksize=None)[source]

materializationengine.utils module

materializationengine.utils.build_materialized_table_id(aligned_volume: str, table_name: str) str[source]
materializationengine.utils.check_ownership(db, table_name)[source]
materializationengine.utils.check_read_permission(db, table_name)[source]
materializationengine.utils.check_write_permission(db, table_name)[source]
materializationengine.utils.create_annotation_model(mat_metadata, with_crud_columns: bool = True, reset_cache: bool = False)[source]
materializationengine.utils.create_segmentation_model(mat_metadata, reset_cache: bool = False)[source]
materializationengine.utils.get_app_base_path()[source]
materializationengine.utils.get_config_param(config_param: str)[source]
materializationengine.utils.get_geom_from_wkb(wkb)[source]
materializationengine.utils.get_instance_folder_path()[source]
materializationengine.utils.get_query_columns_by_suffix(AnnotationModel, SegmentationModel, suffix)[source]
materializationengine.utils.make_root_id_column_name(column_name: str)[source]

materializationengine.views module

materializationengine.views.before_request()[source]
materializationengine.views.cell_type_local_report(datastack_name, id)[source]
materializationengine.views.check_if_complete(datastack_name, id, task_id)[source]
materializationengine.views.datastack_view(datastack_name)[source]
materializationengine.views.generic_report(datastack_name, id)[source]
materializationengine.views.get_job_info(job_name: str)[source]
materializationengine.views.get_jobs()[source]
(task)materializationengine.views.get_synapse_info(datastack_name, id)

Proxy that evaluates object once.

Proxy will evaluate the object each time, while the promise will only evaluate it once.

materializationengine.views.index()[source]
materializationengine.views.jobs()[source]
materializationengine.views.make_flat_model(db, table: AnalysisTable)[source]
materializationengine.views.synapse_report(datastack_name, id)[source]
materializationengine.views.table_view(datastack_name, id: int)[source]
materializationengine.views.version_error(datastack_name: str, id: int)[source]
materializationengine.views.version_view(datastack_name: str, id: int)[source]

materializationengine.blueprints.reset_auth module

materializationengine.blueprints.reset_auth.reset_auth(f)[source]

Module contents