# Source code for materializationengine.database

from urllib.parse import urlparse

from celery.utils.log import get_task_logger
from dynamicannotationdb import DynamicAnnotationInterface
from flask import current_app
from sqlalchemy import MetaData, create_engine
from sqlalchemy.engine.url import make_url
from sqlalchemy.orm import scoped_session, sessionmaker

from materializationengine.celery_worker import celery_logger
from materializationengine.utils import get_config_param

# Module-level logger obtained from Celery's get_task_logger, named after this
# module. NOTE: this rebinds the `celery_logger` name that is also imported
# from materializationengine.celery_worker above, so the imported object is
# not the one used by the code in this module.
celery_logger = get_task_logger(__name__)


def create_session(sql_uri: str = None):
    """Build a new engine for ``sql_uri`` and open a session bound to it.

    Pool sizing is read from the current Flask app config
    (``DB_CONNECTION_POOL_SIZE`` and ``DB_CONNECTION_MAX_OVERFLOW``, both
    defaulting to 5). The engine is created with ``pool_recycle=3600`` and
    ``pool_pre_ping=True``.

    Args:
        sql_uri: Database URI handed directly to ``create_engine``.

    Returns:
        A ``(session, engine)`` tuple. The session is obtained by calling a
        ``scoped_session`` registry whose factory has ``autocommit`` and
        ``autoflush`` disabled.
    """
    app_config = current_app.config
    engine_options = {
        "pool_recycle": 3600,
        "pool_size": app_config.get("DB_CONNECTION_POOL_SIZE", 5),
        "max_overflow": app_config.get("DB_CONNECTION_MAX_OVERFLOW", 5),
        "pool_pre_ping": True,
    }
    engine = create_engine(sql_uri, **engine_options)

    session_factory = sessionmaker(bind=engine, autocommit=False, autoflush=False)
    session_registry = scoped_session(session_factory)
    return session_registry(), engine
def get_sql_url_params(sql_url):
    """Split a database URL into its connection parameters.

    Args:
        sql_url: The database URL. Anything that is not already a ``str`` is
            converted with ``str()`` before parsing.
            NOTE(review): if URL objects are passed here, confirm that their
            ``str()`` form still contains the real password.

    Returns:
        A dict with the keys ``user``, ``password``, ``dbname``, ``host`` and
        ``port`` as extracted by ``urllib.parse.urlparse``. Components missing
        from the URL are ``None``; ``dbname`` is the URL path without its
        first character (the leading ``/``).
    """
    url_text = sql_url if isinstance(sql_url, str) else str(sql_url)
    parsed = urlparse(url_text)
    return {
        "user": parsed.username,
        "password": parsed.password,
        # Drop the first character of the path, i.e. the leading "/".
        "dbname": parsed.path[1:],
        "host": parsed.hostname,
        "port": parsed.port,
    }
def reflect_tables(sql_base, database_name):
    """Reflect a database and return the keys of its tables and views.

    A throwaway engine is created for ``{sql_base}/{database_name}``, the
    schema is reflected (including views), and the engine is disposed again
    before returning.

    Args:
        sql_base: Base database URI without the trailing database name.
        database_name: Name of the database appended to ``sql_base``.

    Returns:
        A list of the keys of the reflected ``MetaData.tables`` mapping.

    Raises:
        Whatever ``create_engine`` or ``MetaData.reflect`` raise; the engine
        is still disposed in that case.
    """
    sql_uri = f"{sql_base}/{database_name}"
    engine = create_engine(sql_uri)
    try:
        # Pass the engine to reflect() instead of binding it to MetaData:
        # bound metadata is deprecated in SQLAlchemy 1.4 and removed in 2.0,
        # while reflect(bind=...) works on old and new versions alike.
        meta = MetaData()
        meta.reflect(bind=engine, views=True)
        return list(meta.tables)
    finally:
        # Always release the temporary engine's connection pool, even when
        # reflection fails.
        engine.dispose()
def ping_connection(session):
    """Check whether ``session`` can still talk to its database.

    Args:
        session: Object exposing an ``execute`` method that accepts a SQL
            string (a session or scoped session in this module).

    Returns:
        ``True`` if ``SELECT 1`` executed without raising, ``False``
        otherwise. Failures are logged as warnings and never propagated.
    """
    try:
        # A trivial raw query is enough to find out if the database answers.
        # NOTE(review): SQLAlchemy 2.0 rejects plain-string statements; wrap
        # the query in sqlalchemy.text() if this module is moved to 2.0.
        session.execute("SELECT 1")
    except Exception as e:
        # Deliberately broad: any failure means the connection is unusable.
        # Logger.warn is a deprecated alias, so warning() is used instead.
        celery_logger.warning(e)
        return False
    return True
