OpenAEC-Foundation

erpnext-syntax-scheduler

by OpenAEC-Foundation

28 deterministic Claude AI skills for flawless ERPNext/Frappe v14-16 development. Agent Skills standard compliant.

1🍴 1📅 Jan 23, 2026

SKILL.md


name: erpnext-syntax-scheduler
description: Scheduler and background jobs syntax for Frappe/ERPNext v14/v15/v16. Use for scheduler_events in hooks.py, frappe.enqueue() for async jobs, queue configuration, job deduplication, error handling, and monitoring. Triggers on questions about scheduled tasks, background processing, cron jobs, RQ workers, job queues, async tasks.

ERPNext Syntax: Scheduler & Background Jobs

Deterministic syntax reference for Frappe scheduler events and background job processing.

Quick Reference

Scheduler Events (hooks.py)

# hooks.py
scheduler_events = {
    "all": ["myapp.tasks.every_tick"],
    "hourly": ["myapp.tasks.hourly_task"],
    "daily": ["myapp.tasks.daily_task"],
    "weekly": ["myapp.tasks.weekly_task"],
    "monthly": ["myapp.tasks.monthly_task"],
    "daily_long": ["myapp.tasks.heavy_daily"],  # Long queue
    "cron": {
        "0 9 * * 1-5": ["myapp.tasks.weekday_9am"],
        "*/15 * * * *": ["myapp.tasks.every_15_min"]
    }
}

CRITICAL: Run bench migrate after EVERY change to scheduler_events.
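
A typo in an event key or cron string is easy to miss until the job silently fails to run, so a quick structural check before migrating can help. The sketch below is a hypothetical lint helper (`check_scheduler_events` is not part of Frappe). It assumes the event names listed in this reference and five-field cron strings, and it only checks shape, not whether the dotted paths actually import.

```python
# Hypothetical lint helper; not part of Frappe.
KNOWN_EVENTS = {
    "all", "hourly", "daily", "weekly", "monthly",
    "hourly_long", "daily_long", "weekly_long", "monthly_long",
}


def _check_methods(label, methods):
    """Each entry must be a list of dotted paths such as 'myapp.tasks.daily_task'."""
    if not isinstance(methods, list):
        return [f"{label}: expected a list of dotted method paths"]
    return [
        f"{label}: {m!r} is not a dotted path"
        for m in methods
        if not isinstance(m, str) or "." not in m
    ]


def check_scheduler_events(events):
    """Return a list of human-readable problems found in a scheduler_events dict."""
    problems = []
    for key, value in events.items():
        if key == "cron":
            if not isinstance(value, dict):
                problems.append("cron: expected a dict of cron string -> list of methods")
                continue
            for expr, methods in value.items():
                # Assumption: plain five-field cron strings, as in the example above.
                if len(expr.split()) != 5:
                    problems.append(f"cron {expr!r}: expected 5 space-separated fields")
                problems.extend(_check_methods(f"cron {expr!r}", methods))
        elif key in KNOWN_EVENTS:
            problems.extend(_check_methods(key, value))
        else:
            problems.append(f"{key}: unknown event name")
    return problems
```

An empty list means the dict has the expected shape; anything else is worth fixing before the migrate step.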

frappe.enqueue Basics

# Simple
frappe.enqueue("myapp.tasks.process", customer="CUST-001")

# With queue and timeout
frappe.enqueue(
    "myapp.tasks.heavy_task",
    queue="long",
    timeout=3600,
    param="value"
)

# With deduplication (v15)
from frappe.utils.background_jobs import is_job_enqueued

job_id = f"import::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue("myapp.tasks.import_data", job_id=job_id, doc=doc.name)

Scheduler Event Types

| Event | Frequency | Queue |
|---|---|---|
| all | Every tick (v14: 4 min, v15: 60 s) | default |
| hourly | Per hour | default |
| daily | Per day | default |
| weekly | Per week | default |
| monthly | Per month | default |
| hourly_long | Per hour | long |
| daily_long | Per day | long |
| weekly_long | Per week | long |
| monthly_long | Per month | long |
| cron | Custom schedule | configurable |
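
To make the two cron strings from the Quick Reference concrete, here is a simplified matcher. It is an illustration only and not how Frappe evaluates cron strings: it supports just `*`, `*/n`, `a-b`, comma lists and plain numbers, and it requires all five fields to match (real cron implementations treat a restricted day-of-month plus a restricted day-of-week as "either").

```python
from datetime import datetime


def _field_matches(field, value, minimum):
    """Match one cron field; supports *, */n, a-b, a,b and plain numbers."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps are counted from the field's lowest legal value.
            if (value - minimum) % int(part[2:]) == 0:
                return True
        elif "-" in part:
            low, high = (int(x) for x in part.split("-"))
            if low <= value <= high:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(expr, when):
    """True if the five-field cron expression fires at the minute of `when`."""
    minute, hour, day, month, weekday = expr.split()
    cron_weekday = when.isoweekday() % 7  # cron numbering: 0 = Sunday, 1 = Monday
    return (
        _field_matches(minute, when.minute, 0)
        and _field_matches(hour, when.hour, 0)
        and _field_matches(day, when.day, 1)
        and _field_matches(month, when.month, 1)
        and _field_matches(weekday, cron_weekday, 0)
    )
```

With this, `"0 9 * * 1-5"` matches 09:00 on Monday through Friday only, and `"*/15 * * * *"` matches minutes 0, 15, 30 and 45 of every hour.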

Scheduler tick interval by version:

  • v14: ~240 seconds (4 min)
  • v15: ~60 seconds
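
The tick interval bounds how often an "all" handler can fire, which matters when porting a v14 app whose handler assumed a 4-minute cadence. A small arithmetic sketch, using the approximate intervals quoted above:

```python
SECONDS_PER_DAY = 24 * 60 * 60

# Approximate tick intervals quoted above, in seconds.
TICK_SECONDS = {"v14": 240, "v15": 60}


def runs_per_day(tick_seconds):
    """Upper bound on how often an "all" handler can fire in 24 hours."""
    return SECONDS_PER_DAY // tick_seconds


for version, tick in TICK_SECONDS.items():
    print(version, runs_per_day(tick))
# v14 360
# v15 1440
```

The same handler can therefore run about four times as often on v15, so work done under "all" should stay cheap.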

Queue Types

| Queue | Timeout | Usage |
|---|---|---|
| short | 300 s (5 min) | Quick tasks, UI responses |
| default | 300 s (5 min) | Standard tasks |
| long | 1500 s (25 min) | Heavy processing, imports |
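
Choosing a queue usually comes down to comparing an estimated runtime against these timeouts. The helper below is a minimal sketch of that decision; `pick_queue`, the `safety_factor` headroom and the 30-second cut-off for "short" are all assumptions made for illustration, not Frappe settings.

```python
# Default timeouts from the table above, in seconds.
QUEUE_TIMEOUTS = {"short": 300, "default": 300, "long": 1500}


def pick_queue(estimated_seconds, safety_factor=2.0):
    """Return (queue, timeout) for a job expected to take `estimated_seconds`.

    timeout is None when the queue's default timeout leaves enough headroom.
    """
    budget = estimated_seconds * safety_factor
    if budget <= 30:  # assumed cut-off for "quick" jobs
        return "short", None
    if budget <= QUEUE_TIMEOUTS["default"]:
        return "default", None
    if budget <= QUEUE_TIMEOUTS["long"]:
        return "long", None
    # Longer than any default: stay on the long queue with an explicit timeout.
    return "long", int(budget)
```

The returned pair maps directly onto the `queue` and `timeout` arguments of `frappe.enqueue`.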

frappe.enqueue Parameters

frappe.enqueue(
    method,                      # REQUIRED: function or module path
    queue="default",             # Queue name
    timeout=None,                # Override timeout (seconds)
    is_async=True,               # False = execute directly
    now=False,                   # True = via frappe.call()
    job_id=None,                 # v15: unique ID for deduplication
    enqueue_after_commit=False,  # Wait for DB commit
    at_front=False,              # Place at front of queue
    on_success=None,             # Success callback
    on_failure=None,             # Failure callback
    **kwargs                     # Arguments for method
)

Job Deduplication

from frappe.utils.background_jobs import is_job_enqueued

job_id = f"process::{doc.name}"
if not is_job_enqueued(job_id):
    frappe.enqueue(
        "myapp.tasks.process",
        job_id=job_id,
        doc_name=doc.name
    )
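
The check-then-enqueue pattern can be wrapped in a small helper so that every call site builds the ID the same way. This is a sketch, not a Frappe API: `enqueue_once` and `make_job_id` are hypothetical names, and the two callables are injected so the helper can be exercised without a site. In an app they would be `frappe.enqueue` and `is_job_enqueued`.

```python
def make_job_id(prefix, name):
    """Build a stable ID such as 'process::SINV-0001' (same format as above)."""
    return f"{prefix}::{name}"


def enqueue_once(method, job_id, *, enqueue, is_enqueued, **kwargs):
    """Enqueue `method` unless a job with `job_id` is already queued.

    `enqueue` and `is_enqueued` are injected stand-ins for frappe.enqueue and
    frappe.utils.background_jobs.is_job_enqueued.
    Returns True if a new job was queued, False if it was skipped.
    """
    if is_enqueued(job_id):
        return False
    enqueue(method, job_id=job_id, **kwargs)
    return True
```

Note that the check and the enqueue are two separate steps, so two requests arriving at the same moment could in principle both pass the check.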

v14 (Deprecated)

# DO NOT USE - only for legacy code
from frappe.core.page.background_jobs.background_jobs import get_info
enqueued = [d.get("job_name") for d in get_info()]
if name not in enqueued:
    frappe.enqueue(..., job_name=name)

Error Handling Pattern

def process_records(records):
    for record in records:
        try:
            process_single(record)
            frappe.db.commit()  # Commit per success
        except Exception:
            frappe.db.rollback()  # Rollback on error
            # Keyword arguments keep title and traceback unambiguous
            frappe.log_error(
                title=f"Process Error: {record}",
                message=frappe.get_traceback(),
            )
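
The same commit-per-record pattern can be exercised outside a bench by injecting its collaborators. In the sketch below, `FakeDB` and the `log_error` callable are stand-ins invented for illustration (in an app they would be `frappe.db` and `frappe.log_error`), and the function additionally returns the failed records so the caller can report or retry them.

```python
class FakeDB:
    """Stand-in for a transactional connection; only counts calls."""

    def __init__(self):
        self.commits = 0
        self.rollbacks = 0

    def commit(self):
        self.commits += 1

    def rollback(self):
        self.rollbacks += 1


def process_records(records, process_single, db, log_error):
    """Process each record in its own transaction and return the failures."""
    failed = []
    for record in records:
        try:
            process_single(record)
            db.commit()  # keep this record's work
        except Exception as exc:
            db.rollback()  # discard only this record's partial work
            log_error(f"Process Error: {record}", repr(exc))
            failed.append(record)
    return failed
```

Because each record commits on its own, one bad record costs a single rollback and the rest of the batch still lands.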

Callbacks

def on_success_handler(job, connection, result, *args, **kwargs):
    frappe.publish_realtime("show_alert", {"message": "Done!"})

def on_failure_handler(job, connection, type, value, traceback):
    frappe.log_error(f"Job {job.id} failed: {value}")

frappe.enqueue(
    "myapp.tasks.risky_task",
    on_success=on_success_handler,
    on_failure=on_failure_handler
)

User Context

IMPORTANT: Scheduler jobs run as Administrator!

def scheduled_task():
    # frappe.session.user = "Administrator"
    
    # Set explicit owner:
    doc = frappe.new_doc("ToDo")
    doc.owner = "user@example.com"
    doc.insert(ignore_permissions=True)

Monitoring

| Tool | Description |
|---|---|
| RQ Worker (DocType) | Worker status, busy/idle |
| RQ Job (DocType) | Job status, queue filter |
| bench doctor | Scheduler status overview |
| Scheduled Job Log | Execution history |

Version Differences v14 vs v15

| Feature | v14 | v15 |
|---|---|---|
| Tick interval | 4 min | 60 sec |
| Config key | scheduler_interval | scheduler_tick_interval |
| Deduplication | job_name | job_id + is_job_enqueued() |
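
For code that has to run on both versions, the deduplication row above can be turned into a small switch. This is a sketch under assumptions: `dedup_kwargs` is a hypothetical helper, how the major version is obtained is left to the caller, and versions above 15 are assumed to follow the v15 behavior.

```python
def dedup_kwargs(major_version, identifier):
    """Return the keyword argument that carries the dedup identifier.

    Follows the table above: job_name on v14, job_id from v15 on
    (assumption: later versions behave like v15).
    """
    if major_version >= 15:
        return {"job_id": identifier}
    return {"job_name": identifier}
```

The result can be splatted into the enqueue call, for example `frappe.enqueue("myapp.tasks.process", **dedup_kwargs(15, "process::X"))`.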

Critical Rules

  1. ALWAYS run bench migrate after changing scheduler_events in hooks.py
  2. USE job_id + is_job_enqueued() for deduplication (v15)
  3. CHOOSE correct queue: short/default/long based on duration
  4. COMMIT per successful record, rollback on error
  5. REMEMBER that jobs run as Administrator
  6. ENQUEUE heavy tasks from scheduler events, don't execute directly
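
Rule 6 can be sketched as a thin scheduler handler that only splits the work and hands each chunk to the long queue. The names here are hypothetical (`dispatch_heavy_daily`, `myapp.tasks.process_chunk`), the chunk size is an arbitrary assumption, and `enqueue` is injected so the sketch runs without a site; in an app it would be `frappe.enqueue`.

```python
def chunked(items, size):
    """Yield consecutive slices of `items` with at most `size` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def dispatch_heavy_daily(names, enqueue, chunk_size=100):
    """Scheduler-side handler: queue the work in chunks instead of doing it inline.

    Returns the number of jobs queued.
    """
    count = 0
    for chunk in chunked(names, chunk_size):
        enqueue(
            "myapp.tasks.process_chunk",  # hypothetical worker function
            queue="long",
            names=chunk,
        )
        count += 1
    return count
```

The handler itself returns quickly, so the scheduler's own slot is not held up while the long-queue workers process the chunks.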
