CelerySQLScheduler

class beatdrop.schedulers.celery_sql_scheduler.CelerySQLScheduler

Bases: SQLScheduler, CeleryScheduler

Hold schedule entries in an SQL database, and send tasks to Celery queues.

Uses an SQL database to store schedule entries and scheduler state. It is safe to run multiple CelerySQLScheduler instances simultaneously, and to have additional instances that act purely as clients to read and write entries.

NOTE - You must also install the DB driver specified in the URL for create_engine_kwargs.

NOTE - Before running the scheduler for the first time, the DB tables must be created using create_tables().

Parameters:
  • max_interval (datetime.timedelta) – The maximum interval that the scheduler should sleep before waking up to check for due tasks.

  • sched_entry_types (Tuple[Type[ScheduleEntry]], default : (CrontabEntry, CrontabTZEntry, EventEntry, IntervalEntry)) – The schedule entry types this scheduler accepts. These are only stored in the scheduler, not externally.

  • default_sched_entries (List[ScheduleEntry], default : []) – Default list of schedule entries. In general these entries are not held in non-volatile storage so any metadata they hold will be lost if the scheduler fails. These entries are static. The keys cannot be overwritten or deleted.

  • lock_timeout (datetime.timedelta) – The time a scheduler may go without refreshing the scheduler lock before it is considered dead. Should be at least 3 times the max_interval.

  • create_engine_kwargs (dict) – Keyword arguments to pass to sqlalchemy.create_engine. See the SQLAlchemy docs for more info: https://docs.sqlalchemy.org/en/14/core/engines.html#sqlalchemy.create_engine

  • celery_app (celery.Celery) – Celery app for sending tasks.
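
The lock_timeout guidance above can be checked before a scheduler is constructed. The following is a minimal sketch using only the standard library. The helper name check_lock_timeout is hypothetical and is not part of beatdrop.

```python
import datetime


def check_lock_timeout(
    max_interval: datetime.timedelta,
    lock_timeout: datetime.timedelta,
) -> datetime.timedelta:
    """Return lock_timeout if it is at least 3 times max_interval.

    Hypothetical helper for illustration; not part of beatdrop.
    """
    minimum = 3 * max_interval
    if lock_timeout < minimum:
        raise ValueError(
            f"lock_timeout {lock_timeout} is shorter than 3 * max_interval ({minimum})"
        )
    return lock_timeout


max_interval = datetime.timedelta(seconds=60)
lock_timeout = check_lock_timeout(max_interval, datetime.timedelta(seconds=180))
```

The validated values can then be passed as max_interval and lock_timeout when creating the scheduler.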

Example

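import datetime

from celery import Celery
from beatdrop import CelerySQLScheduler

celery_app = Celery()
# the DB tables must already exist; call create_tables() once before the first run
sched = CelerySQLScheduler(
    # sleep at most 60 seconds between checks for due tasks
    max_interval=datetime.timedelta(seconds=60),
    celery_app=celery_app,
    # at least 3 times max_interval
    lock_timeout=datetime.timedelta(seconds=180),
    create_engine_kwargs={
        "url": "sqlite:///my_sqlite.db"
    }
)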
# use the scheduler as a client
entry_list = sched.list()
for entry in entry_list:
    print(entry.key)

# or run it
sched.run()

create_tables() → None

Create DB tables for the schedule entries.

delete(sched_entry: ScheduleEntry) → None

Delete a schedule entry from the scheduler.

This does not delete default entries.

Parameters:

sched_entry (ScheduleEntry) – Schedule entry to delete from the scheduler.

get(key: str) → ScheduleEntry

Retrieve a schedule entry by its key.

Parameters:

key (str) – The schedule entry key.

Returns:

The schedule entry with the matching key.

Return type:

ScheduleEntry

Raises:

beatdrop.exceptions.ScheduleEntryNotFound – The schedule entry could not be found.
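
Because get() raises rather than returning None for a missing key, client code may want a small wrapper. The following is a minimal sketch. The helper name get_or_none is hypothetical, and the exception type is passed in as an argument so the sketch does not depend on beatdrop being importable.

```python
from typing import Any, Optional, Type


def get_or_none(sched: Any, key: str, not_found: Type[Exception]) -> Optional[Any]:
    """Return sched.get(key), or None if the not_found exception is raised.

    Hypothetical helper for illustration; not part of beatdrop.
    """
    try:
        return sched.get(key)
    except not_found:
        return None
```

With beatdrop installed, this would presumably be called as get_or_none(sched, key, beatdrop.exceptions.ScheduleEntryNotFound).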

list(page_size: int = 500) → SQLScheduleEntryList

List schedule entries.

Parameters:

page_size (int, optional) – DB page size. Defaults to 500.

Returns:

Iterator of all schedule entries. Automatically paginated DB results.

Return type:

SQLScheduleEntryList
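
To illustrate what an automatically paginated iterator means in practice, here is a minimal sketch using the standard library sqlite3 module. It is not how SQLScheduleEntryList is implemented. The entries table, its key column, and the iter_keys function are all assumptions made for the example.

```python
import sqlite3
from typing import Iterator


def iter_keys(conn: sqlite3.Connection, page_size: int = 500) -> Iterator[str]:
    """Yield every key from a hypothetical `entries` table, one page at a time."""
    offset = 0
    while True:
        # Fetch at most page_size rows per query instead of the whole table.
        rows = conn.execute(
            "SELECT key FROM entries ORDER BY key LIMIT ? OFFSET ?",
            (page_size, offset),
        ).fetchall()
        if not rows:
            return
        for (key,) in rows:
            yield key
        offset += len(rows)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entries (key TEXT PRIMARY KEY)")
conn.executemany(
    "INSERT INTO entries VALUES (?)",
    [(f"task-{i:03d}",) for i in range(7)],
)
# The caller iterates as if over one sequence; paging happens inside.
keys = list(iter_keys(conn, page_size=3))
```

A smaller page_size means more queries with less memory held per query, while a larger one means fewer, bigger queries.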

run(max_iterations: int = None) → None

Run the scheduler.

Parameters:

max_iterations (int, default : None) – The maximum number of iterations to run the scheduler. None is unlimited.
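
The "None is unlimited" convention can be modelled with a small standard-library sketch. This only illustrates the semantics of an optional iteration cap and is not beatdrop's run loop. The function name iteration_numbers is hypothetical.

```python
import itertools
from typing import Iterator, Optional


def iteration_numbers(max_iterations: Optional[int] = None) -> Iterator[int]:
    """Yield iteration indexes: endless when max_iterations is None, else capped."""
    if max_iterations is None:
        return itertools.count()
    return iter(range(max_iterations))


# A capped loop stops on its own after three iterations.
capped = list(iteration_numbers(3))
# An uncapped loop must be stopped externally; here islice takes the first five.
uncapped_head = list(itertools.islice(iteration_numbers(None), 5))
```

A small cap such as 1 can be convenient in tests, where a loop that never returns is undesirable.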

save(sched_entry: ScheduleEntry, read_only_attributes: bool = False) → None

Save a new schedule entry, or update an existing one, in the DB.

If read_only_attributes is False, the read-only attributes of sched_entry are set to the values stored in the DB.

Parameters:
  • sched_entry (ScheduleEntry) – Schedule entry to create or update.

  • read_only_attributes (bool, optional) – If True, read-only attributes are also saved to the DB. Clients should almost always leave this False. Defaults to False.

send(sched_entry: ScheduleEntry) → None

Send a schedule entry to the Celery queue.

Parameters:

sched_entry (ScheduleEntry) – Schedule entry to send to the Celery queue.