Source code for materializationengine.schemas

from dynamicannotationdb.models import AnalysisTable, AnalysisVersion, VersionErrorTable
from flask_marshmallow import Marshmallow
from marshmallow import fields, ValidationError, Schema
from marshmallow_sqlalchemy import SQLAlchemyAutoSchema

ma = Marshmallow()


[docs]class AnalysisVersionSchema(SQLAlchemyAutoSchema):
[docs] class Meta: model = AnalysisVersion load_instance = True
[docs]class AnalysisTableSchema(SQLAlchemyAutoSchema):
[docs] class Meta: model = AnalysisTable load_instance = True
[docs]class VersionErrorTableSchema(SQLAlchemyAutoSchema):
[docs] class Meta: model = VersionErrorTable load_instance = True
[docs]class CronField(fields.Field): def _deserialize(self, value, attr, data, **kwargs): if isinstance(value, (str, int, list)): return value else: raise ValidationError("Field should be str, int or list")
[docs]class CeleryBeatSchema(Schema): name = fields.Str(required=True) minute = CronField(default="*") hour = CronField(default="*") day_of_week = CronField(default="*") day_of_month = CronField(default="*") month_of_year = CronField(default="*") task = fields.Str(required=True)