class SqlAlchemyCache:
    """Cache of engines and scoped sessions, keyed by aligned volume name."""

    def __init__(self):
        # aligned volume name -> engine
        self._engines = {}
        # aligned volume name -> scoped_session registry
        self._sessions = {}

    def get_engine(self, aligned_volume: str):
        """Return the engine for ``aligned_volume``, creating it on first use.

        The database URI is derived from the app's ``SQLALCHEMY_DATABASE_URI``
        by replacing everything after the last ``/`` with ``aligned_volume``.
        Pool sizing comes from ``DB_CONNECTION_POOL_SIZE`` and
        ``DB_CONNECTION_MAX_OVERFLOW`` (both default to 5).

        Args:
            aligned_volume: Name of the aligned volume database.

        Returns:
            The cached (or newly created) engine.
        """
        if aligned_volume in self._engines:
            return self._engines[aligned_volume]

        app_config = current_app.config
        configured_uri = app_config["SQLALCHEMY_DATABASE_URI"]
        pool_size = app_config.get("DB_CONNECTION_POOL_SIZE", 5)
        max_overflow = app_config.get("DB_CONNECTION_MAX_OVERFLOW", 5)

        # Keep everything before the final "/" and append the volume name.
        base_uri, _, _ = configured_uri.rpartition("/")
        volume_url = make_url(f"{base_uri}/{aligned_volume}")

        engine = create_engine(
            volume_url,
            pool_recycle=3600,
            pool_size=pool_size,
            max_overflow=max_overflow,
            pool_pre_ping=True,
        )
        self._engines[aligned_volume] = engine
        return engine

    def get(self, aligned_volume: str):
        """Return a working scoped session for ``aligned_volume``.

        The cached session (created first if absent) is pinged; when the ping
        fails a replacement session is created, cached and returned.

        Args:
            aligned_volume: Name of the aligned volume database.

        Returns:
            The scoped session registered for ``aligned_volume``.
        """
        if aligned_volume not in self._sessions:
            self._create_session(aligned_volume)

        cached_session = self._sessions[aligned_volume]
        if ping_connection(cached_session):
            return cached_session
        return self._create_session(aligned_volume)

    def _create_session(self, aligned_volume: str):
        """Create, cache and return a scoped session for ``aligned_volume``."""
        session_factory = sessionmaker(bind=self.get_engine(aligned_volume))
        scoped = scoped_session(session_factory)
        self._sessions[aligned_volume] = scoped
        return scoped

    def invalidate_cache(self):
        """Forget all cached engines and sessions.

        The caches are rebound to fresh dicts; the previously cached objects
        are only dereferenced, not closed or disposed.
        """
        self._engines, self._sessions = {}, {}
class DynamicMaterializationCache:
    """Cache of ``DynamicAnnotationInterface`` clients, keyed by database name."""

    def __init__(self):
        # database name -> DynamicAnnotationInterface
        self._clients = {}

    def get_db(self, database: str) -> DynamicAnnotationInterface:
        """Return a client for ``database`` whose connection responds to a ping.

        A client is created on first request. The cached client's
        ``database.cached_session`` is then pinged, and the client is rebuilt
        if the ping fails.

        Args:
            database: Name of the database to connect to.

        Returns:
            The client currently cached for ``database``.
        """
        if database not in self._clients:
            self._get_mat_client(database)

        client = self._clients[database]
        if not ping_connection(client.database.cached_session):
            # Stale connection: replace the cached client with a fresh one.
            self._get_mat_client(database)
        return self._clients[database]

    def _get_mat_client(self, database: str):
        """Create, cache and return a new client for ``database``.

        The base URI comes from ``get_config_param("SQLALCHEMY_DATABASE_URI")``
        and pool sizing from the Flask app config (both values default to 5).
        """
        base_uri = get_config_param("SQLALCHEMY_DATABASE_URI")
        app_config = current_app.config
        pool_size = app_config.get("DB_CONNECTION_POOL_SIZE", 5)
        max_overflow = app_config.get("DB_CONNECTION_MAX_OVERFLOW", 5)

        client = DynamicAnnotationInterface(
            base_uri, database, pool_size, max_overflow
        )
        self._clients[database] = client
        return client

    def invalidate_cache(self):
        """Forget all cached clients by rebinding the cache to a new dict."""
        self._clients = {}
# Module-level cache instances created at import time; code importing these
# names from this module works with the same two cache objects.
dynamic_annotation_cache = DynamicMaterializationCache()
sqlalchemy_cache = SqlAlchemyCache()