rq-rediscron - Redis-backed RQ cron scheduling

rq-rediscron provides Redis-backed subclasses for RQ cron scheduling:

  • RedisCronJob

  • RedisCronScheduler

Cron job definitions are stored in Redis hashes and indexed in a sorted set by next enqueue time. This lets scheduler processes pick up job creates, edits, and deletes at runtime without restart.
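
The index lookup can be pictured with a small in-memory model. This is only a sketch: a plain dict stands in for the rq:cron_jobs sorted set (member to score), and due_job_ids is a hypothetical helper that is not part of rq-rediscron. Against a real server the comparable query would be a ZRANGEBYSCORE on the index key.

```python
from datetime import datetime, timezone

# Stand-in for the rq:cron_jobs sorted set: job id -> next enqueue timestamp.
index = {
    "metric:pm25": datetime(2026, 6, 3, 5, 40, tzinfo=timezone.utc).timestamp(),
    "metric:no2": datetime(2026, 6, 3, 5, 45, tzinfo=timezone.utc).timestamp(),
}


def due_job_ids(index, now):
    """Return ids whose score is <= now, earliest first (hypothetical helper)."""
    due = [(score, job_id) for job_id, score in index.items() if score <= now]
    return [job_id for _, job_id in sorted(due)]


now = datetime(2026, 6, 3, 5, 41, tzinfo=timezone.utc).timestamp()
due = due_job_ids(index, now)  # only metric:pm25 is due at 05:41
```

Because the score is the next enqueue time, a job edited at runtime only needs its score rewritten to be picked up on the next loop.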

Only one RedisCronScheduler process runs at a time. The scheduler acquires rq:cron_jobs:lock during register_birth(), extends that lock on each heartbeat, and releases it during register_death(). If the owner exits without releasing the lock, a second scheduler can start once the lock TTL expires, which provides failover without duplicate enqueue loops.
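
The lock lifecycle can be modelled without a Redis server. The sketch below is an assumption about the general technique (a key set only if absent, with an expiry), not the package's actual code; FakeLockStore and the owner names are hypothetical, and time is passed in explicitly so the behaviour is easy to follow.

```python
from typing import Optional


class FakeLockStore:
    """In-memory stand-in for a Redis key with a TTL (illustration only)."""

    def __init__(self) -> None:
        self.owner: Optional[str] = None
        self.expires_at = 0.0

    def acquire(self, owner: str, ttl: int, now: float) -> bool:
        # Similar in spirit to SET key owner NX EX ttl: fails while a live lock exists.
        if self.owner is not None and now < self.expires_at:
            return False
        self.owner, self.expires_at = owner, now + ttl
        return True

    def extend(self, owner: str, ttl: int, now: float) -> bool:
        # Only the current owner of a live lock may push the expiry forward.
        if self.owner != owner or now >= self.expires_at:
            return False
        self.expires_at = now + ttl
        return True

    def release(self, owner: str) -> bool:
        # Release only if this owner still holds the lock.
        if self.owner != owner:
            return False
        self.owner = None
        return True


lock = FakeLockStore()
first = lock.acquire("scheduler-a", ttl=120, now=0.0)       # lock is free
second = lock.acquire("scheduler-b", ttl=120, now=30.0)     # refused, lock is live
extended = lock.extend("scheduler-a", ttl=120, now=60.0)    # expiry moves to 180
failover = lock.acquire("scheduler-b", ttl=120, now=200.0)  # old lock has expired
```

The ownership check in release() matters after a failover: the old scheduler must not delete a lock that now belongs to its replacement.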

Usage

from redis import Redis
from rediscron import RedisCronScheduler


def rebuild_metric(metric_id: str) -> None:
    ...


connection = Redis.from_url("redis://localhost:6379/0")
scheduler = RedisCronScheduler(connection)
scheduler.register(
    rebuild_metric,
    queue_name="metrics",
    id="metric:pm25",
    args=("pm25",),
    interval=300,
)
scheduler.start()

Redis keys

Keys used by this package:

  • rq:cron_job:{id} stores one cron job hash.

  • rq:cron_jobs indexes job ids by next enqueue timestamp.

  • rq:cron_jobs:lock allows one live scheduler process at a time.

  • rq:cron_jobs:last_update records runtime changes.

  • rq:cron_jobs:events publishes create, update, and delete events.
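
For administration scripts it can help to keep these names in one place. The constant and function names below are hypothetical and are not exported by the package; only the key strings come from the list above.

```python
# Key names copied from the list above; the Python names are illustrative.
JOB_KEY_TEMPLATE = "rq:cron_job:{id}"
INDEX_KEY = "rq:cron_jobs"
LOCK_KEY = "rq:cron_jobs:lock"
LAST_UPDATE_KEY = "rq:cron_jobs:last_update"
EVENTS_CHANNEL = "rq:cron_jobs:events"


def job_key(job_id: str) -> str:
    """Build the hash key for one cron job id."""
    return JOB_KEY_TEMPLATE.format(id=job_id)


key = job_key("metric:pm25")
```

Job ids may themselves contain colons, so the id is everything after the rq:cron_job: prefix.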

Code

class rediscron.core.RedisCronJob(id: str, queue_name: str, func: Callable | None = None, func_name: str | None = None, args: tuple | None = None, kwargs: dict | None = None, interval: int | None = None, cron: str | None = None, job_timeout: int | None = None, result_ttl: int = 500, ttl: int | None = None, failure_ttl: int | None = None, meta: dict | None = None, connection: Redis | None = None, latest_enqueue_time: datetime | None = None, next_enqueue_time: datetime | None = None, created_at: datetime | None = None, updated_at: datetime | None = None, enabled: bool = True)

A Redis-persisted RQ cron job with a stable application-owned id.

RedisCronJob is the persisted form of one recurring job. The job is stored in a Redis hash at rq:cron_job:{id} and indexed in rq:cron_jobs by its next enqueue timestamp.

Create or update jobs through RedisCronScheduler.register() for normal use. Use this class directly when you are building administration tools that need to fetch, refresh, or delete one job.

Example:

from redis import Redis
from rediscron import RedisCronJob

def rebuild_metric(metric_id: str) -> None:
    ...

redis = Redis.from_url("redis://localhost:6379/0")
job = RedisCronJob(
    id="metric:pm25",
    queue_name="metrics",
    func=rebuild_metric,
    args=("pm25",),
    interval=300,
    connection=redis,
)
job.save(event="created")

same_job = RedisCronJob.fetch("metric:pm25", redis)
same_job.delete()

property key: str

Redis hash key for this cron job.

Example:

job = RedisCronJob.fetch("metric:pm25", redis)
assert job.key == "rq:cron_job:metric:pm25"

to_dict() -> dict[str, Any]

Serialize the cron job into Redis hash fields.

Values are intentionally primitive strings, integers, or JSON strings so the hash can be inspected with redis-cli hgetall rq:cron_job:{id}. args, kwargs, and meta are JSON encoded; datetimes are stored in RQ’s UTC datetime format.
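
A rough sketch of this kind of encoding is shown below. The helper name, the field selection, and the exact datetime format string are assumptions; the format was chosen to match the timestamps printed by rqrcron info.

```python
import json
from datetime import datetime, timezone

# Assumed format, chosen to match timestamps such as 2026-06-03T05:40:00.000000Z.
UTC_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"


def encode_fields(args, kwargs, meta, interval, next_enqueue_time):
    """Build flat, redis-cli friendly hash fields (hypothetical helper)."""
    return {
        "args": json.dumps(list(args)),
        "kwargs": json.dumps(kwargs),
        "meta": json.dumps(meta),
        "interval": interval,
        "next_enqueue_time": next_enqueue_time.astimezone(timezone.utc).strftime(UTC_FORMAT),
    }


fields = encode_fields(
    args=("pm25",),
    kwargs={},
    meta={"owner": "air-quality"},
    interval=300,
    next_enqueue_time=datetime(2026, 6, 3, 5, 40, tzinfo=timezone.utc),
)
```

JSON has no tuple type, so args comes back as a list when decoded and has to be converted to a tuple again on restore.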

save(pipeline: Any | None = None, event: str = 'updated') -> None

Persist this cron job and notify schedulers about the change.

save() writes the hash, updates the sorted-set index, bumps rq:cron_jobs:last_update, and publishes an event on rq:cron_jobs:events.
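
The four writes can be outlined with a recording stand-in for a Redis pipeline. Everything here is illustrative: RecordingPipeline and save_job are hypothetical, the use of SET for the update marker is an assumption, and so is the event payload format.

```python
class RecordingPipeline:
    """Collects commands instead of sending them to Redis (illustration only)."""

    def __init__(self) -> None:
        self.commands = []

    def hset(self, key, mapping):
        self.commands.append(("HSET", key, mapping))

    def zadd(self, key, mapping):
        self.commands.append(("ZADD", key, mapping))

    def set(self, key, value):
        self.commands.append(("SET", key, value))

    def publish(self, channel, message):
        self.commands.append(("PUBLISH", channel, message))


def save_job(pipe, job_id, fields, next_ts, now_ts, event="updated"):
    """Hypothetical outline of the four writes described above."""
    pipe.hset(f"rq:cron_job:{job_id}", mapping=fields)        # job hash
    pipe.zadd("rq:cron_jobs", {job_id: next_ts})              # index entry
    pipe.set("rq:cron_jobs:last_update", now_ts)              # update marker (assumed SET)
    pipe.publish("rq:cron_jobs:events", f"{event}:{job_id}")  # payload format is assumed


pipe = RecordingPipeline()
save_job(
    pipe,
    "metric:pm25",
    {"queue_name": "metrics", "interval": 300},
    next_ts=1780465200.0,
    now_ts=1780464900.0,
)
order = [command[0] for command in pipe.commands]
```

Grouping the writes in one pipeline keeps the hash and its index entry from drifting apart if the process stops between steps.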

Example:

job = RedisCronJob.fetch("metric:pm25", redis)
job.interval = 600
job.save(event="updated")

enable(pipeline: Any | None = None) -> None

Enable this cron job and notify running schedulers.

disable(pipeline: Any | None = None) -> None

Disable this cron job and notify running schedulers.

refresh() -> None

Reload this job from Redis in place.

Example:

job = RedisCronJob.fetch("metric:pm25", redis)
# Another process edits the job.
job.refresh()

delete(pipeline: Any | None = None) -> None

Delete this cron job from Redis and notify running schedulers.

Deletes remove both the job hash and its sorted-set index entry. Running schedulers notice the change through Pub/Sub when available and through the update marker as the correctness fallback.
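
One way to picture the two notification paths is a tracker that treats a Pub/Sub message as the fast path and the update marker as the fallback. ChangeTracker is a hypothetical illustration of that idea, not the scheduler's internal logic; the marker values are made up.

```python
class ChangeTracker:
    """Remembers the last update marker this scheduler has seen (hypothetical)."""

    def __init__(self) -> None:
        self.seen_marker = None

    def has_changes(self, current_marker, pubsub_message=None) -> bool:
        # A Pub/Sub message signals a change right away; comparing the marker
        # still catches changes when a message was missed.
        changed = pubsub_message is not None or current_marker != self.seen_marker
        self.seen_marker = current_marker
        return changed


tracker = ChangeTracker()
first = tracker.has_changes("1001")    # nothing seen yet, so this counts as a change
quiet = tracker.has_changes("1001")    # marker unchanged, no message
missed = tracker.has_changes("1002")   # message was lost, marker moved
pushed = tracker.has_changes("1002", pubsub_message="deleted")
```

Pub/Sub delivery is fire-and-forget, which is why a persisted marker is useful as the correctness path.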

Example:

RedisCronJob.fetch("metric:pm25", redis).delete()

enqueue(connection: Redis)

Enqueue one concrete RQ job for this cron definition.

Restored Redis cron jobs are executable because restore() imports the stored dotted func_name back into a Python callable before enqueueing.

classmethod fetch(id: str, connection: Redis) -> RedisCronJob

Fetch one persisted cron job by id.

Example:

job = RedisCronJob.fetch("metric:pm25", redis)
print(job.next_enqueue_time)

classmethod restore(raw_data: dict[Any, Any], connection: Redis | None = None) -> RedisCronJob

Restore a RedisCronJob from Redis hash data.

Unlike RQ’s monitoring-only CronJob.from_dict(), this method imports func_name and returns a cron job that can be enqueued immediately.
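
Turning a stored dotted name back into a callable is a standard importlib pattern. The helper below is a generic sketch of that pattern under the assumption that func_name has the form package.module.attribute; it is not the package's own import code.

```python
from importlib import import_module


def import_callable(dotted_name: str):
    """Resolve 'package.module.attr' to the object it names (hypothetical helper)."""
    module_name, _, attr = dotted_name.rpartition(".")
    if not module_name:
        raise ValueError(f"expected a dotted path, got {dotted_name!r}")
    return getattr(import_module(module_name), attr)


# A standard-library function is used here so the example runs anywhere.
dumps = import_callable("json.dumps")
encoded = dumps(["pm25"])
```

This only works if the function is importable by that name in the scheduler process, so functions defined in __main__ or in a closure are poor candidates for persisted jobs.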

set_enqueue_time(time: datetime) -> None

Record an enqueue timestamp and persist the recalculated next run.
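
For interval-based jobs the recalculation is plain arithmetic. The sketch below assumes the next run is the recorded enqueue time plus interval seconds; next_run_after is a hypothetical helper, and cron expressions would need a cron parser that is not shown here.

```python
from datetime import datetime, timedelta, timezone


def next_run_after(enqueued_at: datetime, interval: int) -> datetime:
    """Next run for an interval job: the enqueue time plus the interval in seconds."""
    return enqueued_at + timedelta(seconds=interval)


enqueued_at = datetime(2026, 6, 3, 5, 35, tzinfo=timezone.utc)
next_run = next_run_after(enqueued_at, interval=300)  # five minutes later
```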

class rediscron.core.RedisCronScheduler(connection: Redis, logging_level: str | int = 20, name: str = '', lock_ttl: int = 120, max_sleep_interval: float = 60.0)

Cron scheduler that uses Redis as the cron job source of truth.

The scheduler keeps RQ’s scheduler registry and heartbeat behavior, but it does not use RQ’s in-memory _cron_jobs list to decide what should run. Each loop reads due job ids from the Redis sorted set and fetches the corresponding job hash before enqueueing.

Only one RedisCronScheduler process is allowed to be alive at a time. The scheduler acquires rq:cron_jobs:lock during register_birth() and releases it during register_death(). Heartbeats extend the lock TTL while the scheduler is healthy. If another scheduler already owns the lock, startup fails before the scheduler registers itself in RQ’s registry.

Example:

from redis import Redis
from rediscron import RedisCronScheduler

def rebuild_metric(metric_id: str) -> None:
    ...

redis = Redis.from_url("redis://localhost:6379/0")
scheduler = RedisCronScheduler(redis, lock_ttl=120)
scheduler.register(
    rebuild_metric,
    queue_name="metrics",
    id="metric:pm25",
    args=("pm25",),
    interval=300,
)
scheduler.start()

register(func: Callable, queue_name: str, id: str, args: tuple | None = None, kwargs: dict | None = None, interval: int | None = None, cron: str | None = None, job_timeout: int | None = None, result_ttl: int = 500, ttl: int | None = None, failure_ttl: int | None = None, meta: dict | None = None, enabled: bool | None = None) -> RedisCronJob

Create or update one persisted cron job.

id is required and stable. Calling register() again with the same id edits the existing job instead of adding a duplicate. Runtime edits are persisted immediately and published to running schedulers.

Example:

scheduler.register(rebuild_metric, "metrics", id="metric:pm25", interval=300)

# Later, from another process:
scheduler.register(rebuild_metric, "metrics", id="metric:pm25", interval=600)

get_jobs() -> list[RedisCronJob]

Return all enabled jobs currently indexed in Redis.

Index entries whose job hash is missing are treated as stale and pruned from the sorted set. This makes Redis the source of truth while keeping the index tidy.
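
The pruning step can be sketched with two dicts, one standing in for the sorted-set index and one for the job hashes. prune_stale is a hypothetical helper; against a real server the removal would be a ZREM on the index key.

```python
def prune_stale(index: dict, hashes: dict) -> list:
    """Drop index members that have no job hash; return the removed ids."""
    stale = [job_id for job_id in index if job_id not in hashes]
    for job_id in stale:
        del index[job_id]
    return stale


# metric:gone is still indexed, but its hash no longer exists.
index = {"metric:pm25": 1780465200.0, "metric:gone": 1780465260.0}
hashes = {"metric:pm25": {"queue_name": "metrics", "interval": 300}}
removed = prune_stale(index, hashes)
```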

get_all_jobs() -> list[RedisCronJob]

Return all persisted cron jobs, including disabled jobs.

enqueue_jobs() -> list[RedisCronJob]

Enqueue all Redis cron jobs currently due.

This method assumes the scheduler has already acquired the singleton scheduler lock during register_birth(). If the lock is missing or was lost, no jobs are enqueued.

calculate_sleep_interval() -> float

Return seconds until the next indexed cron job is due.

The value is capped by max_sleep_interval so runtime updates are still observed even if no Pub/Sub message is received.
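
The capping rule amounts to a clamp. In the sketch below, sleep_interval is a hypothetical stand-alone function; returning the cap when the index is empty and returning zero for overdue jobs are assumptions about sensible behaviour, not documented guarantees.

```python
def sleep_interval(next_due_ts, now_ts, max_sleep_interval=60.0):
    """Seconds to sleep before the next loop, clamped to [0, max_sleep_interval]."""
    if next_due_ts is None:
        # Assumed behaviour for an empty index: sleep for the cap.
        return max_sleep_interval
    return min(max(next_due_ts - now_ts, 0.0), max_sleep_interval)


soon = sleep_interval(110.0, 100.0)      # due in 10 seconds
far = sleep_interval(1000.0, 100.0)      # due in 15 minutes, capped
overdue = sleep_interval(90.0, 100.0)    # already due
empty = sleep_interval(None, 100.0)      # nothing indexed
```

A smaller max_sleep_interval shortens the worst-case delay before a missed update is noticed, at the cost of more frequent polling.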

save_jobs_data() -> None

No-op compatibility hook.

RQ’s CronScheduler persists the in-memory _cron_jobs list into the scheduler hash. RedisCronScheduler stores each job independently, so there is no scheduler-local job list to save.

start()

Start the singleton Redis cron scheduler loop.

Startup acquires the scheduler lock and registers birth. Shutdown unregisters from RQ’s registry and releases the lock only if this scheduler still owns it.

register_birth() -> None

Acquire the singleton scheduler lock and register this scheduler.

If the lock is already owned, this method raises RuntimeError before registering the scheduler with RQ. This prevents two scheduler processes from running at the same time and enqueueing duplicate work.

register_death(pipeline: Any | None = None) -> None

Unregister this scheduler and release the singleton lock.

heartbeat() -> None

Heartbeat RQ’s scheduler registry and extend the scheduler lock.

If the scheduler no longer owns the lock, StopRequested is raised so the process exits instead of continuing alongside another scheduler.

CLI

To see which scheduled jobs are currently registered (enabled or disabled), run rqrcron info:

$ rqrcron info
visits  */1 * * * *  enabled   2026-06-03T05:40:00.000000Z  analytics
vacuum  every 10s    enabled   2026-06-03T05:39:35.962178Z  maintenance
2 scheduled jobs total

Updated: 2026-06-03 12:39:35.320659

To see the scheduled jobs grouped by queue, just use the -R (or --by-queue) flag:

$ rqrcron info -R
analytics:
  visits  */1 * * * *  enabled   2026-06-03T05:41:00.000000Z

maintenance:
  vacuum  every 10s  enabled   2026-06-03T05:40:16.790579Z

2 queues, 2 scheduled jobs total

Updated: 2026-06-03 12:40:14.200368

To watch scheduled jobs, you can specify a poll interval using the -i (or --interval) flag:

$ rqrcron info -i 1

Why?

The original cron implementation in RQ holds all cron jobs locally, as a list inside the scheduler instance. Because of this design, the list of scheduled jobs cannot easily be modified once the scheduler has started.

In some cases, we want to add, edit, or delete scheduled jobs on the fly and have the scheduler pick up those changes. This is what led to the design choices for this package.

Inspired by redisbeat, a Celery scheduler that uses Redis to store metadata. See "Hello RedBeat: A New Celery Beat Scheduler" for more info.