Source code for materializationengine.workflows.periodic_database_removal

"""
Periodically clean up expired materialized databases.
"""
import itertools
from datetime import datetime
from typing import List

from celery.utils.log import get_task_logger
from dynamicannotationdb.models import AnalysisVersion
from materializationengine.celery_init import celery
from materializationengine.database import create_session
from materializationengine.info_client import get_aligned_volumes, get_datastack_info
from materializationengine.utils import get_config_param
from sqlalchemy import create_engine
from sqlalchemy.engine.url import make_url

celery_logger = get_task_logger(__name__)


[docs]def get_aligned_volumes_databases(): aligned_volumes = get_aligned_volumes() SQL_URI_CONFIG = get_config_param("SQLALCHEMY_DATABASE_URI") sql_base_uri = SQL_URI_CONFIG.rpartition("/")[0] engine = create_engine(sql_base_uri) with engine.connect() as connection: result = connection.execute("SELECT datname FROM pg_database;") databases = [database[0] for database in result] aligned_volume_databases = list(set(aligned_volumes).intersection(databases)) return aligned_volume_databases
[docs]def get_existing_databases(engine) -> List: result = engine.execute("SELECT datname FROM pg_database;").fetchall() return list(itertools.chain.from_iterable(result))
[docs]def get_all_versions(session): versions = session.query(AnalysisVersion).all() return [str(version) for version in versions]
[docs]def get_valid_versions(session): valid_versions = ( session.query(AnalysisVersion).filter(AnalysisVersion.valid == True).all() ) return [str(version) for version in valid_versions]
@celery.task(name="workflow:remove_expired_databases") def remove_expired_databases(delete_threshold: int = 5) -> str: """ Remove expired database from time this method is called. """ aligned_volume_databases = get_aligned_volumes_databases() datastacks = get_config_param("DATASTACKS") current_time = datetime.utcnow() remove_db_cron_info = [] for datastack in datastacks: datastack_info = get_datastack_info(datastack) aligned_volume = datastack_info["aligned_volume"]["name"] if aligned_volume in aligned_volume_databases: SQL_URI_CONFIG = get_config_param("SQLALCHEMY_DATABASE_URI") sql_base_uri = SQL_URI_CONFIG.rpartition("/")[0] sql_uri = make_url(f"{sql_base_uri}/{aligned_volume}") session, engine = create_session(sql_uri) session.expire_on_commit = False # get number of expired dbs that are ready for deletion try: expired_results = ( session.query(AnalysisVersion) .filter(AnalysisVersion.expires_on <= current_time) .order_by(AnalysisVersion.time_stamp) .all() ) expired_versions = [str(expired_db) for expired_db in expired_results] non_valid_results = ( session.query(AnalysisVersion) .filter(AnalysisVersion.valid == False) .order_by(AnalysisVersion.time_stamp) .all() ) # some databases might have failed to materialize completely but are still present on disk non_valid_versions = [str(non_valid_version) for non_valid_version in non_valid_results] versions = list(set(expired_versions + non_valid_versions)) except Exception as sql_error: celery_logger.error(f"Error: {sql_error}") continue # get databases that exist currently, filter by materialized dbs database_list = get_existing_databases(engine) databases = [ database for database in database_list if database.startswith(datastack) ] # get databases to delete that are currently present (ordered by timestamp) databases_to_delete = [ database for database in versions if database in databases ] dropped_dbs = [] if len(databases) > delete_threshold: with engine.connect() as conn: conn.execution_options(isolation_level="AUTOCOMMIT") for database in databases_to_delete: # see if any materialized databases exist existing_databases = get_existing_databases(engine) mat_versions = get_all_versions(session) remaining_databases = set(mat_versions).intersection( existing_databases ) # double check to see if there is only one valid db remaining valid_versions = get_valid_versions(session) remaining_valid_databases = set(valid_versions).intersection( existing_databases ) if ( len(remaining_databases) == 1 or len(remaining_valid_databases) == 1 ): celery_logger.info( f"Only one materialized database remaining: {database}, removal stopped." ) break if len(remaining_databases) == delete_threshold: break if (len(databases) - len(dropped_dbs)) > delete_threshold: try: sql = ( "SELECT 1 FROM pg_database WHERE datname='%s'" % database ) result_proxy = conn.execute(sql) result = result_proxy.scalar() celery_logger.info( f"Database to be dropped: {database} exists: {result}" ) if result: drop_connections = f""" SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = '{database}' AND pid <> pg_backend_pid() """ conn.execute(drop_connections) celery_logger.info( f"Dropped connections to: {database}" ) sql = f"DROP DATABASE {database}" result_proxy = conn.execute(sql) celery_logger.info(f"Database: {database} removed") # strip version from database string database_version = database.rsplit("__mat")[-1] expired_database = ( session.query(AnalysisVersion) .filter( AnalysisVersion.version == database_version ) .one() ) expired_database.valid = False expired_database.status = "EXPIRED" session.commit() celery_logger.info( f"Database '{expired_database}' dropped" ) dropped_dbs.append(database) except Exception as e: celery_logger.error( f"ERROR: {e}: {database} does not exist" ) remove_db_cron_info.append(dropped_dbs) session.close() return remove_db_cron_info