Worker

sheaf.worker

Async job-queue worker — long-running inference decoupled from HTTP.

For jobs where request/response is the wrong shape (FLUX 50-step, GraphCast multi-day rollouts, batch SDXL): clients enqueue a typed request and either poll the result store or wait for a webhook.

Public surface::

from sheaf.worker import (
    SheafWorker,
    WorkerSpec,
    JobQueueClient,
    JobQueue,
    ResultStore,
    RedisStreamsQueue,
    RedisHashResultStore,
    Job,
    JobResult,
)

Install with: pip install 'sheaf-serve[worker]'

Job

Bases: BaseModel

A dequeued job — request + delivery metadata.

JobQueue

Bases: ABC

Abstract job queue. Subclass to add new backends (SQS, Kafka, ...).

enqueue abstractmethod

enqueue(request: dict, webhook_url: str | None = None, job_id: str | None = None) -> str

Add a job to the queue. Returns the job_id.

dequeue abstractmethod

dequeue(block_ms: int) -> Job | None

Block up to block_ms for the next job. Returns None on timeout.

ack abstractmethod

ack(job: Job) -> None

Confirm successful processing — removes the job from in-flight tracking.

nack abstractmethod

nack(job: Job) -> None

Mark the job as failed — leaves it for retry by the consumer group.

dead_letter abstractmethod

dead_letter(job: Job, reason: str) -> None

Move the job to the dead-letter queue with a failure reason.

queue_depth

queue_depth() -> int | None

Approximate count of pending plus in-flight jobs, or None if the backend cannot report it.
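
To make the contract above concrete, here is a minimal in-memory sketch that mirrors the JobQueue method signatures. It is not part of sheaf: the Job dataclass and its fields (job_id, request, webhook_url) are hypothetical stand-ins, since the real Job is a pydantic model whose fields are not listed on this page, and a real backend would subclass JobQueue instead of standing alone.

```python
from __future__ import annotations

import queue
import uuid
from dataclasses import dataclass


@dataclass
class Job:
    # Hypothetical stand-in for sheaf.worker.Job; field names are assumptions.
    job_id: str
    request: dict
    webhook_url: str | None = None


class InMemoryQueue:
    """Mirrors the JobQueue signatures; a real backend would subclass JobQueue."""

    def __init__(self) -> None:
        self._pending = queue.Queue()   # jobs waiting for a consumer
        self._inflight = {}             # job_id -> Job, dequeued but not yet acked
        self.dead = []                  # (job, reason) pairs

    def enqueue(self, request: dict, webhook_url: str | None = None,
                job_id: str | None = None) -> str:
        job = Job(job_id or uuid.uuid4().hex, request, webhook_url)
        self._pending.put(job)
        return job.job_id

    def dequeue(self, block_ms: int) -> Job | None:
        try:
            job = self._pending.get(timeout=block_ms / 1000)
        except queue.Empty:
            return None                 # timed out with nothing to do
        self._inflight[job.job_id] = job
        return job

    def ack(self, job: Job) -> None:
        # Success: drop the job from in-flight tracking.
        self._inflight.pop(job.job_id, None)

    def nack(self, job: Job) -> None:
        # Failure: put the job back so a consumer can retry it.
        if self._inflight.pop(job.job_id, None) is not None:
            self._pending.put(job)

    def dead_letter(self, job: Job, reason: str) -> None:
        self._inflight.pop(job.job_id, None)
        self.dead.append((job, reason))

    def queue_depth(self) -> int | None:
        return self._pending.qsize() + len(self._inflight)
```

A queue like this is only useful for tests or single-process experiments, because nothing survives a restart and nothing is shared between processes.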

JobQueueClient

JobQueueClient(queue: JobQueue, results: ResultStore)

Application-side helper: enqueue jobs, optionally wait for results.

Wraps a JobQueue (for enqueue) and a ResultStore (for poll). Validates the request against AnyRequest before enqueue so callers see schema errors immediately, not in a worker log later.

enqueue

enqueue(request: dict | BaseModel, webhook_url: str | None = None, job_id: str | None = None) -> str

Validate + submit a request. Returns the assigned job_id.

wait_for_result

wait_for_result(job_id: str, timeout_s: float = 60.0, poll_interval_s: float = 0.25) -> JobResult

Poll the result store until the job completes or timeout_s elapses.

Raises TimeoutError if the deadline passes with no result.
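
The polling behaviour described above can be sketched as a deadline loop. This is an illustration, not the library's implementation: get_result is a hypothetical callable that returns None until a result exists, and the result is shown as a plain dict instead of a JobResult.

```python
import time
from typing import Callable, Optional


def wait_for_result(get_result: Callable[[str], Optional[dict]], job_id: str,
                    timeout_s: float = 60.0,
                    poll_interval_s: float = 0.25) -> dict:
    # A monotonic clock keeps the deadline stable if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_s
    while True:
        result = get_result(job_id)
        if result is not None:
            return result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"no result for job {job_id!r} after {timeout_s}s")
        # Never sleep past the deadline.
        time.sleep(min(poll_interval_s, remaining))
```

With a dict standing in for the store, wait_for_result(store.get, "some-id", timeout_s=1.0) returns as soon as the key appears and raises TimeoutError otherwise.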

JobResult

Bases: BaseModel

Result record written to the result store.

RedisHashResultStore

RedisHashResultStore(prefix: str = 'sheaf:result', url: str = 'redis://localhost:6379/0', ttl_seconds: int | None = 86400, client: Any = None)

Bases: ResultStore

Redis hash-per-job result store.

Each result is stored under the key f"{prefix}:{job_id}" as a Redis hash with fields status, response, error, completed_at. Optional TTL (ttl_seconds) lets results auto-expire so the store doesn't grow unbounded.
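
The key layout above can be sketched with two redis-py calls, hset and expire. The field encoding is an assumption made for illustration (JSON for response, empty strings for missing values, epoch seconds for completed_at); the page does not say how the store serialises these fields, and write_result is a hypothetical helper, not a sheaf function.

```python
import json
import time
from typing import Any, Optional


def result_key(prefix: str, job_id: str) -> str:
    # Same shape as the documented key: f"{prefix}:{job_id}"
    return f"{prefix}:{job_id}"


def write_result(client: Any, prefix: str, job_id: str, status: str,
                 response: Optional[dict] = None, error: Optional[str] = None,
                 ttl_seconds: Optional[int] = 86400) -> str:
    key = result_key(prefix, job_id)
    fields = {
        "status": status,
        # Assumed encoding: JSON text, or "" when there is no response.
        "response": json.dumps(response) if response is not None else "",
        "error": error or "",
        "completed_at": str(time.time()),
    }
    client.hset(key, mapping=fields)      # one hash per job
    if ttl_seconds is not None:
        client.expire(key, ttl_seconds)   # let Redis drop the result later
    return key
```

Any object exposing hset and expire works as client, for example one built with redis.Redis.from_url("redis://localhost:6379/0"). Passing ttl_seconds=None skips the expire call, so the hash stays until it is deleted explicitly.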

RedisStreamsQueue

RedisStreamsQueue(stream: str, group: str, consumer: str, url: str = 'redis://localhost:6379/0', dead_letter_stream: str | None = None, client: Any = None)

Bases: JobQueue

Redis Streams + consumer groups job queue.

Multiple workers using the same stream + group form a horizontal-scaling consumer pool: each job is delivered to one consumer in the group at a time. Jobs that exceed max_retries (decided by the worker, not the queue) are XADDed to a separate dead-letter stream via dead_letter.

Notes
  • The stream and the consumer group are created lazily on first use (idempotent XGROUP CREATE ... MKSTREAM).
  • XACK is the only way a job leaves "pending" state for this consumer; the worker calls ack only after the result is persisted to the result store, so a crash between inference and ack causes redelivery (at-least-once).
  • redis-py is imported lazily — the [worker] extra adds redis>=5.0 but sheaf.worker.queue should be importable for type-only usage without it.
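
The lazy, idempotent group creation mentioned in the notes can be sketched as follows. ensure_group is a hypothetical helper, and the start ID "0" is an assumption; the page does not say which ID the queue passes to XGROUP CREATE. The error is matched on the BUSYGROUP reply text so the sketch does not need to import redis at module load.

```python
from typing import Any


def ensure_group(client: Any, stream: str, group: str) -> bool:
    """Create the consumer group (and the stream) if it does not exist yet.

    Returns True if the group was created, False if it already existed.
    """
    try:
        # Equivalent to: XGROUP CREATE <stream> <group> 0 MKSTREAM
        client.xgroup_create(stream, group, id="0", mkstream=True)
        return True
    except Exception as exc:
        # Redis answers BUSYGROUP when the group is already there; treating
        # that as success is what makes the call safe to repeat.
        if "BUSYGROUP" in str(exc):
            return False
        raise
```

Because a repeated call is harmless, every worker in the pool can run this on startup without coordinating with the others.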

ResultStore

Bases: ABC

Abstract result store. Subclass to add new backends (S3, Postgres, ...).

SheafWorker

SheafWorker(spec: WorkerSpec, backend: ModelBackend | None = None)

Async-job consumer for any ModelBackend.

Example::

from sheaf.api.base import ModelType
from sheaf.worker import (
    SheafWorker, WorkerSpec, RedisStreamsQueue, RedisHashResultStore,
)

spec = WorkerSpec(
    name="flux-worker",
    model_type=ModelType.DIFFUSION,
    backend="flux",
    backend_kwargs={"model_id": "black-forest-labs/FLUX.1-schnell"},
    queue=RedisStreamsQueue(
        stream="sheaf:flux", group="workers", consumer="w1"
    ),
    results=RedisHashResultStore(prefix="sheaf:flux:result"),
    max_retries=3,
)
SheafWorker(spec).start()  # blocks until SIGINT

start

start() -> None

Run the consume loop until SIGINT/SIGTERM.

Blocks the calling thread. After receiving a stop signal, the worker finishes its current job (if any) before returning.

stop

stop() -> None

Request graceful shutdown. The current job, if any, finishes first.
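
One common way to get the start/stop behaviour described here is a stop flag that the loop checks between jobs. The sketch below is an assumption about the shape of such a loop, not the worker's actual code; ConsumeLoop and step are hypothetical names.

```python
import signal
import threading
from typing import Callable


class ConsumeLoop:
    """Hypothetical stand-in for the worker's consume loop."""

    def __init__(self, step: Callable[[], object]) -> None:
        self._step = step                 # processes at most one job per call
        self._stop = threading.Event()

    def stop(self) -> None:
        # Only sets a flag; the loop notices it between jobs.
        self._stop.set()

    def start(self) -> None:
        previous = {}
        # Signal handlers can only be installed from the main thread.
        if threading.current_thread() is threading.main_thread():
            for sig in (signal.SIGINT, signal.SIGTERM):
                previous[sig] = signal.signal(sig, lambda *_: self.stop())
        try:
            while not self._stop.is_set():
                self._step()              # a job in progress runs to completion
        finally:
            # Put back whatever handlers were installed before start().
            for sig, handler in previous.items():
                if handler is not None:
                    signal.signal(sig, handler)
```

Since the flag is only read between calls to step, a stop request never interrupts a job halfway through.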

run_one

run_one() -> Job | None

Process at most one job. Returns the Job processed, or None if idle.

Exposed publicly so tests can step the loop one iteration at a time without spinning a real signal handler.
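
The per-job flow that a single iteration implies (dequeue, infer, persist, then ack) can be sketched as below. This is a hedged illustration under stated assumptions: process_one, the attempts dict, the status strings, and the dict used as a result store are all hypothetical, and the retry rule (dead-letter once failures exceed max_retries) is one reading of the description on this page.

```python
from typing import Any, Callable, Optional


def process_one(queue: Any, results: dict, infer: Callable[[dict], dict],
                attempts: dict, max_retries: int = 3,
                block_ms: int = 1000) -> Optional[Any]:
    job = queue.dequeue(block_ms)
    if job is None:
        return None                       # idle: nothing arrived in time
    try:
        response = infer(job.request)
    except Exception as exc:
        failures = attempts[job.job_id] = attempts.get(job.job_id, 0) + 1
        if failures > max_retries:
            # Out of retries: record the failure and park the job.
            results[job.job_id] = {"status": "failed", "error": str(exc)}
            queue.dead_letter(job, reason=str(exc))
        else:
            queue.nack(job)               # leave it for another attempt
        return job
    # Persist first, ack second: a crash in between causes redelivery,
    # which is the at-least-once behaviour.
    results[job.job_id] = {"status": "ok", "response": response}
    queue.ack(job)
    return job
```

Because a job can be delivered more than once under this ordering, inference and result writes should be safe to repeat for the same job_id.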

WorkerSpec

Bases: BaseModel

Declares an async worker process consuming jobs from a queue.

Example::

from sheaf.api.base import ModelType
from sheaf.worker import (
    SheafWorker, WorkerSpec, RedisStreamsQueue, RedisHashResultStore,
)

spec = WorkerSpec(
    name="flux-worker",
    model_type=ModelType.DIFFUSION,
    backend="flux",
    backend_kwargs={"model_id": "black-forest-labs/FLUX.1-schnell"},
    queue=RedisStreamsQueue(
        stream="sheaf:flux", group="workers", consumer="w1"
    ),
    results=RedisHashResultStore(prefix="sheaf:flux:result"),
    max_retries=3,
)
SheafWorker(spec).start()

backend / backend_cls / backend_kwargs follow the same semantics as on ModelSpec: backend_cls takes precedence over the registry lookup when set.