AI Toolkit
Modulesworkflow

workflow

Durable background jobs with retry, cron, human-in-the-loop, and AI steps

Overview

The workflow module wraps Inngest for durable AI workflows with automatic retries, cron scheduling, human-in-the-loop approval, and integrated AI steps with cost tracking.

Peer dependencies: inngest

npm install inngest
yarn add inngest
pnpm add inngest

Quick Start

import { createWorkflow, defineJob } from '@jamaalbuilds/ai-toolkit/workflow';

const workflow = await createWorkflow({ id: 'my-app' });

const processJob = defineJob(workflow, {
  id: 'process-data',
  trigger: { event: 'app/data.uploaded' },
}, async ({ event, step }) => {
  const result = await step.run('transform', async () => {
    return transformData(event.data);
  });
  return { status: 'complete', data: result };
});

API Reference

createWorkflow(config)

Create a workflow client backed by Inngest.

async function createWorkflow(config: WorkflowConfig): Promise<WorkflowClient>
ParameterTypeDescription
config.idstringApplication identifier

defineJob(workflow, config, handler)

Define a durable background job.

function defineJob(
  workflow: WorkflowClient,
  config: JobConfig,
  handler: (ctx: JobContext) => Promise<unknown>,
): WorkflowJob
ParameterTypeDescription
config.idstringUnique job identifier
config.triggerTriggerEvent trigger or cron schedule
// Event-triggered job
const job = defineJob(workflow, {
  id: 'process-upload',
  trigger: { event: 'app/file.uploaded' },
}, async ({ event, step }) => {
  // Durable steps with automatic retry
  const parsed = await step.run('parse', () => parseFile(event.data.url));
  const analyzed = await step.run('analyze', () => analyze(parsed));
  return analyzed;
});

// Cron job
const dailyJob = defineJob(workflow, {
  id: 'daily-report',
  trigger: { cron: '0 9 * * *' },
}, async ({ step }) => {
  return await step.run('generate-report', () => buildReport());
});

humanInTheLoop(step, options)

Pause a workflow and wait for human approval.

function humanInTheLoop(
  step: WorkflowStep,
  options: HITLOptions,
): Promise<unknown | null>
ParameterTypeDescription
options.stepIdstringStep identifier
options.eventstringEvent name to wait for
options.timeoutstringTimeout duration (e.g., '7d', '1h')
const approval = await humanInTheLoop(step, {
  stepId: 'wait-approval',
  event: 'app/data.approved',
  timeout: '7d',
});

if (!approval) return { status: 'timed_out' };

aiStep(step, options)

Execute an AI generation step within a workflow with automatic cost tracking.

function aiStep(
  step: WorkflowStep,
  options: AIStepOptions,
): Promise<AIStepResult>
ParameterTypeDescription
options.stepIdstringStep identifier
options.promptstringAI prompt
options.fallbackstringFallback text if AI fails
const summary = await aiStep(step, {
  stepId: 'summarize',
  prompt: `Summarize: ${JSON.stringify(data)}`,
  fallback: 'Summary unavailable',
});
console.log(summary.text);

serve(options)

Create an HTTP handler for serving workflow functions.

function serve(options: {
  client: WorkflowClient;
  functions: WorkflowJob[];
}): Promise<{ GET; POST; PUT }>

Types

  • WorkflowClient — client with defineJob, serve
  • WorkflowJob — defined job function
  • JobContext — event, step
  • WorkflowStep — durable step with run(), sleep(), waitForEvent()
  • AIStepOptions — stepId, prompt, fallback
  • HITLOptions — stepId, event, timeout
  • Trigger — event trigger or cron schedule
On this page

On this page