Source code for materializationengine.celery_worker

import logging
import os
import sys
import redis

from celery.app.builtins import add_backend_cleanup_task
from celery.schedules import crontab
from celery.signals import after_setup_logger
from celery.utils.log import get_task_logger

from materializationengine.celery_init import celery
from materializationengine.celery_slack import post_to_slack_on_task_failure
from materializationengine.errors import TaskNotFound
from materializationengine.schemas import CeleryBeatSchema
from materializationengine.utils import get_config_param
from dateutil import relativedelta
import datetime

celery_logger = get_task_logger(__name__)


[docs]def create_celery(app=None): celery.conf.broker_url = app.config["CELERY_BROKER_URL"] celery.conf.result_backend = app.config["CELERY_RESULT_BACKEND"] if app.config.get("USE_SENTINEL", False): celery.conf.broker_transport_options = { "master_name": app.config["MASTER_NAME"] } celery.conf.result_backend_transport_options = { "master_name": app.config["MASTER_NAME"] } celery.conf.update( { "task_routes": ("materializationengine.task_router.TaskRouter"), "task_serializer": "json", "result_serializer": "json", "accept_content": ["json", "application/json"], "optimization": "fair", "task_send_sent_event": True, "task_track_started": True, "worker_send_task_events": True, "worker_prefetch_multiplier": 1, "result_expires": 86400, # results expire in broker after 1 day "redis_socket_connect_timeout": 10, "broker_transport_options": { "visibility_timeout": 8000, "socket_timeout": 20, "socket_connect_timeout": 20, }, # timeout (s) for tasks to be sent back to broker queue "beat_schedules": app.config["BEAT_SCHEDULES"], } ) celery.conf.update(app.config) TaskBase = celery.Task class ContextTask(TaskBase): abstract = True def __call__(self, *args, **kwargs): with app.app_context(): return TaskBase.__call__(self, *args, **kwargs) celery.Task = ContextTask if os.environ.get("SLACK_WEBHOOK"): celery.Task.on_failure = post_to_slack_on_task_failure return celery
[docs]@after_setup_logger.connect def celery_loggers(logger, *args, **kwargs): """ Display the Celery banner appears in the log output. https://www.distributedpython.com/2018/10/01/celery-docker-startup/ """ logger.info(f"Customize Celery logger, default handler: {logger.handlers[0]}") logger.addHandler(logging.StreamHandler(sys.stdout))
[docs]def days_till_next_month(date): """function to pick out the same weekday in the next month So if you pass the first wednesday of January, you get the first wednesday of February Args: date (datetime.datetime): a timepoint Returns: datetime.datetime: same day next month (in the sense of same # of weeday) """ weekday = relativedelta.weekday(date.isoweekday() - 1) weeknum = (date.day - 1) // 7 + 1 weeknum = weeknum if weeknum <= 4 else 4 next_date = date + relativedelta.relativedelta( months=1, day=1, weekday=weekday(weeknum) ) delta_days = next_date - date return delta_days.days
[docs]@celery.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): from materializationengine.workflows.periodic_database_removal import ( remove_expired_databases, ) from materializationengine.workflows.periodic_materialization import ( run_periodic_materialization, ) from materializationengine.workflows.update_database_workflow import ( run_periodic_database_update, ) merge_tables = get_config_param("MERGE_TABLES") periodic_tasks = { "run_daily_periodic_materialization": run_periodic_materialization.s( days_to_expire=2, merge_tables=merge_tables ), "run_weekly_periodic_materialization": run_periodic_materialization.s( days_to_expire=7, merge_tables=merge_tables ), "run_lts_periodic_materialization": run_periodic_materialization.s( days_to_expire=days_till_next_month( datetime.datetime.utcnow(), ), merge_tables=merge_tables, ), "run_periodic_database_update": run_periodic_database_update.s(), "remove_expired_databases": remove_expired_databases.s( delete_threshold=get_config_param("MIN_DATABASES") ), } # remove expired task results in redis broker sender.add_periodic_task( crontab(hour=0, minute=0, day_of_week="*", day_of_month="*", month_of_year="*"), add_backend_cleanup_task(celery), name="Clean up back end results", ) beat_schedules = celery.conf["beat_schedules"] celery_logger.info(beat_schedules) schedules = CeleryBeatSchema(many=True).dump(beat_schedules) for schedule in schedules: if schedule["task"] not in periodic_tasks: raise TaskNotFound(schedule["task"], periodic_tasks) task = periodic_tasks[schedule["task"]] sender.add_periodic_task( crontab( minute=schedule["minute"], hour=schedule["hour"], day_of_week=schedule["day_of_week"], day_of_month=schedule["day_of_month"], month_of_year=schedule["month_of_year"], ), task, name=schedule["name"], )
[docs]def get_celery_worker_status(): i = celery.control.inspect() availability = i.ping() stats = i.stats() registered_tasks = i.registered() active_tasks = i.active() scheduled_tasks = i.scheduled() result = { "availability": availability, "stats": stats, "registered_tasks": registered_tasks, "active_tasks": active_tasks, "scheduled_tasks": scheduled_tasks, } return result
[docs]def get_celery_queue_items(queue_name: str): with celery.connection_or_acquire() as conn: return conn.default_channel.queue_declare( queue=queue_name, passive=True ).message_count
[docs]def get_activate_tasks(): inspector = celery.control.inspect() return inspector.active()
[docs]def inspect_locked_tasks(release_locks: bool = False): client = redis.StrictRedis( host=get_config_param("REDIS_HOST"), port=get_config_param("REDIS_PORT"), password=get_config_param("REDIS_PASSWORD"), db=0, ) locked_tasks = list(client.scan_iter(match="LOCKED_WORKFLOW_TASK*")) lock_status_dict = {locked_task: {"locked": True} for locked_task in locked_tasks} if release_locks: for locked_task in lock_status_dict: client.delete(locked_task) lock_status_dict[locked_task] = {"locked": False} return lock_status_dict