Materialization Engine Core
The following modules include the Flask and Celery apps as well as helper methods that support the materialization workflows.
Submodules
materializationengine.admin module
materializationengine.app module
- class materializationengine.app.AEEncoder(*, skipkeys=False, ensure_ascii=True, check_circular=True, allow_nan=True, sort_keys=False, indent=None, separators=None, default=None)[source]
Bases: JSONEncoder
- default(obj)[source]
Implement this method in a subclass such that it returns a serializable object for o, or calls the base implementation (to raise a TypeError).
For example, to support arbitrary iterators, you could implement default like this:
    def default(self, o):
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class default method raise the TypeError
        return JSONEncoder.default(self, o)
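As a usage illustration, the sketch below passes a custom encoder to json.dumps through the cls argument. SketchEncoder is a hypothetical stand-in, not the actual AEEncoder; the datetime handling in particular is an assumption chosen only to show how default can be extended.

```python
import json
from datetime import datetime
from json import JSONEncoder


class SketchEncoder(JSONEncoder):
    """Hypothetical encoder; not the actual AEEncoder implementation."""

    def default(self, o):
        # Assumed convention: serialize datetimes as ISO 8601 strings.
        if isinstance(o, datetime):
            return o.isoformat()
        # Otherwise try to treat the object as an iterable.
        try:
            iterable = iter(o)
        except TypeError:
            pass
        else:
            return list(iterable)
        # Let the base class raise the TypeError for unsupported types.
        return super().default(o)


payload = {"created": datetime(2021, 5, 1, 8, 10), "ids": {3}}
encoded = json.dumps(payload, cls=SketchEncoder, sort_keys=True)
print(encoded)
```

The set in the payload is not natively serializable, so json.dumps hands it to default, which converts it to a list.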
materializationengine.celery_init module
materializationengine.celery_worker module
- materializationengine.celery_worker.celery_loggers(logger, *args, **kwargs)[source]
Ensure the Celery banner appears in the log output. See https://www.distributedpython.com/2018/10/01/celery-docker-startup/
- materializationengine.celery_worker.days_till_next_month(date)[source]
Pick out the same weekday in the next month. For example, if you pass the first Wednesday of January, you get the first Wednesday of February.
- Parameters:
date (datetime.datetime) – a timepoint
- Returns:
the same day next month (in the sense of the same occurrence number of that weekday)
- Return type:
datetime.datetime
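The weekday rule described above can be sketched as follows. This is a minimal illustration, not the library's implementation: same_weekday_next_month is a hypothetical name, and the fallback to the last matching weekday when the next month has no such occurrence is an assumption.

```python
from datetime import datetime, timedelta


def same_weekday_next_month(date: datetime) -> datetime:
    """Hypothetical helper: map the n-th weekday of a month to the n-th
    occurrence of the same weekday in the following month."""
    # Zero-based occurrence of this weekday within its month (0 = first).
    occurrence = (date.day - 1) // 7
    # Roll over to January of the next year when the input is in December.
    if date.month == 12:
        year, month = date.year + 1, 1
    else:
        year, month = date.year, date.month + 1
    first = date.replace(year=year, month=month, day=1)
    # Days from the 1st of that month to its first matching weekday.
    offset = (date.weekday() - first.weekday()) % 7
    candidate = first + timedelta(days=offset + 7 * occurrence)
    # Assumption: if that occurrence does not exist (for example a fifth
    # Wednesday), fall back to the last matching weekday of the month.
    if candidate.month != month:
        candidate -= timedelta(days=7)
    return candidate


first_wednesday_jan = datetime(2020, 1, 1)
print(same_weekday_next_month(first_wednesday_jan))
```

The time of day is carried over unchanged because the result is derived from the input with replace and timedelta arithmetic.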
materializationengine.chunkedgraph_gateway module
materializationengine.config module
- class materializationengine.config.BaseConfig[source]
Bases: object
- ANNO_ENDPOINT = 'None/annotation/'
- AUTH_SERVICE_NAMESPACE = 'datastack'
- AUTH_TOKEN = ''
- AUTH_URI = 'https://global.daf-apis.com/auth'
- BASE_DIR = '/home/docs/checkouts/readthedocs.org/user_builds/materializationengine/checkouts/latest/materializationengine'
- BEAT_SCHEDULES = [{'name': 'Materialized Database Daily (2 Days)', 'minute': 10, 'hour': 8, 'day_of_week': [0, 2, 4, 6], 'task': 'run_daily_periodic_materialization'}, {'name': 'Materialized Database Daily (2 Days) (Wednesdays)', 'minute': 10, 'hour': 8, 'day_of_week': 3, 'day_of_month': '8-14,22-31', 'task': 'run_daily_periodic_materialization'}, {'name': 'Materialized Database Weekly (7 Days)', 'minute': 10, 'hour': 8, 'day_of_week': [1, 5], 'task': 'run_weekly_periodic_materialization'}, {'name': 'Long Term Support Materialized Database (30 days)', 'minute': 10, 'hour': 8, 'day_of_week': 3, 'day_of_month': '1-7,15-21', 'task': 'run_lts_periodic_materialization'}, {'name': 'Remove Expired Databases (Midnight)', 'minute': 0, 'hour': 8, 'task': 'remove_expired_databases'}, {'name': 'Update Live Database', 'minute': 0, 'hour': '0-1,17-23', 'day_of_week': '1-5', 'task': 'run_periodic_database_update'}]
- CELERY_BROKER_URL = 'memory://'
- CELERY_RESULT_BACKEND = 'redis://'
- CELERY_WORKER_IP = '127.0.0.1'
- DATASTACKS = ['minnie65_phase3_v1']
- DAYS_TO_EXPIRE = 7
- DB_CONNECTION_MAX_OVERFLOW = 5
- DB_CONNECTION_POOL_SIZE = 5
- ENV = 'base'
- GLOBAL_SERVER_URL = 'https://global.daf-apis.com'
- HOME = '/home/docs'
- INFOSERVICE_ENDPOINT = 'https://global.daf-apis.com/info'
- INFO_API_VERSION = 2
- LOCAL_SERVER_URL = None
- LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
- LOGGING_LEVEL = 10
- LOGGING_LOCATION = '/home/docs/.materializationengine/bookshelf.log'
- LTS_DAYS_TO_EXPIRE = 30
- MASTER_NAME = None
- MATERIALIZATION_ROW_CHUNK_SIZE = 500
- MAX_DATABASES = 2
- MERGE_TABLES = True
- MIN_DATABASES = 2
- QUERY_LIMIT_SIZE = 200000
- QUEUES_TO_THROTTLE = ['process']
- QUEUE_LENGTH_LIMIT = 10000
- RATELIMIT_STORAGE_URI = 'memory://'
- REDIS_URL = 'redis://'
- SCHEMA_SERVICE_ENDPOINT = 'https://global.daf-apis.com/schema/'
- SEGMENTATION_ENDPOINT = 'https://global.daf-apis.com/segmentation'
- SQLALCHEMY_DATABASE_URI = 'sqlite://'
- SQLALCHEMY_TRACK_MODIFICATIONS = False
- TESTING = False
- THROTTLE_QUEUES = True
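Each BEAT_SCHEDULES entry mixes cron-style timing fields with a display name and a task name. A rough sketch of separating the two is shown below; split_schedule and CRON_KEYS are hypothetical names, and how the engine itself consumes these entries is not shown in this reference.

```python
# Keys that describe when a task runs; everything else is metadata.
CRON_KEYS = ("minute", "hour", "day_of_week", "day_of_month", "month_of_year")


def split_schedule(entry):
    """Hypothetical helper: separate crontab fields from the task description."""
    cron_kwargs = {key: entry[key] for key in CRON_KEYS if key in entry}
    return entry["name"], entry["task"], cron_kwargs


# One entry copied from BEAT_SCHEDULES above.
lts_entry = {
    "name": "Long Term Support Materialized Database (30 days)",
    "minute": 10,
    "hour": 8,
    "day_of_week": 3,
    "day_of_month": "1-7,15-21",
    "task": "run_lts_periodic_materialization",
}

name, task, cron_kwargs = split_schedule(lts_entry)
# cron_kwargs holds only timing fields, so it could be passed on to
# celery.schedules.crontab(**cron_kwargs) when Celery is installed.
print(task, cron_kwargs)
```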
- class materializationengine.config.DevConfig[source]
Bases: BaseConfig
- CELERY_BROKER_URL = 'redis://:$None@$None:$None/0'
- CELERY_RESULT_BACKEND = 'redis://:$None@$None:$None/0'
- ENV = 'development'
- REDIS_HOST = None
- REDIS_PASSWORD = None
- REDIS_PORT = None
- REDIS_URL = 'redis://:$None@$None:$None/0'
- SQLALCHEMY_DATABASE_URI = 'postgres://postgres:materialize@db:5432/materialize'
- USE_SENTINEL = False
- class materializationengine.config.ProductionConfig[source]
Bases: BaseConfig
- CELERY_BROKER = None
- CELERY_RESULT_BACKEND = None
- ENV = 'production'
- LOGGING_LEVEL = 20
- REDIS_URL = None
- class materializationengine.config.TestConfig[source]
Bases: BaseConfig
- CELERY_BROKER_URL = 'memory://'
- CELERY_RESULT_BACKEND = 'redis://'
- ENV = 'testing'
- MATERIALIZATION_ROW_CHUNK_SIZE = 2
- SQLALCHEMY_DATABASE_URI = 'postgresql://postgres:postgres@localhost:5432/test_aligned_volume'
- SQLALCHEMY_TRACK_MODIFICATIONS = False
- TESTING = True
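The configuration classes rely on ordinary class inheritance: a subclass overrides only the attributes it lists, and everything else falls through to BaseConfig. The sketch below uses trimmed-down stand-ins with a few values copied from the listings above; get_config and the CONFIGS lookup table are hypothetical and are not part of the documented module.

```python
class BaseConfig:
    # Trimmed-down stand-in; values copied from the listing above.
    ENV = "base"
    TESTING = False
    LOGGING_LEVEL = 10
    MATERIALIZATION_ROW_CHUNK_SIZE = 500


class TestConfig(BaseConfig):
    # Only the overridden attributes are restated here.
    ENV = "testing"
    TESTING = True
    MATERIALIZATION_ROW_CHUNK_SIZE = 2


# Hypothetical lookup table keyed by each class's ENV value.
CONFIGS = {cls.ENV: cls for cls in (BaseConfig, TestConfig)}


def get_config(env_name):
    """Hypothetical helper: return the config class for an environment name."""
    try:
        return CONFIGS[env_name]
    except KeyError:
        raise ValueError(f"unknown environment: {env_name!r}") from None


config = get_config("testing")
print(config.MATERIALIZATION_ROW_CHUNK_SIZE, config.LOGGING_LEVEL)
```

LOGGING_LEVEL is not restated in the subclass, so the lookup resolves to the BaseConfig value.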
materializationengine.database module
materializationengine.errors module
- exception materializationengine.errors.AlignedVolumeNotFoundException[source]
Bases: MaterializationEngineException
Error raised when an aligned_volume is not found.
- exception materializationengine.errors.DataStackNotFoundException[source]
Bases: MaterializationEngineException
Error raised when a datastack is not found.
- exception materializationengine.errors.MaterializationEngineException[source]
Bases: Exception
Generic error in the materialization engine.
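Because both specific exceptions derive from MaterializationEngineException, a caller can catch the base class to handle either one. The sketch below redefines the hierarchy locally so that it runs on its own; lookup_datastack and describe are hypothetical helpers, not functions from the package.

```python
class MaterializationEngineException(Exception):
    """Generic error in the materialization engine (local stand-in)."""


class DataStackNotFoundException(MaterializationEngineException):
    """Error raised when a datastack is not found (local stand-in)."""


def lookup_datastack(name, known=("minnie65_phase3_v1",)):
    """Hypothetical lookup that raises when the datastack is unknown."""
    if name not in known:
        raise DataStackNotFoundException(f"datastack {name!r} not found")
    return name


def describe(name):
    # Catching the base class also covers every derived exception.
    try:
        return lookup_datastack(name)
    except MaterializationEngineException as exc:
        return f"error: {exc}"


print(describe("minnie65_phase3_v1"))
print(describe("missing"))
```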
materializationengine.index_manager module
- class materializationengine.index_manager.IndexCache[source]
Bases: object
- add_indices_sql_commands(table_name: str, model, engine)[source]
Add missing indices by comparing reflected table and model indices. Will add missing indices from model to table.
- Parameters:
table_name (str) – target table to add missing indices to
engine (SQLAlchemy Engine instance) – supplied SQLAlchemy engine
- Returns:
list of indices added to table
- Return type:
str
- drop_table_indices(table_name: str, engine)[source]
Generate SQL command to drop all indices and constraints on target table.
- Parameters:
table_name (str) – target table to drop constraints and indices
engine (SQLAlchemy Engine instance) – supplied SQLAlchemy engine
- Returns:
True if all constraints and indices are dropped
- Return type:
bool
- get_index_from_model(table_name, model, engine)[source]
Generate index mapping, primary key and foreign key(s) from supplied SQLAlchemy model. Returns an index map.
- Parameters:
model (SQLAlchemy Model) – database model to reflect indices
- Returns:
Index map
- Return type:
dict
- get_table_indices(table_name: str, engine: sqlalchemy.engine)[source]
Reflect current indices, primary key(s) and foreign keys on given target table using SQLAlchemy inspector method.
- Parameters:
table_name (str) – target table to reflect
engine (SQLAlchemy Engine instance) – supplied SQLAlchemy engine
- Returns:
Map of reflected indices on given table.
- Return type:
dict
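The comparison that add_indices_sql_commands describes (model indices versus reflected table indices) amounts to a dictionary difference. The sketch below assumes both index maps are plain dicts keyed by index name; the exact structure of the real index maps, the helper name missing_indices, and the sample index names are all assumptions made for illustration.

```python
def missing_indices(model_map, table_map):
    """Return entries present in the model's index map but not on the table."""
    return {
        name: spec for name, spec in model_map.items() if name not in table_map
    }


# Hypothetical index map derived from a model.
model_map = {
    "synapse_pkey": {"type": "primary_key", "columns": ["id"]},
    "ix_synapse_pt_position": {"type": "index", "columns": ["pt_position"]},
}
# Hypothetical index map reflected from the existing table.
table_map = {
    "synapse_pkey": {"type": "primary_key", "columns": ["id"]},
}

to_add = missing_indices(model_map, table_map)
print(sorted(to_add))
```

Only the entries returned here would need to be created on the table; indices that already exist are left alone.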
materializationengine.info_client module
materializationengine.models module
materializationengine.schemas module
- class materializationengine.schemas.AnalysisTableSchema(*args, **kwargs)[source]
Bases: SQLAlchemyAutoSchema
- opts = <marshmallow_sqlalchemy.schema.SQLAlchemyAutoSchemaOpts object>
- class materializationengine.schemas.AnalysisVersionSchema(*args, **kwargs)[source]
Bases: SQLAlchemyAutoSchema
- opts = <marshmallow_sqlalchemy.schema.SQLAlchemyAutoSchemaOpts object>
- class materializationengine.schemas.CeleryBeatSchema(*, only: Optional[Union[Sequence[str], Set[str]]] = None, exclude: Union[Sequence[str], Set[str]] = (), many: bool = False, context: Optional[Dict] = None, load_only: Union[Sequence[str], Set[str]] = (), dump_only: Union[Sequence[str], Set[str]] = (), partial: Union[bool, Sequence[str], Set[str]] = False, unknown: Optional[str] = None)[source]
Bases: Schema
- opts = <marshmallow.schema.SchemaOpts object>
- class materializationengine.schemas.CronField(*, default: ~typing.Any = <marshmallow.missing>, missing: ~typing.Any = <marshmallow.missing>, data_key: ~typing.Optional[str] = None, attribute: ~typing.Optional[str] = None, validate: ~typing.Optional[~typing.Union[~typing.Callable[[~typing.Any], ~typing.Any], ~typing.Iterable[~typing.Callable[[~typing.Any], ~typing.Any]]]] = None, required: bool = False, allow_none: ~typing.Optional[bool] = None, load_only: bool = False, dump_only: bool = False, error_messages: ~typing.Optional[~typing.Dict[str, str]] = None, **metadata)[source]
Bases: Field
materializationengine.task_router module
materializationengine.upsert module
materializationengine.utils module
- materializationengine.utils.build_materialized_table_id(aligned_volume: str, table_name: str) → str[source]
- materializationengine.utils.create_annotation_model(mat_metadata, with_crud_columns: bool = True, reset_cache: bool = False)[source]
- materializationengine.utils.create_segmentation_model(mat_metadata, reset_cache: bool = False)[source]
materializationengine.views module
- (task) materializationengine.views.get_synapse_info(datastack_name, id)
Proxy that evaluates object once.
Proxy will evaluate the object each time, while the promise will only evaluate it once.
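The distinction between evaluating each time and evaluating once can be illustrated with two small wrappers. This is a simplified sketch of the idea only, not Celery's proxy implementation; EagerProxy, OncePromise and make_value are hypothetical names.

```python
class EagerProxy:
    """Re-evaluates the wrapped factory on every access."""

    def __init__(self, factory):
        self._factory = factory

    def get(self):
        return self._factory()


class OncePromise:
    """Evaluates the wrapped factory on first access and caches the result."""

    _UNSET = object()

    def __init__(self, factory):
        self._factory = factory
        self._value = self._UNSET

    def get(self):
        if self._value is self._UNSET:
            self._value = self._factory()
        return self._value


calls = []


def make_value():
    # Record each evaluation so the difference is observable.
    calls.append(1)
    return len(calls)


proxy = EagerProxy(make_value)
proxy_results = [proxy.get(), proxy.get()]

promise = OncePromise(make_value)
promise_results = [promise.get(), promise.get()]
print(proxy_results, promise_results, len(calls))
```

The proxy triggers two evaluations for two accesses, while the promise triggers one and then returns the cached value.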