DAGGO is a Go-native workflow orchestrator for teams that want to define jobs in Go, ship a single binary, and still get an admin UI, scheduling, run history, and step-level diagnostics out of the box.
The embedded UI is intended to be usable immediately in an imported app: overview timeline, job graph, queues, and run history in one place.
The job detail view shows the DAG shape, an Enabled scheduling toggle, and tabbed Overview/Runs panels with inline run launch.
The runs view gives a run-centric history with filtering and drill-in diagnostics.
- Typed DAG authoring with build-time validation.
- First-class queues with loader-driven ingestion and queue-item visibility.
- SQLite by default, with PostgreSQL also supported.
- Embedded web admin UI served by the same Go process.
- RPC API with generated client support.
- Background scheduling and async run execution.
- Internal worker bootstrapping handled by DAGGO, so importing apps do not need to implement private subprocess commands.
- Robust worker pool implementation and coordination from the DAGGO admin.
- Support for data partitions, assets, and management.
- Migration support for teams coming from Dagster and similar tools.
- Reference integrations with BigQuery, Databricks, and similar platforms.
```shell
go get github.com/swetjen/daggo
```

```go
package main

import (
	"context"
	"log"

	"github.com/swetjen/daggo"

	"myapp/jobs"
	"myapp/ops"
	"myapp/resources"
)

func main() {
	cfg := daggo.DefaultConfig()
	cfg.Admin.Port = "8080"
	cfg.Database.SQLite.Path = "/tmp/daggo.sqlite"

	deps := resources.NewDeps()
	myOps := ops.NewMyOps(deps)
	myJobs := jobs.ContentIngestionJob(myOps)

	if err := daggo.Run(context.Background(), cfg, myJobs); err != nil {
		log.Fatal(err)
	}
}
```

`jobs.ContentIngestionJob(...)` mounts named ops onto the DAG definition:
```go
func ContentIngestionJob(myOps *ops.MyOps) dag.JobDefinition {
	scrapePage := dag.Op[dag.NoInput, ops.ScrapePageOutput]("scrape_page", myOps.ScrapePageOp)
	extractTitle := dag.Op[ops.ExtractTitleInput, ops.ExtractTitleOutput]("extract_title", myOps.ExtractTitleOp)
	extractEntities := dag.Op[ops.ExtractEntitiesInput, ops.ExtractEntitiesOutput]("extract_entities", myOps.ExtractEntitiesOp)
	extractLinks := dag.Op[ops.ExtractLinksInput, ops.ExtractLinksOutput]("extract_links", myOps.ExtractLinksOp)
	upsertIndex := dag.Op[ops.UpsertIndexInput, ops.UpsertIndexOutput]("upsert_index", myOps.UpsertIndexOp)

	return dag.NewJob("content_ingestion").
		Add(scrapePage, extractTitle, extractEntities, extractLinks, upsertIndex).
		AddSchedule(dag.ScheduleDefinition{
			CronExpr: "*/15 * * * *",
			Timezone: "UTC",
			Enabled:  true,
		}).
		MustBuild()
}
```

Each op follows a standard Go function shape:

- Input type
- Output type
- Named Go function or method
```go
type AnalyzeTextInput struct {
	Page ScrapePageOutput
}

type AnalyzeTextOutput struct {
	VowelsCount     int
	ConsonantsCount int
}

func (o *MyOps) AnalyzeTextOp(ctx context.Context, in AnalyzeTextInput) (AnalyzeTextOutput, error) {
	if err := ctx.Err(); err != nil {
		return AnalyzeTextOutput{}, err
	}

	body := in.Page.Body

	// Naively strip HTML tags.
	// Good enough for demo purposes, not production-grade parsing.
	tagRegex := regexp.MustCompile(`<[^>]*>`)
	text := tagRegex.ReplaceAllString(body, "")

	vowels := 0
	consonants := 0
	for _, r := range strings.ToLower(text) {
		if r >= 'a' && r <= 'z' {
			switch r {
			case 'a', 'e', 'i', 'o', 'u':
				vowels++
			default:
				consonants++
			}
		}
	}

	return AnalyzeTextOutput{
		VowelsCount:     vowels,
		ConsonantsCount: consonants,
	}, nil
}
```

Queues let DAGGO orchestrate work pulled from user-owned storage or ingress. A queue definition attaches one or more jobs, a loader, and a partition resolver. Routes are optional, but when you expose one it should be implemented with Virtuous and push work into your own queue store.
```go
type ImportEnvelope struct {
	CustomerID string `json:"customer_id"`
	BatchID    string `json:"batch_id"`
}

func main() {
	cfg := daggo.DefaultConfig()
	cfg.Admin.Port = "8080"
	cfg.Database.SQLite.Path = "/tmp/daggo.sqlite"

	// myOps is constructed as in the earlier example.
	importJob := jobs.CustomerImportJob(myOps)
	auditJob := jobs.CustomerImportAuditJob(myOps)

	importQueue := daggo.NewQueue[ImportEnvelope]("customer_imports").
		WithRoute("/queues/customer-imports", myVirtuousHandler).
		WithLoader(loadQueuedImports, daggo.QueueLoaderOptions{
			Mode:      daggo.QueueLoadModePoll,
			PollEvery: 2 * time.Second,
		}).
		WithPartitionKey(func(item ImportEnvelope) (string, error) {
			return item.CustomerID, nil
		}).
		AddJobs(importJob, auditJob).
		MustBuild()

	if err := daggo.RunDefinitions(context.Background(), cfg, importQueue); err != nil {
		log.Fatal(err)
	}
}
```

Queue-attached jobs are unordered fan-out peers. DAGGO creates one run per attached job for each accepted queue item, rolls queue-item status up from those linked runs, and shows queue items and partitions as top-level UI views.
Successful step outputs may publish queue-item metadata by implementing `daggo.MetadataProvider`, and step code can read queue context with `daggo.QueueMetaFromContext(ctx)` or `daggo.QueuePayloadFromContext[T](ctx)`.
See `examples/content_ingestion/main.go`, `examples/content_ingestion/jobs/content_ingestion.go`, `examples/content_ingestion/ops/content_ops.go`, and `examples/content_ingestion/resources/deps.go` for the job-centric example. For a minimal queue runtime example, see `examples/queue_ingestion/main.go`.
Calling daggo.Run(...) gives new users a working runtime immediately:
- Opens the configured database.
- Applies bundled migrations automatically.
- Syncs registered jobs into DAGGO metadata tables.
- Serves the embedded admin UI and RPC docs.
- Starts the scheduler and async executor.
- Handles DAGGO's internal worker subprocess command inside the same application binary.
Current schedules are derived from the jobs you register at startup. DAGGO does not persist future schedule definitions in the database; it persists scheduler bookkeeping and historical runs.
If you start with daggo.RunDefinitions(...) or daggo.OpenDefinitions(...), DAGGO also syncs queue definitions, mounts queue routes, and starts queue loaders alongside the existing job runtime.
If you only want the RPC surface, set cfg.DisableUI = true. DAGGO will continue serving /rpc/ and /rpc/docs/, while / will no longer expose the admin UI.
By default, DAGGO executes runs in subprocess mode. When a run starts, DAGGO launches a separate worker process from the same application binary, and that worker owns the run lifecycle.
Relevant execution settings:
- `cfg.Execution.Mode`
- `cfg.Execution.MaxConcurrentRuns`
- `cfg.Execution.MaxConcurrentSteps`
Because the active run lives in a separate worker process, the web server can restart or roll forward independently without tying run execution to a request-serving goroutine. DAGGO is also designed for deploy-drain coordination so new web code can come up without immediately breaking active workers. We plan to add additional daemon and runner configurations later.
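Those settings can be overridden as a config fragment. The field names come from this section; the values, and leaving the default execution mode untouched, are illustrative:

```go
cfg := daggo.DefaultConfig()

// Subprocess execution is the default, so cfg.Execution.Mode is left
// as-is here; this guide does not list the concrete mode constants.
cfg.Execution.MaxConcurrentRuns = 4  // cap simultaneous worker runs
cfg.Execution.MaxConcurrentSteps = 8 // cap concurrently executing steps
```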
If you want to mount DAGGO into a larger server instead of letting it own the listener, use daggo.Open(...) and attach app.Handler() wherever you need it.
For importing apps, the cleanest layout is usually:

```text
myapp/
  main.go
  jobs/
    content_ingestion.go
    customer_sync.go
  ops/
    content_ops.go
    customer_ops.go
  resources/
    deps.go
    crud.go
    openai.go
    playwright.go
    gemini.go
    s3.go
```

- Put job definitions in `jobs/`.
- Put concrete operational code in `ops/`.
- Put shared clients and app dependencies in `resources/`.
- Pass `resources.Deps` into an ops struct and bind DAGGO steps to named methods on that struct.

That keeps job wiring declarative while keeping dependency handling explicit and testable.
```go
package resources

import (
	"context"

	"github.com/openai/openai-go/v3"
	"google.golang.org/genai"

	"github.com/swetjen/daggo/resources/ollama"
	playwrightresource "github.com/swetjen/daggo/resources/playwright"
	"github.com/swetjen/daggo/resources/s3resource"

	"myapp/db"
)

type Deps struct {
	CRUD       *db.Queries
	Playwright *playwrightresource.RemoteResource
	S3         *s3resource.Resource
	Gemini     *genai.Client
	OpenAI     *openai.Client
	Ollama     *ollama.Resource
	Scraper    func(ctx context.Context, targetURL string) (ScrapeResult, error)
}
```

Then mount those dependencies onto an ops struct:
```go
package ops

type MyOps struct {
	deps resources.Deps
}

func NewMyOps(deps resources.Deps) *MyOps {
	return &MyOps{deps: deps}
}

func (o *MyOps) ScrapePageOp(ctx context.Context, input dag.NoInput) (ScrapePageOutput, error) {
	if err := ctx.Err(); err != nil {
		return ScrapePageOutput{}, err
	}

	// Delegate the real work to the injected scraper dependency.
	result, err := o.deps.Scraper(ctx, o.scrapeTargetURL(input))
	if err != nil {
		return ScrapePageOutput{}, err
	}

	return ScrapePageOutput{
		Body:       result.Body,
		StatusCode: result.StatusCode,
	}, nil
}

func (o *MyOps) scrapeTargetURL(dag.NoInput) string {
	return "https://example.com"
}

type ScrapePageOutput struct {
	Body       string
	StatusCode int
}
```

Then your `jobs/` package can focus on graph composition:
```go
func ContentIngestionJob(myOps *ops.MyOps) dag.JobDefinition {
	scrape := dag.Op[dag.NoInput, ops.ScrapePageOutput]("scrape_page", myOps.ScrapePageOp)
	return dag.NewJob("content_ingestion").Add(scrape).MustBuild()
}
```

Start from `daggo.DefaultConfig()` and override what you need:
```go
cfg := daggo.DefaultConfig()
cfg.Admin.Port = "8080"
cfg.Admin.SecretKey = "replace-me"
cfg.DisableUI = false
cfg.Database.SQLite.Path = "/tmp/daggo.sqlite"
cfg.Retention.RunDays = 30
```

Available config areas:

- `cfg.Admin.Port`: web admin / RPC listen port.
- `cfg.Admin.SecretKey`: optional bearer secret for `/rpc/` and `/rpc/docs/`.
- `cfg.DisableUI`: disable the embedded admin UI while keeping RPC/docs enabled.
- `cfg.Database`: database driver and connection settings.
- `cfg.Execution`: queue size, execution mode, run concurrency, step concurrency.
- `cfg.Scheduler`: scheduler enablement and tick controls.
- `cfg.Deploy`: deploy-drain lock settings.
- `cfg.Retention`: automatic run-history retention and purge settings.
Environment-based startup is still available through `daggo.LoadConfigFromEnv()` for the repo's sample binary and local development.
SQLite is the default and is fully implemented today.

```go
cfg := daggo.DefaultConfig()
cfg.Database.SQLite.Path = "/tmp/daggo.sqlite"
```

PostgreSQL is opt-in and only activates when the driver is explicitly set to `postgres`.
```go
cfg := daggo.DefaultConfig()
cfg.Database.Driver = daggo.DatabaseDriverPostgres
cfg.Database.Postgres.Host = "db.internal"
cfg.Database.Postgres.Port = 5432
cfg.Database.Postgres.User = "daggo"
cfg.Database.Postgres.Password = "secret"
cfg.Database.Postgres.Database = "platform"
cfg.Database.Postgres.Schema = "my_project"
cfg.Database.Postgres.SSLMode = "require"
```

At startup, DAGGO will:

- connect to the configured PostgreSQL database
- create the configured schema if it does not exist
- set `search_path` so DAGGO uses that schema
- run embedded up-migrations automatically
- use PostgreSQL for jobs, runs, scheduler state, and events

The PostgreSQL runtime details and remaining limitations are documented in docs/POSTGRES_RUNTIME_SPEC.md.
If you want to lock down the DAGGO control plane, configure a secret key:
```go
cfg := daggo.DefaultConfig()
cfg.Admin.SecretKey = "replace-me"
cfg.DisableUI = true
```

With a secret configured, DAGGO requires `Authorization: Bearer <secret>` on `/rpc/` and `/rpc/docs/`.
Generated clients pass the same value through their auth option:

```typescript
const client = createClient("http://localhost:8000")
await client.jobs.JobsGetMany({ limit: 50, offset: 0 }, { auth: "replace-me" })
```

The embedded UI is not authenticated yet. For locked-down deployments today, run DAGGO with `cfg.DisableUI = true`.
Schedules are declarative job config, not persisted DB records.
- Current schedules come from the in-memory jobs you register at startup.
- DAGGO persists only scheduler state and dedupe claims for active schedules.
- Removing a schedule from code clears that scheduler bookkeeping but preserves historical runs.
- `dag.ScheduleDefinition.Key` is optional; if omitted, DAGGO derives a stable, readable key from the cron expression.
PostgreSQL is not auto-detected from env vars alone. Set the driver explicitly:
```shell
export DAGGO_DATABASE_DRIVER=postgres
export DAGGO_POSTGRES_HOST=db.internal
export DAGGO_POSTGRES_PORT=5432
export DAGGO_POSTGRES_USER=daggo
export DAGGO_POSTGRES_PASSWORD=secret
export DAGGO_POSTGRES_DATABASE=platform
export DAGGO_POSTGRES_SCHEMA=my_project
export DAGGO_POSTGRES_SSLMODE=require

go run ./cmd/api
```

- Install prerequisites: Go 1.25+, `bun`, `make`.
- Copy the env template: `cp .env.example .env`
- Generate SQL, SDK, and frontend assets: `make gen-all`
- Start the sample binary: `go run ./cmd/api`
- Open:
  - UI: http://localhost:8000/
  - RPC docs: http://localhost:8000/rpc/docs/
  - OpenAPI: http://localhost:8000/rpc/openapi.json
- `make gen`: regenerate sqlc output.
- `make gen-sdk`: regenerate RPC clients.
- `make gen-web`: rebuild frontend assets.
- `make gen-all`: run every generation step.
- `go test ./...`: run the Go test suite.
- Usage guide: docs/USAGE_GUIDE.md
- Coming from Dagster: docs/COMING_FROM_DAGSTER.md
- PostgreSQL runtime plan: docs/POSTGRES_RUNTIME_SPEC.md
- Deploy-drain behavior: docs/WILL_DEPLOY_DRAIN_LOCK_PLAN.md



