April 12, 2026 8 min read
How to monitor cron jobs in Python
Python's scheduler libraries don't tell you when jobs fail silently. Here's how to add missed run detection, hung job alerts, and silent failure detection to any Python cron job — in under five minutes.
Python has no shortage of ways to schedule recurring tasks. APScheduler, schedule, Celery Beat, rq-scheduler, plain crontab entries running Python scripts — the ecosystem is fragmented, and every option has the same gap: none of them monitor your jobs.
Scheduling a job and monitoring a job are different problems. A scheduler fires your function at the right time. It does not tell you when the function fails silently, when the server reboots and the job never runs, or when the job has been hanging for six hours. For that, you need external monitoring that watches your jobs from outside your process.
This guide covers how to add production-grade monitoring to any Python cron job — whether you're using a scheduling library, a plain crontab, or a task queue.
The three failure modes Python schedulers don't catch
Missed runs
Your job didn't execute at all. A server reboot, a deployment that left the scheduler process stopped, or a crontab entry that was accidentally overwritten — any of these silently prevent your job from running. Without external monitoring you will not know until something downstream breaks.
Hung jobs
Your job started but hasn't finished. A database query waiting on a lock, an HTTP request with no timeout, a loop that never terminates — Python process schedulers have no concept of a maximum runtime. The job just keeps running, consuming memory and blocking other work, with no alert firing.
Silent failures
Your job completed with exit code 0 and no exceptions, but accomplished nothing useful. A sync that fetched zero records from an API that stopped returning data. A cleanup job whose filter matched nothing because of a recent schema change. An email batch that sent to an empty recipient list.
This is the hardest failure to catch. The scheduler sees a clean exit. Standard monitoring sees a heartbeat ping. Nobody sees the zero.
Option 1: Plain HTTP pings (works with any Python scheduler)
The simplest approach — and the one that works regardless of which scheduler you use — is plain HTTP pings. Your job sends a POST request when it starts and another when it finishes. An external monitoring service tracks whether pings arrive on schedule.
No SDK required. The requests library (or Python's built-in urllib) is enough:
import os
import requests
MONITOR_ID = "your-monitor-id"
API_KEY = os.environ["CRONTIFY_API_KEY"]
BASE_URL = f"https://api.crontify.com/api/v1/ping/{MONITOR_ID}"
HEADERS = {"X-API-Key": API_KEY}
def ping(event: str, payload: dict | None = None) -> None:
"""Send a monitoring ping. Never raises — a failed ping must not kill the job."""
try:
requests.post(
f"{BASE_URL}/{event}",
headers=HEADERS,
json=payload,
timeout=5,
)
except Exception:
pass # monitoring failure must never block the job
def run_nightly_sync() -> None:
ping("start")
try:
result = sync_records()
ping("success", {"meta": {"records_synced": result.count}})
except Exception as exc:
import traceback
ping("fail", {"message": str(exc), "log": traceback.format_exc()})
raise
The ping() helper wraps every request in a try/except with a short timeout. A Crontify outage or a network blip must never prevent your actual job from running or raising its own errors.
Option 2: APScheduler with monitoring
APScheduler is the most widely used Python scheduling library. It supports cron expressions, interval jobs, and one-off scheduled tasks, and it runs inside your application process.
The pattern is to wrap each job function with monitoring calls:
from apscheduler.schedulers.blocking import BlockingScheduler
import os
import requests
import traceback
scheduler = BlockingScheduler()
API_KEY = os.environ["CRONTIFY_API_KEY"]
BASE = "https://api.crontify.com/api/v1/ping"
HEADERS = {"X-API-Key": API_KEY}
def ping(monitor_id: str, event: str, payload: dict | None = None) -> None:
try:
requests.post(
f"{BASE}/{monitor_id}/{event}",
headers=HEADERS,
json=payload,
timeout=5,
)
except Exception:
pass
def monitored(monitor_id: str):
"""Decorator that wraps a job function with start/success/fail pings."""
def decorator(fn):
def wrapper(*args, **kwargs):
ping(monitor_id, "start")
try:
result = fn(*args, **kwargs)
meta = result if isinstance(result, dict) else {}
ping(monitor_id, "success", {"meta": meta} if meta else None)
except Exception as exc:
ping(monitor_id, "fail", {
"message": str(exc),
"log": traceback.format_exc(),
})
raise
return wrapper
return decorator
@monitored("mon_abc123")
def nightly_sync():
result = sync_records()
# Return a dict to attach as metadata — optional but recommended
return {"records_synced": result.count, "duration_ms": result.duration_ms}
scheduler.add_job(nightly_sync, "cron", hour=2, minute=0)
scheduler.start()
The @monitored decorator sends start, success, and fail pings automatically. Any dict returned from the job function is attached as metadata to the success ping and evaluated against alert rules in the dashboard.
Option 3: Celery Beat with monitoring
Celery Beat is the standard task scheduler for Celery-based applications. Each task is defined as a Celery task and scheduled in beat_schedule.
Monitoring works at the task level using Celery's before_task_publish, task_prerun, task_success, and task_failure signals — or more simply, by calling ping functions directly inside the task:
import os
import requests
import traceback
from celery import Celery
app = Celery("tasks", broker=os.environ["CELERY_BROKER_URL"])
app.conf.beat_schedule = {
"nightly-sync": {
"task": "tasks.nightly_sync",
"schedule": crontab(hour=2, minute=0),
},
}
API_KEY = os.environ["CRONTIFY_API_KEY"]
BASE = "https://api.crontify.com/api/v1/ping"
HEADERS = {"X-API-Key": API_KEY}
def ping(monitor_id: str, event: str, payload: dict | None = None) -> None:
try:
requests.post(
f"{BASE}/{monitor_id}/{event}",
headers=HEADERS,
json=payload,
timeout=5,
)
except Exception:
pass
@app.task
def nightly_sync():
monitor_id = "mon_abc123"
ping(monitor_id, "start")
try:
result = sync_records()
ping(monitor_id, "success", {
"meta": {
"records_synced": result.count,
"duration_ms": result.duration_ms,
}
})
except Exception as exc:
ping(monitor_id, "fail", {
"message": str(exc),
"log": traceback.format_exc(),
})
raise
One important note for Celery Beat specifically: the Beat scheduler process is separate from the Celery worker. If Beat goes down, jobs stop being scheduled — but the worker stays alive and appears healthy. External monitoring of the job execution (not just the worker process) is therefore especially important for Celery Beat setups.
Option 4: Plain crontab running a Python script
If you're running Python scripts directly from crontab, monitoring is straightforward with a shell wrapper:
# crontab entry
0 2 * * * /usr/bin/python3 /opt/jobs/nightly_sync.py
Add pings directly in your Python script:
#!/usr/bin/env python3
import os
import sys
import requests
import traceback
MONITOR_ID = "your-monitor-id"
API_KEY = os.environ.get("CRONTIFY_API_KEY", "")
BASE_URL = f"https://api.crontify.com/api/v1/ping/{MONITOR_ID}"
HEADERS = {"X-API-Key": API_KEY}
def ping(event: str, payload: dict | None = None) -> None:
try:
requests.post(
f"{BASE_URL}/{event}",
headers=HEADERS,
json=payload,
timeout=5,
)
except Exception:
pass
def main() -> None:
ping("start")
try:
result = sync_records()
ping("success", {
"meta": {
"records_synced": result.count,
"errors": result.errors,
}
})
except Exception as exc:
ping("fail", {
"message": str(exc),
"log": traceback.format_exc(),
})
sys.exit(1)
if __name__ == "__main__":
main()
One common pitfall with crontab and Python: cron runs with a minimal environment. Environment variables you've set in your shell profile — including CRONTIFY_API_KEY — are not available unless you explicitly export them in the crontab file or load them from a file in your script:
# crontab — explicitly set the environment variable
CRONTIFY_API_KEY=ck_live_your_key
0 2 * * * /usr/bin/python3 /opt/jobs/nightly_sync.py
Adding silent failure detection
All of the patterns above pass a meta dictionary to the success ping. This metadata is stored with the run record and evaluated against alert rules you define in the Crontify dashboard.
Common rules for Python data jobs:
records_synced eq 0 → alert (fetched but processed nothing)
records_fetched eq 0 → alert (upstream returned empty)
errors gt 0 → alert (partial failures during processing)
duration_ms gt 300000 → alert (took longer than 5 minutes)
The rules use four operators: eq (equals), ne (not equals), lt (less than), gt (greater than). You can stack multiple rules per monitor.
A sync job that fetched 10,000 records but processed zero — because a schema change broke the processing logic — will exit 0, send its success ping, and trigger a records_synced eq 0 alert. Without this rule, that failure is invisible.
A reusable monitoring context manager
For larger codebases with multiple jobs, a context manager keeps the monitoring logic centralised:
import os
import contextlib
import requests
import traceback
from typing import Generator
API_KEY = os.environ["CRONTIFY_API_KEY"]
BASE = "https://api.crontify.com/api/v1/ping"
HEADERS = {"X-API-Key": API_KEY}
def _ping(monitor_id: str, event: str, payload: dict | None = None) -> None:
try:
requests.post(
f"{BASE}/{monitor_id}/{event}",
headers=HEADERS,
json=payload,
timeout=5,
)
except Exception:
pass
@contextlib.contextmanager
def cron_monitor(monitor_id: str) -> Generator[dict, None, None]:
"""
Context manager that sends start/success/fail pings.
Yields a dict that the caller populates with metadata.
On clean exit, the metadata is sent with the success ping.
On exception, a fail ping is sent and the exception re-raised.
Usage:
with cron_monitor("mon_abc123") as meta:
result = sync_records()
meta["records_synced"] = result.count
"""
meta: dict = {}
_ping(monitor_id, "start")
try:
yield meta
_ping(monitor_id, "success", {"meta": meta} if meta else None)
except Exception as exc:
_ping(monitor_id, "fail", {
"message": str(exc),
"log": traceback.format_exc(),
})
raise
Usage:
with cron_monitor("mon_abc123") as meta:
result = sync_records()
meta["records_synced"] = result.count
meta["duration_ms"] = result.duration_ms
If sync_records() raises, the fail ping is sent automatically. If it completes, the success ping carries whatever you put in meta.
Frequently asked questions
Does monitoring add latency to my Python jobs?
Negligibly. Each ping is a single POST request with a 5-second timeout. In practice they complete in under 100ms. The timeout=5 parameter prevents a slow network from blocking your job, and the try/except ensures a monitoring failure never affects job execution.
What if I'm using schedule or another lightweight library?
The plain HTTP ping approach works with any scheduler. Wrap your job function with the ping() calls shown in Option 1, or use the context manager pattern. No library-specific integration is required.
Can I monitor the same job running on multiple servers?
Yes. Each instance sends pings to the same monitor ID. If two instances start simultaneously, Crontify records the overlap and fires an overlap alert — which is usually a signal worth investigating.
What's the right grace period for a Python job?
Set it to slightly more than your job's typical runtime. If your sync normally takes 8 minutes, a 15-minute grace period is reasonable. Crontify waits the grace period after the scheduled start time before declaring a missed run.
Start monitoring for free
Crontify is free for up to 5 monitors — no credit card required. HTTP pings work with any Python scheduler, framework, or runtime without installing any additional packages.
If you're running Python jobs in production and relying on a scheduler library to catch failures, you're watching the wrong signal.
Start monitoring your scheduled jobs
Free plan includes 5 monitors. No credit card required. Up and running in under 5 minutes.
Get started free →