diff --git a/CLAUDE.md b/CLAUDE.md index f792a0b8..bd34c6d2 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -6,15 +6,22 @@ This file provides guidance to agentic coding tools when working with code in th - Repository root is `atproto-worker` not `packages/pds/` - Use `pwd` or check `process.cwd()` to confirm location -- Many project files (CLAUDE.md, EDGE_PDS_PLAN.md) are at repository root +- Many project files (CLAUDE.md, plans/) are at repository root - Package-specific files are in `packages/pds/` -**ALWAYS read and update the implementation plan:** - -- **Read** `EDGE_PDS_PLAN.md` at the repository root before starting work to understand project status -- **Update** `EDGE_PDS_PLAN.md` when you complete phases or discover important implementation details -- The plan tracks what's completed, what's pending, and critical technical notes -- Keep the "Completed" section updated with new learnings (WebSocket patterns, CBOR encoding, etc.) +**ALWAYS read and update implementation plans:** + +- Plans are organized in the `plans/` directory at repository root: + - `plans/complete/` - Completed features with full documentation + - `plans/in-progress/` - Active development work + - `plans/todo/` - Planned future features and improvements +- **Read** relevant plan documents before starting work to understand project status and prior decisions +- **Update** plan documents when you complete features, discover important implementation details, or change priorities +- Key plan documents: + - `plans/complete/core-pds.md` - Core PDS implementation (all completed features) + - `plans/todo/endpoint-implementation.md` - Endpoint implementation status and priorities + - `plans/todo/oauth-provider.md` - OAuth 2.1 implementation plan + - `plans/todo/migration-wizard.md` - Account migration UX specification ## Repository Structure diff --git a/EDGE_PDS_PLAN.md b/EDGE_PDS_PLAN.md deleted file mode 100644 index f3266e0d..00000000 --- a/EDGE_PDS_PLAN.md +++ /dev/null @@ -1,2245 +0,0 @@ -# Edge 
PDS Implementation Plan - -## Goal - -Build a single-user AT Protocol Personal Data Server (PDS) on Cloudflare Workers with Durable Objects. The PDS will federate with the Bluesky network – the relay can sync from it, and AppViews can read from it. - -**Scope:** Single-user only. No account creation, no multi-tenancy. The owner's DID and signing key are configured at deploy time. - ---- - -## Current Status - -**Live at: https://pds.mk.gg** - -### Completed (Phase 1-8) - -- ✅ **Storage Layer** (Phase 1) - `SqliteRepoStorage` implementing `@atproto/repo` RepoStorage interface -- ✅ **Durable Object** (Phase 2) - `AccountDurableObject` with Repo integration - - SQLite schema initialization - - Lazy loading of Repo from storage - - Signing key import from environment with validation - - Create new repo if none exists, load existing repo otherwise - - RPC-first architecture following DO best practices -- ✅ **XRPC Endpoints** (Phase 3) - Full router implementation with Hono - - Tier 1: Sync endpoints (`com.atproto.sync.getRepo`, `com.atproto.sync.getRepoStatus`, `com.atproto.sync.subscribeRepos`) - - Tier 2: Repository operations (`com.atproto.repo.{describeRepo,getRecord,listRecords,createRecord,deleteRecord}`) - - Tier 3: Server identity (`com.atproto.server.describeServer`, `com.atproto.identity.resolveHandle`) -- ✅ **Firehose** (Phase 4) - WebSocket subscribeRepos event stream - - Sequencer class for commit event log management - - WebSocket hibernation API handlers (message, close, error) - - DAG-CBOR frame encoding (header + body concatenation using `@atproto/lex-cbor`) - - Event broadcasting to connected clients - - Cursor-based backfill and validation - - SQLite `firehose_events` table for event persistence - - **WebSocket/RPC Integration**: Uses `stub.fetch()` instead of RPC methods to avoid WebSocket serialization errors (RPC cannot serialize WebSocket objects per workerd limitation) - - **Frame Decoding**: Uses `decodeAll()` from `@atproto/lex-cbor` to handle 
concatenated CBOR objects - - **CORS Handling**: Standard CORS middleware works with all Hono routes; WebSocket endpoint bypasses CORS naturally -- ✅ **DID Document** (Phase 6) - Served at `/.well-known/did.json` for did:web resolution -- ✅ **Authentication** (Phase 7) - Bearer token middleware for write endpoints -- ✅ **Health Check** - `/health` endpoint with version info -- ✅ **Deployment** - Custom domain `pds.mk.gg` with auto-provisioned DNS -- ✅ **Signing Keys** - secp256k1 keypair generated and configured -- ✅ **Environment Validation** - Module-scope validation using `cloudflare:workers` env import -- ✅ **Blob Storage** (Phase 5) - R2 integration for image/media uploads - - `BlobStore` class using `cidForRawBytes()` from `@atproto/lex-cbor` - - `com.atproto.repo.uploadBlob` endpoint (authenticated, 5MB limit) - - `com.atproto.sync.getBlob` endpoint (public read access) - - Direct R2 access in endpoint (R2ObjectBody cannot be serialized across RPC) - - Blobs stored with DID prefix for isolation -- ✅ **Testing** - Migrated to vitest 4, all 140 tests passing - - 16 storage tests - - 32 XRPC tests (auth, concurrency, error handling, CAR validation) - - 8 firehose tests (event sequencing, cursor validation, backfill) - - 10 blob tests (upload, retrieval, size limits, content types) - - 15 session tests (login, refresh, getSession, JWT validation) - - 8 validation tests (optimistic mode, strict mode, schema enforcement) - - 9 migration tests (account status, import/export, validation) - - 11 Bluesky validation tests (post creation, profile updates, schema compliance) - - 3 service-auth tests - - 28 CLI tests (19 dotenv, 9 wrangler config utilities) -- ✅ **TypeScript** - All diagnostic errors resolved, proper type declarations for cloudflare:test -- ✅ **Protocol Helpers** - All protocol operations use official @atproto utilities - - Record keys: `TID.nextStr()` from `@atproto/common-web` - - AT URI construction: `AtUri.make()` from `@atproto/syntax` - - DID 
validation: `ensureValidDid()` from `@atproto/syntax` - - Handle validation: `ensureValidHandle()` from `@atproto/syntax` - - CBOR encoding: `@atproto/lex-cbor` - - Blob CID generation: `cidForRawBytes()` from `@atproto/lex-cbor` - - CAR export: `blocksToCarFile()` from `@atproto/repo` -- ✅ **Dependency Optimization** - Removed 6 low-level dependencies, added 3 @atproto helpers - - Removed: `varint`, `@types/varint`, `cborg`, `uint8arrays`, `@ipld/dag-cbor`, `multiformats` - - Added: `@atproto/lex-data`, `@atproto/lex-cbor`, `@atproto/common-web` - - Net reduction: 116 lines, better standards compliance -- ✅ **Session Authentication** (Phase 8) - JWT-based login for Bluesky app compatibility - - `com.atproto.server.createSession` - login with identifier + password - - `com.atproto.server.refreshSession` - token rotation with refresh JWT - - `com.atproto.server.getSession` - get current session info - - `com.atproto.server.deleteSession` - logout (stateless, client-side) - - HS256 JWT signing with `jose` library (matches reference implementation) - - bcrypt password hashing with `bcryptjs` - - Auth middleware accepts both static `AUTH_TOKEN` and JWT access tokens - - 15 new tests for session endpoints -- ✅ **Lexicon Validation** - Record schema validation for mutation endpoints - - `RecordValidator` class using `@atproto/lexicon` package - - Optimistic validation strategy (fail-open): validates if schema is loaded, allows unknown collections - - Integrated into `createRecord`, `putRecord`, and `applyWrites` endpoints - - Schemas can be added dynamically via `validator.addSchema()` - - 8 validation tests covering optimistic mode, strict mode, and schema enforcement -- ✅ **Account Migration** (Phase 9) - Import/export for PDS migration - - `com.atproto.repo.importRepo` - Import repository from CAR file (authenticated, 100MB limit) - - `com.atproto.server.getAccountStatus` - Get account status for migration planning - - CAR file import using `readCarWithRoot()` from 
`@atproto/repo` - - Validates DID matches during import to prevent incorrect migrations - - Prevents importing over existing repository data - - Complete export/import workflow tested with CAR file validation - - 9 comprehensive migration tests -- ✅ **CLI Setup Wizard** - Interactive PDS configuration - - `pds init` - Full interactive setup wizard (hostname, handle, DID, password, keys) - - `pds secret jwt` - Generate and set JWT signing secret - - `pds secret password` - Set account password (bcrypt hash) - - `pds secret key` - Generate secp256k1 signing keypair - - Uses wrangler's `experimental_patchConfig` for vars, `wrangler secret put` for secrets - - `--local` flag writes to `.dev.vars` instead for local development - - Built with Citty + @clack/prompts for interactive CLI experience - - 28 CLI tests (19 dotenv, 9 wrangler config) - -### Not Started - -- None! All planned phases are complete. - -### Future Work: did:plc Migration Support - -Account migration is now possible from bsky.social. To support users migrating their existing `did:plc` accounts to this PDS, we need to implement the full migration flow. - -**Migration Flow (4 phases):** - -1. **Account Creation** - New PDS creates account in "deactivated" state with existing DID -2. **Data Migration** - Export repo as CAR, import to new PDS, migrate blobs -3. **Identity Update** - Old PDS signs PLC operation, new PDS submits to plc.directory -4. 
**Finalization** - Activate on new PDS, deactivate on old - -**Endpoints needed for incoming migration:** - -| Endpoint | Status | Purpose | -| --------------------------------------------------- | ------ | ------------------------------------- | -| `com.atproto.server.createAccount` | ❌ | Create account with existing DID | -| `com.atproto.identity.getRecommendedDidCredentials` | ❌ | Provide DID params for PLC operation | -| `com.atproto.identity.submitPlcOperation` | ❌ | Submit signed PLC op to plc.directory | -| `com.atproto.server.activateAccount` | ❌ | Activate migrated account | -| `com.atproto.repo.listMissingBlobs` | ❌ | Check which blobs need migration | - -**Endpoints needed for outgoing migration:** - -| Endpoint | Status | Purpose | -| --------------------------------------------------- | ------ | ----------------------------------------- | -| `com.atproto.server.getServiceAuth` | ❌ | Generate service auth token for migration | -| `com.atproto.identity.requestPlcOperationSignature` | ❌ | Request email verification token | -| `com.atproto.identity.signPlcOperation` | ❌ | Sign PLC operation with rotation key | -| `com.atproto.server.deactivateAccount` | ❌ | Deactivate account after migration | - -**Already implemented:** - -- ✅ `com.atproto.sync.getRepo` - Export repository as CAR -- ✅ `com.atproto.repo.importRepo` - Import repository from CAR -- ✅ `com.atproto.sync.listBlobs` - List blobs for migration -- ✅ `com.atproto.server.getAccountStatus` - Check account status - -**References:** - -- [Account Migration - AT Protocol](https://atproto.com/guides/account-migration) -- [Account Migration Details Discussion](https://github.com/bluesky-social/atproto/discussions/3176) -- [Enabling Account Migration Back to Bluesky's PDS](https://docs.bsky.app/blog/incoming-migration) - -### Testing & Development Notes - -**Vitest 4 Migration**: Successfully migrated to vitest 4 with `@cloudflare/vitest-pool-workers` PR build (#11632). 
This required: - -- Updating config format from `defineWorkersConfig` to `defineConfig` with `cloudflareTest` plugin -- Moving pool options to top-level test config -- Setting `maxWorkers: 1` and `isolate: false` for Durable Object testing - -**CJS/ESM Module Resolution**: Fixed module shimming issues with `@atproto/*` packages by adding `resolve: { conditions: ["node", "require"] }` to vitest config. This forces Vite to use the actual CJS builds provided by `multiformats` instead of attempting to shim ESM builds. - -**BlockMap/CidSet Iteration**: Access internal Map/Set properties directly when iterating: - -```typescript -// Instead of: for (const [cid, bytes] of blocks) { ... } -const internalMap = (blocks as unknown as { map: Map<string, Uint8Array> }).map -for (const [cidStr, bytes] of internalMap) { ... } -``` - -**TypeScript Module Resolution**: Fixed module resolution for packages with broken exports (`multiformats`, `@ipld/dag-cbor`, `uint8arrays`) by: - -- Adding `moduleResolution: "bundler"` to tsconfig.json -- Creating `src/types/modules.d.ts` with custom type declarations for problematic packages -- Using `verbatimModuleSyntax` compatible imports (named imports instead of namespace imports) -- Adding `@cloudflare/vitest-pool-workers/types` to test tsconfig for cloudflare:test module - -**Durable Object RPC Types**: Using `Rpc.Serializable` for RPC method return types to ensure TypeScript correctly infers serializable types instead of `never`. - -**R2 Blob Retrieval**: The `getBlob` endpoint accesses R2 directly rather than going through DO RPC because `R2ObjectBody` cannot be serialized (contains ReadableStream). Upload operations still use DO RPC since they only need to pass Uint8Array and return serializable metadata. 
- ---- - -## Architecture - -``` -┌─────────────────────────────────────────────────────────────────┐ -│ Cloudflare Edge │ -│ │ -│ ┌──────────────┐ ┌─────────────────────────────────┐ │ -│ │ Worker │────────▶│ Account Durable Object │ │ -│ │ (stateless) │ │ (single instance) │ │ -│ │ │ │ │ │ -│ │ • Routing │ │ • Repository (via @atproto/repo)│ │ -│ │ • Auth │ │ • SQLite storage │ │ -│ │ • DID doc │ │ • Firehose (WebSocket) │ │ -│ └──────────────┘ └─────────────────────────────────┘ │ -│ │ │ │ -│ ▼ ▼ │ -│ ┌──────────────┐ ┌─────────────────────────────────┐ │ -│ │ R2 │ │ DO SQLite │ │ -│ │ (blobs) │ │ (blocks, records, commits) │ │ -│ └──────────────┘ └─────────────────────────────────┘ │ -└─────────────────────────────────────────────────────────────────┘ -``` - ---- - -## Build vs Buy Analysis - -### Components We Will USE (Buy/Reuse) - -| Component | Package | Rationale | -| ------------------------ | --------------------- | -------------------------------------------------------------------------- | -| MST & Repo Operations | `@atproto/repo` | Core protocol logic, well-tested, handles commits, MST updates, CAR export | -| Cryptographic Operations | `@atproto/crypto` | Signing, verification, did:key - critical to get right | -| Protocol Utilities | `@atproto/syntax` | TID generation, AT-URI parsing, DID/handle validation | -| Schema Validation | `@atproto/lexicon` | Record type validation | -| CBOR Encoding | `@atproto/lex-cbor` | Official AT Protocol CBOR utilities, Workers-compatible | -| CID Operations | `@atproto/lex-data` | Stable CID interface wrapping multiformats | -| Common Utilities | `@atproto/common-web` | TID generation, timestamp utilities | - -### Components We Will BUILD - -| Component | Rationale | -| ---------------------- | ----------------------------------------------------------------- | -| Storage Adapter | Must implement `RepoStorage` interface for DO SQLite - ~100 lines | -| XRPC Router | Lightweight routing layer - can use Hono or custom 
(~200 lines) | -| Firehose Event Emitter | WebSocket hibernation is Workers-specific - must build | -| Sequence Manager | Simple counter + event buffer in SQLite (~50 lines) | -| Auth Middleware | Simple bearer token check for MVP (~30 lines) | -| Blob Handler | R2 integration is Workers-specific (~50 lines) | - -### Components We Will DEFER - -| Component | Reason | -| ----------------- | --------------------------------------- | -| OAuth Provider | Complex, not needed for single-user MVP | -| Rate Limiting | Single user, not needed for MVP | -| did:plc Migration | Complex - see "Future Work" section | -| Labelling | AppView concern, not PDS | - ---- - -## Dependencies - -All verified to work on Cloudflare Workers with `nodejs_compat`: - -```json -{ - "dependencies": { - "@atproto/common-web": "^0.4.7", - "@atproto/crypto": "^0.4.5", - "@atproto/lex-cbor": "^0.0.3", - "@atproto/lex-data": "^0.0.3", - "@atproto/lexicon": "^0.6.0", - "@atproto/repo": "^0.8.12", - "@atproto/syntax": "^0.4.2", - "@clack/prompts": "^0.11.0", - "bcryptjs": "^3.0.3", - "citty": "^0.1.6", - "hono": "^4.11.3", - "jose": "^6.1.3" - }, - "devDependencies": { - "@arethetypeswrong/cli": "^0.18.2", - "@cloudflare/vite-plugin": "^1.17.0", - "@cloudflare/vitest-pool-workers": "https://pkg.pr.new/@cloudflare/vitest-pool-workers@11632", - "@cloudflare/workers-types": "^4.20251225.0", - "publint": "^0.3.16", - "tsx": "^4.21.0", - "typescript": "^5.9.3", - "vite": "^6.0.0", - "vitest": "^4.0.0", - "wrangler": "^4.54.0" - } -} -``` - -**Key Changes from Original Plan:** - -- Using official `@atproto/lex-cbor` instead of direct `@ipld/dag-cbor` and `cborg` -- Using `@atproto/lex-data` for CID operations instead of direct `multiformats` -- Added `@atproto/common-web` for TID utilities -- Removed low-level encoding libraries - all handled by @atproto packages -- Using vitest 4 via PR build for Durable Object testing support - -### Compatibility Notes - -- **`nodejs_compat`** flag required in 
wrangler.toml -- **Compatibility date**: `2024-09-23` or later -- **Memory limit**: 128MB - use streaming for large CAR files -- **CPU time**: No limit in Durable Objects (use DO for heavy operations) - ---- - -## Implementation Phases - -### Phase 1: Storage Layer - -**Goal:** Implement the `RepoStorage` interface that `@atproto/repo` needs. - -#### Interface to Implement - -Based on research of `@atproto/repo`, implement this interface: - -```typescript -interface RepoStorage { - // Read operations - getBytes(cid: CID): Promise<Uint8Array | null>; - has(cid: CID): Promise<boolean>; - getBlocks(cids: CID[]): Promise<{ blocks: BlockMap; missing: CID[] }>; - - // Write operations - putBlock(cid: CID, bytes: Uint8Array, rev: string): Promise<void>; - putMany(blocks: BlockMap, rev: string): Promise<void>; - - // Root management - getRoot(): Promise<CID | null>; - updateRoot(cid: CID, rev: string): Promise<void>; - - // Atomic commit - applyCommit(commit: CommitData): Promise<void>; -} -``` - -#### SQLite Schema - -```sql --- Block storage (MST nodes + record blocks) -CREATE TABLE blocks ( - cid TEXT PRIMARY KEY, - bytes BLOB NOT NULL, - rev TEXT NOT NULL -); - -CREATE INDEX idx_blocks_rev ON blocks(rev); - --- Repo state (single row) -CREATE TABLE repo_state ( - id INTEGER PRIMARY KEY CHECK (id = 1), - root_cid TEXT NOT NULL, - rev TEXT NOT NULL, - seq INTEGER NOT NULL DEFAULT 0 -); - --- Initialize with empty state -INSERT INTO repo_state (id, root_cid, rev, seq) VALUES (1, '', '', 0); -``` - -#### Implementation Pattern - -```typescript -export class SqliteRepoStorage implements RepoStorage { - constructor(private sql: SqlStorage) {} - - async getBytes(cid: CID): Promise<Uint8Array | null> { - const row = this.sql - .exec("SELECT bytes FROM blocks WHERE cid = ?", cid.toString()) - .one(); - return row ? 
new Uint8Array(row.bytes) : null; - } - - async putMany(blocks: BlockMap, rev: string): Promise<void> { - const stmt = this.sql.prepare( - "INSERT OR REPLACE INTO blocks (cid, bytes, rev) VALUES (?, ?, ?)", - ); - for (const [cid, bytes] of blocks.entries()) { - stmt.bind(cid.toString(), bytes, rev).run(); - } - } - - async applyCommit(commit: CommitData): Promise<void> { - // Transaction: add new blocks, remove old, update root - this.sql.exec("BEGIN TRANSACTION"); - try { - // Add new blocks - await this.putMany(commit.newBlocks, commit.rev); - - // Remove old blocks - for (const cid of commit.removedCids) { - this.sql.exec("DELETE FROM blocks WHERE cid = ?", cid.toString()); - } - - // Update root - this.sql.exec( - "UPDATE repo_state SET root_cid = ?, rev = ? WHERE id = 1", - commit.cid.toString(), - commit.rev, - ); - - this.sql.exec("COMMIT"); - } catch (e) { - this.sql.exec("ROLLBACK"); - throw e; - } - } -} -``` - -#### Testing Strategy - -```typescript -// test/storage.test.ts -import { describe, it, expect } from "vitest"; -import { env, runInDurableObject } from "cloudflare:test"; -import { Repo } from "@atproto/repo"; -import { Secp256k1Keypair } from "@atproto/crypto"; - -describe("SqliteRepoStorage", () => { - it("stores and retrieves blocks", async () => { - const id = env.ACCOUNT.newUniqueId(); - const stub = env.ACCOUNT.get(id); - - await runInDurableObject(stub, async (instance, state) => { - const storage = new SqliteRepoStorage(state.storage.sql); - - const cid = CID.parse("bafyreib..."); - const bytes = new Uint8Array([1, 2, 3]); - - await storage.putBlock(cid, bytes, "rev1"); - const retrieved = await storage.getBytes(cid); - - expect(retrieved).toEqual(bytes); - }); - }); - - it("works with @atproto/repo Repo class", async () => { - const id = env.ACCOUNT.newUniqueId(); - const stub = env.ACCOUNT.get(id); - - await runInDurableObject(stub, async (instance, state) => { - const storage = new SqliteRepoStorage(state.storage.sql); - const keypair = await 
Secp256k1Keypair.create(); - - // Create a new repo - const repo = await Repo.create(storage, "did:web:example.com", keypair); - - expect(repo.cid).toBeDefined(); - expect(await storage.getRoot()).toEqual(repo.cid); - }); - }); - - it("applies commits atomically", async () => { - // Test that failed commits roll back - }); -}); -``` - ---- - -### Phase 2: Durable Object Skeleton - -**Goal:** Set up the Account DO with SQLite and basic lifecycle. - -#### Wrangler Configuration - -```toml -# wrangler.toml -name = "atproto-pds" -main = "src/index.ts" -compatibility_date = "2024-09-23" -compatibility_flags = ["nodejs_compat"] - -[[durable_objects.bindings]] -name = "ACCOUNT" -class_name = "AccountDurableObject" - -[[migrations]] -tag = "v1" -new_sqlite_classes = ["AccountDurableObject"] - -[[r2_buckets]] -binding = "BLOBS" -bucket_name = "pds-blobs" - -[vars] -# Non-secret config -PDS_HOSTNAME = "pds.example.com" - -# Secrets (set via wrangler secret put) -# DID = "did:web:example.com" -# SIGNING_KEY = "..." -# AUTH_TOKEN = "..." 
-``` - -#### Durable Object Implementation - -```typescript -// src/account-do.ts -import { DurableObject } from "cloudflare:workers"; -import { Repo } from "@atproto/repo"; -import { Secp256k1Keypair } from "@atproto/crypto"; -import { SqliteRepoStorage } from "./storage"; - -export class AccountDurableObject extends DurableObject { - private repo: Repo | null = null; - private storage: SqliteRepoStorage | null = null; - private keypair: Secp256k1Keypair | null = null; - - constructor(ctx: DurableObjectState, env: Env) { - super(ctx, env); - - // Initialize schema before processing requests - ctx.blockConcurrencyWhile(async () => { - await this.initialize(); - }); - } - - private async initialize() { - // Run migrations - this.ctx.storage.sql.exec(` - CREATE TABLE IF NOT EXISTS blocks ( - cid TEXT PRIMARY KEY, - bytes BLOB NOT NULL, - rev TEXT NOT NULL - ); - CREATE INDEX IF NOT EXISTS idx_blocks_rev ON blocks(rev); - - CREATE TABLE IF NOT EXISTS repo_state ( - id INTEGER PRIMARY KEY CHECK (id = 1), - root_cid TEXT, - rev TEXT, - seq INTEGER NOT NULL DEFAULT 0 - ); - - INSERT OR IGNORE INTO repo_state (id, root_cid, rev, seq) - VALUES (1, NULL, NULL, 0); - `); - - this.storage = new SqliteRepoStorage(this.ctx.storage.sql); - - // Load keypair from env - this.keypair = await Secp256k1Keypair.import(this.env.SIGNING_KEY); - - // Load or create repo - const root = await this.storage.getRoot(); - if (root) { - this.repo = await Repo.load(this.storage, root); - } else { - this.repo = await Repo.create(this.storage, this.env.DID, this.keypair); - } - } - - // Expose repo operations via RPC - async getRecord(collection: string, rkey: string) { - return this.repo!.getRecord(collection, rkey); - } - - async createRecord(collection: string, rkey: string, record: unknown) { - const write = { - action: WriteOpAction.Create, - collection, - rkey, - record, - }; - - const commit = await this.repo!.applyWrites([write], this.keypair!); - await this.storage!.applyCommit(commit); - 
- // Emit firehose event - await this.emitCommitEvent(commit); - - return { - uri: `at://${this.env.DID}/${collection}/${rkey}`, - cid: commit.cid, - }; - } - - // ... other repo operations -} -``` - -#### Testing Strategy - -```typescript -// test/account-do.test.ts -import { describe, it, expect } from "vitest"; -import { env, runInDurableObject } from "cloudflare:test"; -import { AccountDurableObject } from "../src/account-do"; - -describe("AccountDurableObject", () => { - it("initializes with empty repo on first access", async () => { - const id = env.ACCOUNT.idFromName("test-account"); - const stub = env.ACCOUNT.get(id); - - await runInDurableObject(stub, async (instance: AccountDurableObject) => { - expect(instance.repo).toBeDefined(); - expect(instance.repo.did).toBe("did:web:test.example.com"); - }); - }); - - it("persists repo state across restarts", async () => { - const id = env.ACCOUNT.idFromName("test-account"); - - // First access - create record - let stub = env.ACCOUNT.get(id); - const result = await stub.createRecord("app.bsky.feed.post", "abc123", { - text: "Hello world", - createdAt: new Date().toISOString(), - }); - - // Simulate restart by getting new stub - stub = env.ACCOUNT.get(id); - - // Verify record persisted - const record = await stub.getRecord("app.bsky.feed.post", "abc123"); - expect(record.text).toBe("Hello world"); - }); - - it("uses fixed ID for single-user routing", async () => { - // Always route to "account" ID - const id = env.ACCOUNT.idFromName("account"); - expect(id.toString()).toBeDefined(); - }); -}); -``` - ---- - -### Phase 3: Core XRPC Endpoints - -**Goal:** Implement the minimum endpoints for federation. 
- -#### XRPC Router Setup - -Using Hono for lightweight routing: - -```typescript -// src/xrpc.ts -import { Hono } from "hono"; - -export function createXrpcRouter(env: Env) { - const app = new Hono(); - - // Get the single account DO - const getAccount = () => { - const id = env.ACCOUNT.idFromName("account"); - return env.ACCOUNT.get(id); - }; - - // Error handler - app.onError((err, c) => { - console.error(err); - return c.json( - { - error: err.name || "InternalServerError", - message: err.message, - }, - err.status || 500, - ); - }); - - // XRPC endpoints - return app; -} -``` - -#### Tier 1: Sync Endpoints (Required for Federation) - -```typescript -// GET /xrpc/com.atproto.sync.getRepo -app.get("/xrpc/com.atproto.sync.getRepo", async (c) => { - const did = c.req.query("did"); - if (did !== env.DID) { - return c.json({ error: "RepoNotFound", message: "Unknown DID" }, 404); - } - - const account = getAccount(); - const carBytes = await account.exportRepo(); - - return new Response(carBytes, { - headers: { "Content-Type": "application/vnd.ipld.car" }, - }); -}); - -// GET /xrpc/com.atproto.sync.getRepoStatus -app.get("/xrpc/com.atproto.sync.getRepoStatus", async (c) => { - const did = c.req.query("did"); - if (did !== env.DID) { - return c.json({ error: "RepoNotFound" }, 404); - } - - const account = getAccount(); - const status = await account.getRepoStatus(); - - return c.json({ - did: env.DID, - active: true, - rev: status.rev, - status: "active", - }); -}); - -// WS /xrpc/com.atproto.sync.subscribeRepos -// Handled separately via WebSocket upgrade -``` - -#### Tier 2: Repo Endpoints - -```typescript -// GET /xrpc/com.atproto.repo.describeRepo -app.get("/xrpc/com.atproto.repo.describeRepo", async (c) => { - const did = c.req.query("repo"); - if (did !== env.DID) { - return c.json({ error: "RepoNotFound" }, 404); - } - - return c.json({ - handle: env.HANDLE, - did: env.DID, - didDoc: await getDidDocument(env), - collections: ["app.bsky.feed.post", 
"app.bsky.actor.profile"], - handleIsCorrect: true, - }); -}); - -// GET /xrpc/com.atproto.repo.getRecord -app.get("/xrpc/com.atproto.repo.getRecord", async (c) => { - const repo = c.req.query("repo"); - const collection = c.req.query("collection"); - const rkey = c.req.query("rkey"); - - if (repo !== env.DID) { - return c.json({ error: "RepoNotFound" }, 404); - } - - const account = getAccount(); - const record = await account.getRecord(collection, rkey); - - if (!record) { - return c.json({ error: "RecordNotFound" }, 404); - } - - return c.json({ - uri: `at://${env.DID}/${collection}/${rkey}`, - cid: record.cid.toString(), - value: record.value, - }); -}); - -// POST /xrpc/com.atproto.repo.createRecord -app.post("/xrpc/com.atproto.repo.createRecord", authMiddleware, async (c) => { - const body = await c.req.json(); - const { repo, collection, rkey, record } = body; - - if (repo !== env.DID) { - return c.json({ error: "InvalidRequest", message: "Wrong repo" }, 400); - } - - const account = getAccount(); - const result = await account.createRecord( - collection, - rkey || TID.nextStr(), - record, - ); - - return c.json(result); -}); - -// POST /xrpc/com.atproto.repo.deleteRecord -app.post("/xrpc/com.atproto.repo.deleteRecord", authMiddleware, async (c) => { - const body = await c.req.json(); - const { repo, collection, rkey } = body; - - if (repo !== env.DID) { - return c.json({ error: "InvalidRequest" }, 400); - } - - const account = getAccount(); - await account.deleteRecord(collection, rkey); - - return c.json({}); -}); -``` - -#### Tier 3: Server Identity - -```typescript -// GET /xrpc/com.atproto.server.describeServer -app.get("/xrpc/com.atproto.server.describeServer", (c) => { - return c.json({ - did: `did:web:${env.PDS_HOSTNAME}`, - availableUserDomains: [], - inviteCodeRequired: false, - phoneVerificationRequired: false, - links: {}, - }); -}); - -// GET /xrpc/com.atproto.identity.resolveHandle -app.get("/xrpc/com.atproto.identity.resolveHandle", (c) => { - 
const handle = c.req.query("handle"); - - if (handle !== env.HANDLE) { - return c.json({ error: "HandleNotFound" }, 404); - } - - return c.json({ did: env.DID }); -}); -``` - -#### Testing Strategy - -```typescript -// test/xrpc.test.ts -import { describe, it, expect } from "vitest"; -import { SELF } from "cloudflare:test"; - -describe("XRPC Endpoints", () => { - describe("com.atproto.sync.getRepo", () => { - it("returns CAR file for valid DID", async () => { - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.sync.getRepo?did=did:web:pds.test", - ); - - expect(response.status).toBe(200); - expect(response.headers.get("Content-Type")).toBe( - "application/vnd.ipld.car", - ); - - const bytes = await response.arrayBuffer(); - expect(bytes.byteLength).toBeGreaterThan(0); - }); - - it("returns 404 for unknown DID", async () => { - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.sync.getRepo?did=did:web:other.com", - ); - - expect(response.status).toBe(404); - const body = await response.json(); - expect(body.error).toBe("RepoNotFound"); - }); - }); - - describe("com.atproto.repo.createRecord", () => { - it("requires authentication", async () => { - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.repo.createRecord", - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - repo: "did:web:pds.test", - collection: "app.bsky.feed.post", - record: { text: "Hello", createdAt: new Date().toISOString() }, - }), - }, - ); - - expect(response.status).toBe(401); - }); - - it("creates record with valid auth", async () => { - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.repo.createRecord", - { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: "Bearer test-token", - }, - body: JSON.stringify({ - repo: "did:web:pds.test", - collection: "app.bsky.feed.post", - record: { text: "Hello", createdAt: new 
Date().toISOString() }, - }), - }, - ); - - expect(response.status).toBe(200); - const body = await response.json(); - expect(body.uri).toMatch(/^at:\/\//); - expect(body.cid).toBeDefined(); - }); - }); - - describe("com.atproto.repo.getRecord", () => { - it("retrieves created record", async () => { - // First create a record - await SELF.fetch("https://pds.test/xrpc/com.atproto.repo.createRecord", { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: "Bearer test-token", - }, - body: JSON.stringify({ - repo: "did:web:pds.test", - collection: "app.bsky.feed.post", - rkey: "test123", - record: { text: "Hello", createdAt: new Date().toISOString() }, - }), - }); - - // Then retrieve it - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.repo.getRecord?" + - "repo=did:web:pds.test&collection=app.bsky.feed.post&rkey=test123", - ); - - expect(response.status).toBe(200); - const body = await response.json(); - expect(body.value.text).toBe("Hello"); - }); - }); -}); -``` - ---- - -### Phase 4: Firehose (subscribeRepos) - -**Goal:** Implement the WebSocket event stream that relays subscribe to. 
- -#### Frame Format - -Each WebSocket frame consists of two concatenated DAG-CBOR objects: - -```typescript -// Frame structure -interface FirehoseFrame { - header: { op: 1; t: string } | { op: -1 }; // op=1 message, op=-1 error - body: CommitEvent | IdentityEvent | ErrorBody; -} - -interface CommitEvent { - seq: number; // Sequence number - rebase: false; // Deprecated - tooBig: false; // Oversized indicator - repo: string; // DID - commit: CID; // Commit CID - rev: string; // Revision TID - since: string | null; // Previous revision - blocks: Uint8Array; // CAR file with diff blocks - ops: RepoOp[]; // Record operations - blobs: CID[]; // Referenced blobs - time: string; // ISO timestamp -} - -interface RepoOp { - action: "create" | "update" | "delete"; - path: string; // collection/rkey - cid: CID | null; // New CID (null for deletes) -} -``` - -#### Sequence Manager - -```sql --- Add to schema -CREATE TABLE firehose_events ( - seq INTEGER PRIMARY KEY AUTOINCREMENT, - event_type TEXT NOT NULL, - payload BLOB NOT NULL, - created_at TEXT NOT NULL DEFAULT (datetime('now')) -); - --- Keep last 10,000 events, prune older -CREATE TRIGGER prune_firehose_events -AFTER INSERT ON firehose_events -BEGIN - DELETE FROM firehose_events - WHERE seq < (SELECT MAX(seq) - 10000 FROM firehose_events); -END; -``` - -```typescript -// src/sequencer.ts -import * as cbor from "cborg"; -import { blocksToCarFile } from "@atproto/repo"; - -export class Sequencer { - constructor(private sql: SqlStorage) {} - - async sequenceCommit(commit: CommitData): Promise<number> { - // Create CAR slice with commit diff - const carBytes = await blocksToCarFile(commit.cid, commit.newBlocks); - - // Build event payload - const event = { - repo: commit.did, - commit: commit.cid, - rev: commit.rev, - since: commit.since, - blocks: carBytes, - ops: commit.ops.map((op) => ({ - action: op.action, - path: `${op.collection}/${op.rkey}`, - cid: op.cid, - })), - rebase: false, - tooBig: carBytes.length > 1_000_000, - 
blobs: [], - time: new Date().toISOString(), - }; - - // Store in SQLite - const result = this.sql - .exec( - `INSERT INTO firehose_events (event_type, payload, created_at) - VALUES ('commit', ?, datetime('now')) - RETURNING seq`, - cbor.encode(event), - ) - .one(); - - return result.seq; - } - - async getEventsSince(cursor: number, limit = 100): Promise<SeqEvent[]> { - const rows = this.sql - .exec( - `SELECT seq, event_type, payload, created_at - FROM firehose_events - WHERE seq > ? - ORDER BY seq ASC - LIMIT ?`, - cursor, - limit, - ) - .toArray(); - - return rows.map((row) => ({ - seq: row.seq, - type: row.event_type, - event: cbor.decode(row.payload), - time: row.created_at, - })); - } - - getLatestSeq(): number { - const row = this.sql - .exec("SELECT MAX(seq) as seq FROM firehose_events") - .one(); - return row?.seq ?? 0; - } -} -``` - -#### WebSocket Hibernation Handler - -```typescript -// src/firehose.ts -import * as cbor from "cborg"; - -export class FirehoseHandler { - constructor( - private ctx: DurableObjectState, - private sequencer: Sequencer, - ) {} - - async handleUpgrade(request: Request): Promise<Response> { - const url = new URL(request.url); - const cursor = url.searchParams.get("cursor"); - - // Create WebSocket pair - const { 0: client, 1: server } = new WebSocketPair(); - - // Accept with hibernation - this.ctx.acceptWebSocket(server); - - // Store cursor in attachment - server.serializeAttachment({ - cursor: cursor ? 
parseInt(cursor) : null, - connectedAt: Date.now(), - }); - - // Backfill if cursor provided - if (cursor) { - await this.backfill(server, parseInt(cursor)); - } - - return new Response(null, { status: 101, webSocket: client }); - } - - private async backfill(ws: WebSocket, cursor: number) { - const latestSeq = this.sequencer.getLatestSeq(); - - // Check if cursor is in future - if (cursor > latestSeq) { - const frame = this.encodeError("FutureCursor", "Cursor in the future"); - ws.send(frame); - ws.close(1008, "FutureCursor"); - return; - } - - // Backfill from cursor - const events = await this.sequencer.getEventsSince(cursor, 1000); - - for (const event of events) { - const frame = this.encodeCommitFrame(event); - ws.send(frame); - } - - // Update cursor in attachment - if (events.length > 0) { - const attachment = ws.deserializeAttachment(); - attachment.cursor = events[events.length - 1].seq; - ws.serializeAttachment(attachment); - } - } - - // Called when DO has new commit - async broadcast(event: SeqEvent) { - const frame = this.encodeCommitFrame(event); - - for (const ws of this.ctx.getWebSockets()) { - try { - ws.send(frame); - - // Update cursor - const attachment = ws.deserializeAttachment(); - attachment.cursor = event.seq; - ws.serializeAttachment(attachment); - } catch (e) { - // Client disconnected, will be cleaned up - } - } - } - - private encodeCommitFrame(event: SeqEvent): Uint8Array { - const header = cbor.encode({ op: 1, t: "#commit" }); - const body = cbor.encode({ seq: event.seq, ...event.event }); - - const frame = new Uint8Array(header.length + body.length); - frame.set(header, 0); - frame.set(body, header.length); - - return frame; - } - - private encodeError(error: string, message: string): Uint8Array { - const header = cbor.encode({ op: -1 }); - const body = cbor.encode({ error, message }); - - const frame = new Uint8Array(header.length + body.length); - frame.set(header, 0); - frame.set(body, header.length); - - return frame; - } - - // 
Hibernation callbacks - webSocketMessage(ws: WebSocket, message: string | ArrayBuffer) { - // Firehose is server-push only, ignore client messages - } - - webSocketClose(ws: WebSocket, code: number, reason: string) { - // Cleanup handled automatically - } - - webSocketError(ws: WebSocket, error: Error) { - console.error("WebSocket error:", error); - } -} -``` - -#### Testing Strategy - -```typescript -// test/firehose.test.ts -import { describe, it, expect } from "vitest"; -import { env, runInDurableObject } from "cloudflare:test"; -import * as cbor from "cborg"; - -describe("Firehose", () => { - it("accepts WebSocket connections", async () => { - const id = env.ACCOUNT.idFromName("account"); - const stub = env.ACCOUNT.get(id); - - const response = await stub.fetch( - "https://pds.test/xrpc/com.atproto.sync.subscribeRepos", - { headers: { Upgrade: "websocket" } }, - ); - - expect(response.status).toBe(101); - expect(response.webSocket).toBeDefined(); - }); - - it("backfills events from cursor", async () => { - const id = env.ACCOUNT.idFromName("account"); - const stub = env.ACCOUNT.get(id); - - // Create some records first - for (let i = 0; i < 5; i++) { - await stub.createRecord("app.bsky.feed.post", `post${i}`, { - text: `Post ${i}`, - createdAt: new Date().toISOString(), - }); - } - - // Connect with cursor=0 to get all events - const response = await stub.fetch( - "https://pds.test/xrpc/com.atproto.sync.subscribeRepos?cursor=0", - { headers: { Upgrade: "websocket" } }, - ); - - const ws = response.webSocket!; - ws.accept(); - - const messages: Uint8Array[] = []; - ws.addEventListener("message", (event) => { - messages.push(new Uint8Array(event.data as ArrayBuffer)); - }); - - // Wait for backfill - await new Promise((resolve) => setTimeout(resolve, 100)); - - expect(messages.length).toBe(5); - - // Verify frame structure - for (const msg of messages) { - // First decode header - const [header, headerLen] = cbor.decodeFirst(msg); - expect(header.op).toBe(1); - 
expect(header.t).toBe("#commit"); - - // Then decode body - const body = cbor.decode(msg.slice(headerLen)); - expect(body.seq).toBeGreaterThan(0); - expect(body.repo).toBe("did:web:pds.test"); - } - - ws.close(); - }); - - it("broadcasts new commits to connected clients", async () => { - const id = env.ACCOUNT.idFromName("account"); - const stub = env.ACCOUNT.get(id); - - // Connect to firehose - const response = await stub.fetch( - "https://pds.test/xrpc/com.atproto.sync.subscribeRepos", - { headers: { Upgrade: "websocket" } }, - ); - - const ws = response.webSocket!; - ws.accept(); - - const messages: any[] = []; - ws.addEventListener("message", (event) => { - const msg = new Uint8Array(event.data as ArrayBuffer); - const [header, headerLen] = cbor.decodeFirst(msg); - const body = cbor.decode(msg.slice(headerLen)); - messages.push({ header, body }); - }); - - // Create a record (should broadcast) - await stub.createRecord("app.bsky.feed.post", "live-post", { - text: "Live post!", - createdAt: new Date().toISOString(), - }); - - // Wait for broadcast - await new Promise((resolve) => setTimeout(resolve, 50)); - - expect(messages.length).toBe(1); - expect(messages[0].body.ops[0].path).toBe("app.bsky.feed.post/live-post"); - - ws.close(); - }); - - it("rejects future cursor", async () => { - const id = env.ACCOUNT.idFromName("account"); - const stub = env.ACCOUNT.get(id); - - const response = await stub.fetch( - "https://pds.test/xrpc/com.atproto.sync.subscribeRepos?cursor=999999", - { headers: { Upgrade: "websocket" } }, - ); - - const ws = response.webSocket!; - ws.accept(); - - let errorReceived = false; - ws.addEventListener("message", (event) => { - const msg = new Uint8Array(event.data as ArrayBuffer); - const [header] = cbor.decodeFirst(msg); - if (header.op === -1) { - errorReceived = true; - } - }); - - await new Promise((resolve) => setTimeout(resolve, 50)); - expect(errorReceived).toBe(true); - }); -}); -``` - ---- - -### Phase 5: Blob Storage - -**Goal:** 
Support blob upload and retrieval for images/media. - -#### R2 Blob Storage - -```typescript -// src/blobs.ts -import { sha256 } from "multiformats/hashes/sha2"; -import { CID } from "multiformats/cid"; - -export class BlobStore { - constructor( - private r2: R2Bucket, - private did: string, - ) {} - - async putBlob(bytes: Uint8Array, mimeType: string): Promise<BlobRef> { - // Compute CID - const hash = await sha256.digest(bytes); - const cid = CID.create(1, 0x55, hash); // raw codec - - // Store in R2 with DID prefix - const key = `${this.did}/${cid.toString()}`; - await this.r2.put(key, bytes, { - httpMetadata: { contentType: mimeType }, - }); - - return { - $type: "blob", - ref: { $link: cid.toString() }, - mimeType, - size: bytes.length, - }; - } - - async getBlob(cid: CID): Promise<R2ObjectBody | null> { - const key = `${this.did}/${cid.toString()}`; - return this.r2.get(key); - } - - async hasBlob(cid: CID): Promise<boolean> { - const key = `${this.did}/${cid.toString()}`; - const head = await this.r2.head(key); - return head !== null; - } -} -``` - -#### XRPC Endpoints - -```typescript -// POST /xrpc/com.atproto.repo.uploadBlob -app.post("/xrpc/com.atproto.repo.uploadBlob", authMiddleware, async (c) => { - const contentType = - c.req.header("Content-Type") || "application/octet-stream"; - const bytes = new Uint8Array(await c.req.arrayBuffer()); - - // Size limit check - if (bytes.length > 5_000_000) { - // 5MB - return c.json({ error: "BlobTooLarge" }, 400); - } - - const account = getAccount(); - const blobRef = await account.uploadBlob(bytes, contentType); - - return c.json({ blob: blobRef }); -}); - -// GET /xrpc/com.atproto.sync.getBlob -app.get("/xrpc/com.atproto.sync.getBlob", async (c) => { - const did = c.req.query("did"); - const cidStr = c.req.query("cid"); - - if (did !== env.DID) { - return c.json({ error: "RepoNotFound" }, 404); - } - - const cid = CID.parse(cidStr); - const blob = await env.BLOBS.get(`${did}/${cid.toString()}`); - - if (!blob) { - return c.json({ error: 
"BlobNotFound" }, 404); - } - - return new Response(blob.body, { - headers: { - "Content-Type": - blob.httpMetadata?.contentType || "application/octet-stream", - "Content-Length": blob.size.toString(), - }, - }); -}); -``` - -#### Testing Strategy - -```typescript -// test/blobs.test.ts -import { describe, it, expect } from "vitest"; -import { SELF, env } from "cloudflare:test"; - -describe("Blob Storage", () => { - it("uploads and retrieves blobs", async () => { - // Upload - const imageBytes = new Uint8Array([0x89, 0x50, 0x4e, 0x47]); // PNG header - const uploadResponse = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.repo.uploadBlob", - { - method: "POST", - headers: { - Authorization: "Bearer test-token", - "Content-Type": "image/png", - }, - body: imageBytes, - }, - ); - - expect(uploadResponse.status).toBe(200); - const { blob } = await uploadResponse.json(); - expect(blob.ref.$link).toBeDefined(); - expect(blob.mimeType).toBe("image/png"); - - // Retrieve - const getResponse = await SELF.fetch( - `https://pds.test/xrpc/com.atproto.sync.getBlob?did=did:web:pds.test&cid=${blob.ref.$link}`, - ); - - expect(getResponse.status).toBe(200); - expect(getResponse.headers.get("Content-Type")).toBe("image/png"); - }); - - it("rejects oversized blobs", async () => { - const largeBlob = new Uint8Array(6_000_000); // 6MB - - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.repo.uploadBlob", - { - method: "POST", - headers: { - Authorization: "Bearer test-token", - "Content-Type": "application/octet-stream", - }, - body: largeBlob, - }, - ); - - expect(response.status).toBe(400); - const body = await response.json(); - expect(body.error).toBe("BlobTooLarge"); - }); -}); -``` - ---- - -### Phase 6: Identity & DID Document - -**Goal:** Serve the DID document so the network can discover this PDS. 
- -#### DID Document - -```typescript -// src/identity.ts -export function generateDidDocument(env: Env) { - return { - "@context": [ - "https://www.w3.org/ns/did/v1", - "https://w3id.org/security/multikey/v1", - "https://w3id.org/security/suites/secp256k1-2019/v1", - ], - id: env.DID, - alsoKnownAs: [`at://${env.HANDLE}`], - verificationMethod: [ - { - id: `${env.DID}#atproto`, - type: "Multikey", - controller: env.DID, - publicKeyMultibase: env.SIGNING_KEY_PUBLIC, - }, - ], - service: [ - { - id: "#atproto_pds", - type: "AtprotoPersonalDataServer", - serviceEndpoint: `https://${env.PDS_HOSTNAME}`, - }, - ], - }; -} -``` - -#### Well-Known Endpoints - -```typescript -// Serve directly from Worker (no DO needed) - -// GET /.well-known/did.json (for did:web) -app.get("/.well-known/did.json", (c) => { - return c.json(generateDidDocument(c.env)); -}); - -// GET /.well-known/atproto-did (handle verification) -app.get("/.well-known/atproto-did", (c) => { - return c.text(c.env.DID); -}); -``` - -#### Testing Strategy - -```typescript -// test/identity.test.ts -import { describe, it, expect } from "vitest"; -import { SELF } from "cloudflare:test"; - -describe("Identity", () => { - describe("DID Document", () => { - it("serves did:web document", async () => { - const response = await SELF.fetch( - "https://pds.test/.well-known/did.json", - ); - - expect(response.status).toBe(200); - expect(response.headers.get("Content-Type")).toContain( - "application/json", - ); - - const doc = await response.json(); - expect(doc.id).toBe("did:web:pds.test"); - expect(doc.service[0].type).toBe("AtprotoPersonalDataServer"); - }); - }); - - describe("Handle Verification", () => { - it("serves atproto-did for handle verification", async () => { - const response = await SELF.fetch( - "https://pds.test/.well-known/atproto-did", - ); - - expect(response.status).toBe(200); - const did = await response.text(); - expect(did).toBe("did:web:pds.test"); - }); - }); - - describe("resolveHandle", () 
=> { - it("resolves configured handle", async () => { - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.identity.resolveHandle?handle=alice.test", - ); - - expect(response.status).toBe(200); - const body = await response.json(); - expect(body.did).toBe("did:web:pds.test"); - }); - - it("returns 404 for unknown handle", async () => { - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.identity.resolveHandle?handle=unknown.test", - ); - - expect(response.status).toBe(404); - }); - }); -}); -``` - ---- - -### Phase 7: Authentication - -**Goal:** Secure write endpoints. - -#### Bearer Token Auth (MVP) - -```typescript -// src/auth.ts -export function authMiddleware(c: Context, next: Next) { - const authHeader = c.req.header("Authorization"); - - if (!authHeader?.startsWith("Bearer ")) { - return c.json({ error: "AuthRequired" }, 401); - } - - const token = authHeader.slice(7); - - if (token !== c.env.AUTH_TOKEN) { - return c.json({ error: "InvalidToken" }, 401); - } - - return next(); -} -``` - -#### Future: JWT Verification - -```typescript -// For future OAuth/JWT support -import { verifyJwt } from "@atproto/crypto"; - -export async function jwtAuthMiddleware(c: Context, next: Next) { - const authHeader = c.req.header("Authorization"); - - if (!authHeader?.startsWith("Bearer ")) { - return c.json({ error: "AuthRequired" }, 401); - } - - const token = authHeader.slice(7); - - try { - const payload = await verifyJwt(token, { - audience: `https://${c.env.PDS_HOSTNAME}`, - issuer: c.env.DID, - }); - - c.set("auth", { did: payload.iss, scope: payload.scope }); - return next(); - } catch (e) { - return c.json({ error: "InvalidToken", message: e.message }, 401); - } -} -``` - -#### Testing Strategy - -```typescript -// test/auth.test.ts -import { describe, it, expect } from "vitest"; -import { SELF } from "cloudflare:test"; - -describe("Authentication", () => { - const writeEndpoints = [ - { method: "POST", path: 
"/xrpc/com.atproto.repo.createRecord" }, - { method: "POST", path: "/xrpc/com.atproto.repo.putRecord" }, - { method: "POST", path: "/xrpc/com.atproto.repo.deleteRecord" }, - { method: "POST", path: "/xrpc/com.atproto.repo.uploadBlob" }, - ]; - - for (const { method, path } of writeEndpoints) { - it(`requires auth for ${path}`, async () => { - const response = await SELF.fetch(`https://pds.test${path}`, { - method, - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({}), - }); - - expect(response.status).toBe(401); - }); - } - - const readEndpoints = [ - "/xrpc/com.atproto.repo.getRecord?repo=did:web:pds.test&collection=app.bsky.feed.post&rkey=test", - "/xrpc/com.atproto.repo.describeRepo?repo=did:web:pds.test", - "/xrpc/com.atproto.sync.getRepo?did=did:web:pds.test", - ]; - - for (const path of readEndpoints) { - it(`allows unauthenticated access to ${path.split("?")[0]}`, async () => { - const response = await SELF.fetch(`https://pds.test${path}`); - - // Should not be 401 (might be 404 if no data) - expect(response.status).not.toBe(401); - }); - } -}); -``` - ---- - -### Phase 8: Session Authentication - -**Goal:** Enable login from Bluesky app and other AT Protocol clients via JWT sessions. 
- -#### Overview - -For single-user PDS, we simplify session auth: - -- Password stored as bcrypt hash in environment variable (`PASSWORD_HASH`) -- JWTs signed with existing signing key (secp256k1) -- Access tokens short-lived (2 hours), refresh tokens longer (90 days) -- No email, no MFA, no complex account states - -#### Required Endpoints - -```typescript -// com.atproto.server.createSession -// POST - authenticate with identifier + password -Input: { identifier: string, password: string } -Output: { accessJwt, refreshJwt, handle, did, didDoc?, active: true } - -// com.atproto.server.refreshSession -// POST - refresh tokens using refresh JWT (in Authorization header) -Output: { accessJwt, refreshJwt, handle, did, didDoc?, active: true } - -// com.atproto.server.getSession -// GET - get current session info (requires access JWT) -Output: { handle, did, email?, didDoc?, active: true } - -// com.atproto.server.deleteSession -// POST - logout (requires refresh JWT) -Output: {} -``` - -#### JWT Token Format - -Per AT Protocol spec, use RFC 9068 token types: - -```typescript -// Access Token (short-lived: 2 hours) -{ - typ: "at+jwt", - alg: "ES256K", // secp256k1 -} -{ - iss: "did:web:pds.example.com", // PDS DID - aud: "did:web:pds.example.com", // Same for self-issued - sub: "did:web:user.example.com", // User DID - iat: 1234567890, - exp: 1234575090, // +2 hours - scope: "atproto", -} - -// Refresh Token (long-lived: 90 days) -{ - typ: "refresh+jwt", - alg: "ES256K", -} -{ - iss: "did:web:pds.example.com", - aud: "did:web:pds.example.com", - sub: "did:web:user.example.com", - iat: 1234567890, - exp: 1242343890, // +90 days - jti: "unique-token-id", // For revocation - scope: "com.atproto.refresh", -} -``` - -#### Implementation - -```typescript -// src/session.ts -import { Secp256k1Keypair } from "@atproto/crypto"; - -const ACCESS_TOKEN_LIFETIME = 2 * 60 * 60; // 2 hours -const REFRESH_TOKEN_LIFETIME = 90 * 24 * 60 * 60; // 90 days - -export async function 
createAccessToken( - keypair: Secp256k1Keypair, - did: string, - pdsDid: string, -): Promise<string> { - const now = Math.floor(Date.now() / 1000); - const payload = { - iss: pdsDid, - aud: pdsDid, - sub: did, - iat: now, - exp: now + ACCESS_TOKEN_LIFETIME, - scope: "atproto", - }; - return signJwt(keypair, payload, "at+jwt"); -} - -export async function createRefreshToken( - keypair: Secp256k1Keypair, - did: string, - pdsDid: string, -): Promise<string> { - const now = Math.floor(Date.now() / 1000); - const jti = crypto.randomUUID(); - const payload = { - iss: pdsDid, - aud: pdsDid, - sub: did, - iat: now, - exp: now + REFRESH_TOKEN_LIFETIME, - jti, - scope: "com.atproto.refresh", - }; - return signJwt(keypair, payload, "refresh+jwt"); -} - -async function signJwt( - keypair: Secp256k1Keypair, - payload: Record<string, unknown>, - typ: string, -): Promise<string> { - const header = { alg: "ES256K", typ }; - const headerB64 = base64url(JSON.stringify(header)); - const payloadB64 = base64url(JSON.stringify(payload)); - const signingInput = `${headerB64}.${payloadB64}`; - const signature = await keypair.sign(new TextEncoder().encode(signingInput)); - return `${signingInput}.${base64url(signature)}`; -} -``` - -#### Password Verification - -```typescript -// Use bcrypt for password hashing (via Web Crypto compatible library) -import { compare } from "bcryptjs"; // Works in Workers - -export async function verifyPassword( - password: string, - hash: string, -): Promise<boolean> { - return compare(password, hash); -} -``` - -#### Auth Middleware Update - -```typescript -// src/middleware/auth.ts -export async function requireAuth(c: Context, next: Next) { - const authHeader = c.req.header("Authorization"); - - if (!authHeader?.startsWith("Bearer ")) { - return c.json({ error: "AuthRequired" }, 401); - } - - const token = authHeader.slice(7); - - // Try static token first (backwards compat) - if (token === c.env.AUTH_TOKEN) { - return next(); - } - - // Try JWT verification - try { - const payload = await 
verifyAccessToken(token, c.env); - c.set("auth", { did: payload.sub, scope: payload.scope }); - return next(); - } catch { - return c.json({ error: "InvalidToken" }, 401); - } -} -``` - -#### Configuration - -New environment variable: - -- `PASSWORD_HASH` - bcrypt hash of user password (generate with `npx bcryptjs hash "password"`) - -#### Testing Strategy - -```typescript -// test/session.test.ts -describe("Session Authentication", () => { - it("creates session with valid credentials", async () => { - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.server.createSession", - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - identifier: "alice.test", - password: "test-password", - }), - }, - ); - - expect(response.status).toBe(200); - const body = await response.json(); - expect(body.accessJwt).toBeDefined(); - expect(body.refreshJwt).toBeDefined(); - expect(body.did).toBe("did:web:pds.test"); - expect(body.handle).toBe("alice.test"); - }); - - it("rejects invalid password", async () => { - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.server.createSession", - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - identifier: "alice.test", - password: "wrong-password", - }), - }, - ); - - expect(response.status).toBe(401); - }); - - it("uses access token for authenticated requests", async () => { - // Login - const loginRes = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.server.createSession", - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - identifier: "alice.test", - password: "test-password", - }), - }, - ); - const { accessJwt } = await loginRes.json(); - - // Use token - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.repo.createRecord", - { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: `Bearer 
${accessJwt}`, - }, - body: JSON.stringify({ - repo: "did:web:pds.test", - collection: "app.bsky.feed.post", - record: { text: "Hello!", createdAt: new Date().toISOString() }, - }), - }, - ); - - expect(response.status).toBe(200); - }); - - it("refreshes session with refresh token", async () => { - // Login - const loginRes = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.server.createSession", - { - method: "POST", - headers: { "Content-Type": "application/json" }, - body: JSON.stringify({ - identifier: "alice.test", - password: "test-password", - }), - }, - ); - const { refreshJwt } = await loginRes.json(); - - // Refresh - const response = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.server.refreshSession", - { - method: "POST", - headers: { Authorization: `Bearer ${refreshJwt}` }, - }, - ); - - expect(response.status).toBe(200); - const body = await response.json(); - expect(body.accessJwt).toBeDefined(); - expect(body.refreshJwt).toBeDefined(); - }); -}); -``` - ---- - -## Testing Configuration - -### Vitest Setup - -```typescript -// vitest.config.ts -import { defineWorkersConfig } from "@cloudflare/vitest-pool-workers/config"; - -export default defineWorkersConfig({ - test: { - globals: true, - poolOptions: { - workers: { - wrangler: { configPath: "./wrangler.toml" }, - miniflare: { - bindings: { - DID: "did:web:pds.test", - HANDLE: "alice.test", - PDS_HOSTNAME: "pds.test", - AUTH_TOKEN: "test-token", - SIGNING_KEY: "test-signing-key", - }, - }, - }, - }, - }, -}); -``` - -### Test Environment Types - -```typescript -// test/env.d.ts -declare module "cloudflare:test" { - interface ProvidedEnv { - ACCOUNT: DurableObjectNamespace; - BLOBS: R2Bucket; - DID: string; - HANDLE: string; - PDS_HOSTNAME: string; - AUTH_TOKEN: string; - SIGNING_KEY: string; - } -} -``` - -### Integration Test Suite - -```typescript -// test/integration/federation.test.ts -import { describe, it, expect } from "vitest"; -import { SELF } from "cloudflare:test"; - 
-describe("Federation Integration", () => { - it("complete flow: create post, sync repo, verify on firehose", async () => { - // 1. Create a post - const createResponse = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.repo.createRecord", - { - method: "POST", - headers: { - "Content-Type": "application/json", - Authorization: "Bearer test-token", - }, - body: JSON.stringify({ - repo: "did:web:pds.test", - collection: "app.bsky.feed.post", - record: { - $type: "app.bsky.feed.post", - text: "Hello from edge PDS!", - createdAt: new Date().toISOString(), - }, - }), - }, - ); - - expect(createResponse.status).toBe(200); - const { uri, cid } = await createResponse.json(); - - // 2. Export repo as CAR - const repoResponse = await SELF.fetch( - "https://pds.test/xrpc/com.atproto.sync.getRepo?did=did:web:pds.test", - ); - - expect(repoResponse.status).toBe(200); - const carBytes = await repoResponse.arrayBuffer(); - expect(carBytes.byteLength).toBeGreaterThan(0); - - // 3. Verify record exists - const getResponse = await SELF.fetch( - `https://pds.test/xrpc/com.atproto.repo.getRecord?repo=did:web:pds.test&collection=app.bsky.feed.post&rkey=${uri.split("/").pop()}`, - ); - - expect(getResponse.status).toBe(200); - const record = await getResponse.json(); - expect(record.value.text).toBe("Hello from edge PDS!"); - }); -}); -``` - ---- - -## Configuration - -| Config | Type | Purpose | -| -------------------- | -------- | -------------------------------------------------- | -| `DID` | Secret | The account's DID (did:web:... or did:plc:...) 
| -| `SIGNING_KEY` | Secret | Private key for signing commits (hex or multibase) | -| `SIGNING_KEY_PUBLIC` | Secret | Public key for DID document | -| `HANDLE` | Variable | The account's handle | -| `AUTH_TOKEN` | Secret | Bearer token for write auth (API access) | -| `JWT_SECRET` | Secret | HS256 secret for session tokens (min 32 chars) | -| `PASSWORD_HASH` | Secret | Bcrypt hash for app login (required) | -| `PDS_HOSTNAME` | Variable | Public hostname of the PDS | - -**Recommended: Use the CLI for setup:** - -```bash -pds init # Interactive setup wizard (production) -pds init --local # Write to .dev.vars for local development -``` - -Or set secrets manually: - -```bash -wrangler secret put DID -wrangler secret put SIGNING_KEY -wrangler secret put SIGNING_KEY_PUBLIC -wrangler secret put AUTH_TOKEN -wrangler secret put JWT_SECRET # Use a long random string (32+ chars) -wrangler secret put PASSWORD_HASH # Generate: npx bcryptjs hash "your-password" -``` - ---- - -## Suggested Order of Work - -1. **Storage adapter** – get `@atproto/repo` working with DO SQLite -2. **DO skeleton** – basic structure, initialization, repo instance -3. **describeRepo / getRecord** – prove reads work -4. **createRecord** – prove writes work -5. **getRepo (CAR export)** – sync endpoint -6. **subscribeRepos** – firehose (this is the complex one) -7. **Blob upload/get** – R2 integration -8. **DID document** – identity endpoints -9. **Auth** – lock down write endpoints -10. **Polish** – error handling, logging, tests - ---- - -## Out of Scope (for MVP) - -- Account creation / multi-user -- OAuth / third-party app auth -- Labelling -- Email verification -- Rate limiting -- Advanced admin endpoints -- Full did:plc migration (partial support exists - see "Future Work" section) - -These can all be added later. - ---- - -## Deployment Architecture - -### Design Decision: Zero-Code Re-Export Pattern - -For maximum simplicity, users deploying a PDS should not need to write any code. 
The `@ascorbic/pds` package provides everything needed, and users simply re-export it. - -#### User's Worker (Minimal) - -```typescript -// src/index.ts -export { default, AccountDurableObject } from "@ascorbic/pds"; -``` - -That's it. No additional code required. - -#### Package Exports - -The `@ascorbic/pds` package exports: - -```typescript -// Core exports for advanced users -export { SqliteRepoStorage } from "./storage"; -export { AccountDurableObject } from "./account-do"; -export { BlobStore, type BlobRef } from "./blobs"; -export { Sequencer } from "./sequencer"; - -// Default export: configured Hono app -export default app; -``` - -#### Configuration - -All configuration is via environment variables and secrets: - -**Required environment variables:** - -- `PDS_HOSTNAME` - Public hostname (set in wrangler.jsonc) - -**Required secrets:** - -- `DID` - Account's DID (e.g., "did:web:pds.example.com") -- `HANDLE` - Account's handle (e.g., "alice.pds.example.com") -- `AUTH_TOKEN` - Bearer token for write operations -- `SIGNING_KEY` - Private key for signing commits (secp256k1 JWK) -- `SIGNING_KEY_PUBLIC` - Public key for DID document (multibase) - -**Resource bindings:** - -- `ACCOUNT` - DurableObjectNamespace binding -- `BLOBS` - R2Bucket binding - -#### Deployment Workflow - -1. **Scaffold** (future: via `npm create @ascorbic/pds`) - - Creates project directory with re-export pattern - - Generates wrangler.jsonc with bindings - -2. **Setup** (via `pds init` CLI) - - Interactive prompts for hostname, handle, DID, and password - - Generates secp256k1 keypair and JWT secret - - Sets vars in wrangler.jsonc (`PDS_HOSTNAME`, `DID`, `HANDLE`, `SIGNING_KEY_PUBLIC`) - - Sets secrets via wrangler (`AUTH_TOKEN`, `SIGNING_KEY`, `JWT_SECRET`, `PASSWORD_HASH`) - - Use `pds init --local` for local development (writes to `.dev.vars`) - -3. **Local Development** - - ```bash - pds init --local # Configure for local dev - wrangler dev - ``` - -4. 
**Production Deployment** - - ```bash - # Create R2 bucket - wrangler r2 bucket create pds-blobs - - # Run interactive setup (sets all vars and secrets) - pds init - - # Deploy - wrangler deploy - ``` - -#### Demo Structure - -``` -demos/pds/ -├── src/ -│ └── index.ts # Re-exports @ascorbic/pds -├── wrangler.jsonc # Worker config with bindings -├── package.json # Dependencies -├── .env.example # Template for required vars -└── README.md # Setup instructions -``` - -#### Future: create-pds CLI - -```bash -npm create @ascorbic/pds my-pds -cd my-pds -npm install -npm run dev -``` - -This will scaffold a complete deployment with: - -- Project structure -- Generated keys and configuration -- Pre-configured wrangler.jsonc -- Setup instructions - -#### Rationale - -1. **Single-user PDS**: Not a multi-tenant platform - each deployment serves one account -2. **Configuration via environment**: All customization is environment-based -3. **No code needed**: Users shouldn't need to understand Hono/Workers/DOs to deploy -4. 
**Future-proof**: Can add factory function later for customization without breaking changes - ---- - -## Reference Material - -- AT Protocol specs: https://atproto.com/specs -- `@atproto/repo` source: https://github.com/bluesky-social/atproto/tree/main/packages/repo -- `@atproto/pds` source (reference implementation): https://github.com/bluesky-social/atproto/tree/main/packages/pds -- XRPC spec: https://atproto.com/specs/xrpc -- Sync spec (firehose): https://atproto.com/specs/sync -- Repo spec: https://atproto.com/specs/repository -- Cloudflare Workers testing: https://developers.cloudflare.com/workers/testing/vitest-integration/ diff --git a/plans/complete/core-pds.md b/plans/complete/core-pds.md new file mode 100644 index 00000000..25456777 --- /dev/null +++ b/plans/complete/core-pds.md @@ -0,0 +1,201 @@ +# Core PDS Implementation + +**Status:** ✅ Complete + +## Overview + +A single-user AT Protocol Personal Data Server (PDS) implemented on Cloudflare Workers with Durable Objects. The PDS federates with the Bluesky network – the relay can sync from it, and AppViews can read from it. 
+ +**Live at:** https://pds.mk.gg + +## Implemented Features + +### Storage Layer (Phase 1) +- ✅ `SqliteRepoStorage` implementing `@atproto/repo` RepoStorage interface +- ✅ SQLite schema for blocks, repo state, and firehose events +- ✅ Atomic commit operations with transaction support + +### Durable Object Architecture (Phase 2) +- ✅ `AccountDurableObject` with Repo integration +- ✅ Lazy initialization with `blockConcurrencyWhile` +- ✅ RPC-first architecture following DO best practices +- ✅ Signing key import from environment with validation + +### XRPC Endpoints (Phase 3) +- ✅ Sync endpoints: `getRepo`, `getRepoStatus`, `subscribeRepos` +- ✅ Repository operations: `describeRepo`, `getRecord`, `listRecords`, `createRecord`, `deleteRecord`, `putRecord`, `applyWrites` +- ✅ Server identity: `describeServer`, `resolveHandle` +- ✅ Blob operations: `uploadBlob`, `getBlob`, `listBlobs` +- ✅ Session management: `createSession`, `refreshSession`, `getSession`, `deleteSession` +- ✅ Migration: `importRepo`, `getAccountStatus` +- ✅ Service auth: `getServiceAuth` +- ✅ Preferences: `getPreferences`, `putPreferences` + +### Firehose Implementation (Phase 4) +- ✅ WebSocket hibernation API handlers +- ✅ DAG-CBOR frame encoding using `@atproto/lex-cbor` +- ✅ Event broadcasting to connected clients +- ✅ Cursor-based backfill and validation +- ✅ Sequencer class for commit event log management +- ✅ SQLite `firehose_events` table with automatic pruning + +### Blob Storage (Phase 5) +- ✅ R2 integration with `BlobStore` class +- ✅ CID generation using `cidForRawBytes()` from `@atproto/lex-cbor` +- ✅ 5MB upload limit enforcement +- ✅ Direct R2 access in endpoints + +### Identity & DID Documents (Phase 6) +- ✅ DID document served at `/.well-known/did.json` +- ✅ Handle verification at `/.well-known/atproto-did` +- ✅ Support for both did:web and did:plc identifiers + +### Authentication (Phase 7) +- ✅ Bearer token middleware for write endpoints +- ✅ Static token auth (AUTH_TOKEN) +- ✅ 
JWT-based session authentication +- ✅ bcrypt password hashing +- ✅ Access token + refresh token flow + +### Session Authentication (Phase 8) +- ✅ JWT signing with HS256 (using jose library) +- ✅ 60-minute access tokens, 90-day refresh tokens +- ✅ Compatible with Bluesky app authentication +- ✅ Password verification with bcryptjs + +### Lexicon Validation (Phase 8) +- ✅ `RecordValidator` class using `@atproto/lexicon` +- ✅ Optimistic validation strategy (fail-open) +- ✅ Dynamic schema loading via Vite glob imports +- ✅ Validation integrated into mutation endpoints + +### Account Migration (Phase 9) +- ✅ CAR file import using `readCarWithRoot()` +- ✅ Export/import workflow with validation +- ✅ DID matching verification +- ✅ Prevention of overwrites + +### Protocol Helpers +- ✅ All operations use official @atproto utilities +- ✅ TID generation via `TID.nextStr()` +- ✅ AT-URI construction via `AtUri.make()` +- ✅ DID/Handle validation +- ✅ CBOR encoding via `@atproto/lex-cbor` +- ✅ CAR export via `blocksToCarFile()` + +### CLI Setup Wizard +- ✅ `pds init` - Interactive setup for production +- ✅ `pds init --local` - Setup for local development +- ✅ Secret management commands +- ✅ Integration with wrangler config + +### Testing +- ✅ 140+ tests covering all features +- ✅ Vitest 4 with Cloudflare Workers pool +- ✅ Durable Object testing support +- ✅ Integration tests for federation + +### DID Resolution & XRPC Proxy +- ✅ Full DID resolver for did:web and did:plc +- ✅ DID caching with stale-while-revalidate +- ✅ XRPC proxy with atproto-proxy header support +- ✅ Service discovery and routing + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Cloudflare Edge │ +│ │ +│ ┌──────────────┐ ┌─────────────────────────────────┐ │ +│ │ Worker │────────▶│ Account Durable Object │ │ +│ │ (stateless) │ │ (single instance) │ │ +│ │ │ │ │ │ +│ │ • Routing │ │ • Repository (via @atproto/repo)│ │ +│ │ • Auth │ │ • SQLite storage │ │ +│ │ • DID 
doc │ │ • Firehose (WebSocket) │ │ +│ └──────────────┘ └─────────────────────────────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ ┌──────────────┐ ┌─────────────────────────────────┐ │ +│ │ R2 │ │ DO SQLite │ │ +│ │ (blobs) │ │ (blocks, records, commits) │ │ +│ └──────────────┘ └─────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` + +## Configuration + +### Environment Variables +- `PDS_HOSTNAME` - Public hostname of the PDS +- `HANDLE` - Account handle +- `DID` - Account DID +- `SIGNING_KEY_PUBLIC` - Public key for DID document + +### Secrets +- `SIGNING_KEY` - Private signing key (secp256k1) +- `AUTH_TOKEN` - Bearer token for API access +- `JWT_SECRET` - Secret for JWT signing +- `PASSWORD_HASH` - bcrypt password hash + +### Bindings +- `ACCOUNT` - DurableObjectNamespace +- `BLOBS` - R2Bucket + +## Dependencies + +All dependencies are Workers-compatible: +- `@atproto/repo` - Core MST and repository operations +- `@atproto/crypto` - Cryptographic operations +- `@atproto/syntax` - Protocol utilities +- `@atproto/lexicon` - Schema validation +- `@atproto/lex-cbor` - CBOR encoding +- `@atproto/common-web` - Common utilities +- `hono` - HTTP router +- `jose` - JWT operations +- `bcryptjs` - Password hashing + +## Test Coverage + +- 16 storage tests +- 32 XRPC tests +- 8 firehose tests +- 10 blob tests +- 15 session tests +- 8 validation tests +- 9 migration tests +- 11 Bluesky validation tests +- 3 service-auth tests +- 28 CLI tests + +**Total: 140 tests, all passing** + +## Key Technical Decisions + +1. **Workers-native**: Built specifically for Cloudflare Workers, not a port +2. **Single-user**: Simplified auth and account management +3. **Official libraries**: Uses @atproto packages for all protocol operations +4. **RPC-first DO**: Durable Object exposes RPC methods, not fetch handlers +5. **WebSocket hibernation**: Leverages Workers-specific WebSocket API +6. 
**Zero-code deployment**: Users re-export package, no custom code needed + +## Performance + +- CAR export: ~100ms for typical repo +- Record operations: <10ms +- Firehose latency: <50ms +- Blob uploads: Streaming to R2, no memory buffering + +## Limitations + +- Single user only (by design) +- No email verification +- No OAuth provider (planned) +- No did:plc migration (partial support - see todo plans) +- No rate limiting (not needed for single-user) + +## References + +- [AT Protocol Specs](https://atproto.com/specs) +- [Bluesky PDS Reference](https://github.com/bluesky-social/atproto/tree/main/packages/pds) +- [Cloudflare Workers Docs](https://developers.cloudflare.com/workers/) diff --git a/plans/todo/endpoint-implementation.md b/plans/todo/endpoint-implementation.md new file mode 100644 index 00000000..9f84ffc0 --- /dev/null +++ b/plans/todo/endpoint-implementation.md @@ -0,0 +1,189 @@ +# AT Protocol PDS - Endpoint Implementation Status + +**Status:** 📋 Planning Document + +This document tracks the implementation status of all AT Protocol XRPC endpoints and prioritizes future work. 
+ +## Implementation Summary + +**Total Core PDS Endpoints: 70** +- ✅ **Implemented: 26** (37%) +- ⚠️ **Partial/Stub: 3** (4%) +- ❌ **Not Implemented: 41** (59%) + +**For Single-User PDS:** +- **Necessary endpoints implemented: 26/~30** (87%) +- Most missing endpoints are multi-user, admin, or moderation features + +## Currently Supported Endpoints + +### com.atproto.repo (9/11 - 82%) + +| Endpoint | Status | Notes | +|----------|--------|-------| +| `applyWrites` | ✅ Complete | Batch operations, validates all records | +| `createRecord` | ✅ Complete | Validates against lexicon schemas | +| `deleteRecord` | ✅ Complete | Updates firehose | +| `describeRepo` | ✅ Complete | Returns collections and DID document | +| `getRecord` | ✅ Complete | With CID and value | +| `importRepo` | ✅ Complete | CAR file import with validation | +| `listRecords` | ✅ Complete | Pagination, cursor, reverse | +| `putRecord` | ✅ Complete | Create or update with validation | +| `uploadBlob` | ✅ Complete | 5MB limit, R2 storage | + +### com.atproto.sync (6/11 - 55%) + +| Endpoint | Status | Notes | +|----------|--------|-------| +| `getBlob` | ✅ Complete | Direct R2 access | +| `getRepo` | ✅ Complete | CAR file export | +| `getRepoStatus` | ✅ Complete | Active status, rev, head | +| `listBlobs` | ✅ Complete | Paginated blob listing | +| `listRepos` | ✅ Complete | Returns single repo (single-user) | +| `subscribeRepos` | ✅ Complete | WebSocket firehose with CBOR frames | + +### com.atproto.server (7/26 - 27%) + +| Endpoint | Status | Notes | +|----------|--------|-------| +| `createSession` | ✅ Complete | JWT + static token auth | +| `deleteSession` | ✅ Complete | Stateless (client-side) | +| `describeServer` | ✅ Complete | Server capabilities | +| `getAccountStatus` | ✅ Complete | Migration support | +| `getServiceAuth` | ✅ Complete | Service JWTs for AppView/external services | +| `getSession` | ✅ Complete | Current session info | +| `refreshSession` | ✅ Complete | Token refresh with 
validation | + +### com.atproto.identity (1/6 - 17%) + +| Endpoint | Status | Notes | +|----------|--------|-------| +| `resolveHandle` | ⚠️ Partial | Complete implementation (DNS + HTTPS for any handle) | + +### app.bsky.* (3 endpoints) + +| Endpoint | Status | Notes | +|----------|--------|-------| +| `actor.getPreferences` | ✅ Complete | Persists to SQLite | +| `actor.putPreferences` | ✅ Complete | Persists to SQLite | +| `ageassurance.getState` | ✅ Stub | Returns "assured" (self-hosted = pre-verified) | + +## TODO Endpoints (Grouped by Priority) + +### Migration Support (P1 - Critical) + +**Account Lifecycle:** +- `com.atproto.server.createAccount` - Create deactivated account for migration +- `com.atproto.server.activateAccount` - Activate account after migration +- `com.atproto.server.deactivateAccount` - Deactivate old account post-migration +- `com.atproto.server.checkAccountStatus` - Verify migration progress + +**Identity Management (PLC Operations):** +- `com.atproto.identity.getRecommendedDidCredentials` - Get DID credentials from new PDS +- `com.atproto.identity.requestPlcOperationSignature` - Request email challenge +- `com.atproto.identity.signPlcOperation` - Sign PLC operation with email token +- `com.atproto.identity.submitPlcOperation` - Submit to PLC directory + +**Data Migration:** +- `com.atproto.repo.listMissingBlobs` - Identify failed blob imports + +**Total: 9 endpoints** + +### App Passwords (P2 - Important) + +- `com.atproto.server.createAppPassword` - Create app-specific revocable passwords +- `com.atproto.server.listAppPasswords` - List all app passwords +- `com.atproto.server.revokeAppPassword` - Revoke specific app password + +**Total: 3 endpoints** + +### Advanced Sync (P3 - Nice to Have) + +- `com.atproto.sync.getBlocks` - Get specific blocks by CID +- `com.atproto.sync.getLatestCommit` - Get latest commit without full repo +- `com.atproto.sync.getRecord` - Get record with merkle proof + +**Total: 3 endpoints** + +## Will NOT Support 
+ +### Multi-User Administration (14 endpoints) +**Reason:** Single-user PDS has no admin/user separation + +All `com.atproto.admin.*` endpoints + +### Moderation (1 endpoint) +**Reason:** Single-user PDS doesn't need moderation infrastructure + +- `com.atproto.moderation.createReport` + +### Account Creation & Invites (5 endpoints) +**Reason:** Single-user PDS is pre-configured + +- `com.atproto.server.createInviteCode` +- `com.atproto.server.createInviteCodes` +- `com.atproto.server.getAccountInviteCodes` +- `com.atproto.temp.checkSignupQueue` + +*Exception:* `createAccount` will be implemented for migration only + +### Email Verification & Recovery (6 endpoints) +**Reason:** Single-user PDS has no email system + +- `com.atproto.server.confirmEmail` +- `com.atproto.server.requestEmailConfirmation` +- `com.atproto.server.requestEmailUpdate` +- `com.atproto.server.updateEmail` +- `com.atproto.server.requestPasswordReset` +- `com.atproto.server.resetPassword` + +### Deprecated (2 endpoints) + +- `com.atproto.sync.deprecated.getCheckout` +- `com.atproto.sync.deprecated.getHead` + +**Will Not Support Total: 28 endpoints** + +## Proxy Strategy + +All unimplemented `app.bsky.*` endpoints are proxied to `api.bsky.app` with service auth. This includes: +- Feeds (`app.bsky.feed.*`) +- Graphs (`app.bsky.graph.*`) +- Notifications (`app.bsky.notification.*`) +- Labels (`app.bsky.labeler.*`) +- Chat (`chat.bsky.*`) + +This is intentional - the edge PDS focuses on repository operations and federates view/aggregation to AppView. 
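The proxy strategy above can be sketched as a small routing decision. This is a minimal illustration, not the package's actual code: the names `LOCAL_METHODS`, `xrpcMethod`, `shouldProxy`, `upstreamUrl`, and `proxyHeaders` are hypothetical, the upstream origin is taken from the text above, and the sketch covers only the `app.bsky.*` namespace. Minting the service-auth JWT is left out and assumed to happen elsewhere.

```typescript
// Hypothetical sketch of the proxy decision; all names here are illustrative.
const APPVIEW_ORIGIN = "https://api.bsky.app";

// app.bsky.* methods the plan lists as handled locally (never proxied).
const LOCAL_METHODS = new Set<string>([
  "app.bsky.actor.getPreferences",
  "app.bsky.actor.putPreferences",
  "app.bsky.ageassurance.getState",
]);

// Extract the XRPC method name (NSID) from a request URL, or null if not XRPC.
function xrpcMethod(requestUrl: string): string | null {
  const { pathname } = new URL(requestUrl);
  const prefix = "/xrpc/";
  return pathname.startsWith(prefix) ? pathname.slice(prefix.length) : null;
}

// Proxy every app.bsky.* method that is not implemented locally.
function shouldProxy(method: string): boolean {
  return method.startsWith("app.bsky.") && !LOCAL_METHODS.has(method);
}

// Rewrite the incoming URL onto the AppView origin, keeping path and query.
function upstreamUrl(requestUrl: string): string {
  const incoming = new URL(requestUrl);
  return new URL(incoming.pathname + incoming.search, APPVIEW_ORIGIN).toString();
}

// Attach a service-auth JWT (assumed to be minted elsewhere) as a bearer token.
function proxyHeaders(serviceJwt: string): Record<string, string> {
  return { authorization: `Bearer ${serviceJwt}` };
}
```

Keeping the decision in pure functions like these makes it testable without a Worker runtime; the actual `fetch` to the upstream URL would sit in the route handler.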
+ +## Implementation Phases + +### Phase 1: Migration Support (13 endpoints) +Enable full account migration to/from this PDS +- See `migration-wizard.md` for detailed specification + +### Phase 2: OAuth Provider +Enable ecosystem compatibility with "Login with Bluesky" apps +- See `oauth-provider.md` for detailed specification + +### Phase 3: Enhanced Features (3 endpoints) +Multi-device auth with app passwords + +### Phase 4: Advanced Sync (3 endpoints) +Efficient partial sync and merkle proofs + +## Endpoint Coverage by Namespace + +| Namespace | Supported | Total | Coverage | +|-----------|-----------|-------|----------| +| `com.atproto.repo` | 9 | 11 | 82% | +| `com.atproto.sync` | 6 | 11 | 55% | +| `com.atproto.server` | 7 | 26 | 27% | +| `com.atproto.identity` | 1 | 6 | 17% | +| `com.atproto.admin` | 0 | 14 | 0% (intentional) | +| `app.bsky.*` | 3 | - | Proxy model | + +## References + +- [AT Protocol Specs](https://atproto.com/specs) +- [Official PDS Implementation](https://github.com/bluesky-social/atproto/tree/main/packages/pds) +- [Account Migration Guide](https://atproto.com/guides/account-migration) diff --git a/plans/todo/migration-wizard.md b/plans/todo/migration-wizard.md new file mode 100644 index 00000000..ecdbf5d3 --- /dev/null +++ b/plans/todo/migration-wizard.md @@ -0,0 +1,688 @@ +# Migration Wizard - Gold Standard UX + +**Status:** 📋 Planning +**Priority:** P0 (Critical feature for user adoption) + +## Overview + +A one-command migration experience that enables users to migrate their Bluesky accounts to a self-hosted edge PDS with zero downtime, full data preservation, and the ability to test before committing. 
+ +## Unique Advantages of Serverless + +**Traditional PDS Migration:** +- Downtime while switching servers +- Can't test before switching +- Expensive to run two servers in parallel +- Scary "point of no return" + +**Edge PDS Migration:** +- ✅ Deploy new PDS in seconds (just a Worker) +- ✅ Run old + new simultaneously (pennies in cost) +- ✅ Test thoroughly before switching +- ✅ Instant rollback if issues +- ✅ Zero downtime cutover (just update PLC) + +## User Experience Goal + +```bash +npx @ascorbic/pds migrate +``` + +One command. Everything automatic. Test mode before cutover. Zero risk. + +## Migration Flow Overview + +### Stage 1: Account Detection & Setup +- Auto-detect Bluesky account from local app data (macOS/Windows/Linux) +- Connect Cloudflare account (OAuth) +- Choose domain (owned domain or workers.dev) +- Show cost estimate (~$0.01/month) + +### Stage 2: Infrastructure Provisioning +- Create Worker + R2 bucket automatically +- Set up DNS records (if domain on Cloudflare) +- Generate signing keys and secrets +- Deploy to `-staging` subdomain + +### Stage 3: Data Migration +- Export CAR file from old PDS +- Download blobs (with progress bars, resumable) +- Import to staging PDS +- Validate all data present + +### Stage 4: Test Mode +- Staging PDS fully operational +- User can test extensively +- NO PLC update yet (safe to abandon) +- Automated validation suite +- Manual testing guide + +### Stage 5: Cutover +- Update PLC directory atomically +- Switch staging → production +- Update handle (if desired) +- Keep 24h rollback window + +## Detailed User Journeys + +### Journey A: Fresh Migration from Bluesky + +**User:** Alice (@alice.bsky.social) wants her own PDS + +``` +$ npx create-pds alice-pds +$ cd alice-pds +$ pnpm pds migrate + +🔍 Detecting your Bluesky account... 
+ + Auto-detected: alice.bsky.social + DID: did:plc:abc123xyz + Current PDS: bsky.social + + 📊 Your account: + 142 posts • 23 images (2.4 MB) • 89 followers + +☁️ Connect Cloudflare account + → Opening browser for authentication... + ✓ Connected: alice@example.com + +🌐 Choose your PDS domain + → alice.com (recommended) ⭐ + Can use as your handle: @alice.com + +💰 Cost estimate: ~$0.01/month + +🔐 Setting up infrastructure... + ✓ Worker: alice-pds-staging + ✓ R2 bucket: alice-pds-blobs + ✓ DNS: alice.com → Worker + ✓ Signing keys generated + +📦 Exporting from bsky.social... + Repository (1.2 MB) ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓ 100% + Media (23 files, 2.4 MB) ▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓▓ 100% + +📥 Importing to staging PDS... + ✓ All records imported + ✓ All media accessible + +🧪 TEST MODE: Your PDS is ready to test! + + Staging URL: https://alice-pds-staging.workers.dev + + Try it: + 1. API test: curl https://alice-pds-staging.workers.dev/xrpc/... + 2. Run tests: pnpm pds test + 3. Test with Bluesky app (debug mode) + + When ready: pnpm pds cutover + +--- + +$ pnpm pds cutover + +🚀 Ready to go live? + + This will: + ✓ Update PLC directory + ✓ Point did:plc:abc123 → alice.com + ✓ Update handle: @alice.bsky.social → @alice.com + + Continue? (y/N) y + +🔄 Updating identity... + ✓ PLC operation submitted + ✓ Verified propagation + ✓ Activated alice.com + +🎉 Migration complete! + Your PDS: https://alice.com + Your handle: @alice.com +``` + +### Journey B: Interrupted Migration + +**User:** Charlie's network died during blob download + +``` +$ pnpm pds migrate + +🔄 Found incomplete migration + + Progress: + ✓ Cloudflare setup + ✓ Repository export (1.2 MB) + ⏸ Media: 14/23 files downloaded + + Resume? (Y/n) y + +📦 Resuming download... 
+ Media files ▓▓▓▓▓▓▓▓▓▓▓▓░░░░░░░░ 61% (14/23) + + [Continues normally from where it left off] +``` + +### Journey C: Rollback After Issues + +**User:** Diana found an issue after going live + +``` +$ pnpm pds rollback + +⚡ Rolling back to bsky.social + + ✓ Reverted PLC directory (1.2s) + ✓ Verified: you're back on bsky.social + + Your alice.com PDS still exists for debugging. +``` + +## Implementation Components + +### CLI Commands + +``` +pds migrate # Main migration wizard (interactive) +pds migrate status # Show current progress +pds migrate resume # Resume from checkpoint +pds cutover # Go live after testing +pds rollback # Emergency rollback (24h window) +pds test # Run validation suite +pds cleanup # Remove old PDS data +``` + +### State Management + +**Location:** `.pds/migration-state.json` + +```json +{ + "version": "1.0.0", + "migrationId": "mig_2024-01-15_abc123", + "currentStep": "import", + "status": "in_progress", + + "account": { + "did": "did:plc:abc123", + "handle": "alice.bsky.social", + "oldPdsUrl": "https://bsky.social" + }, + + "cloudflare": { + "accountId": "cf-account-123", + "domain": "alice.com", + "zoneId": "zone-456" + }, + + "resources": { + "stagingWorker": { + "name": "alice-pds-staging", + "url": "https://alice-pds-staging.workers.dev", + "created": true + } + }, + + "export": { + "completed": true, + "repo": { + "file": "repo.car", + "size": 1234567, + "downloaded": true + }, + "blobs": { + "total": 23, + "downloaded": 14, + "manifest": [...] + } + }, + + "cutover": { + "completed": false, + "rollbackWindowUntil": null + } +} +``` + +**Features:** +- Atomic writes (write to temp, rename) +- Encrypted auth tokens +- Checkpoint after each major step +- Enables resume from any point + +### Account Detection + +**Auto-detection strategy:** + +1. Check for Bluesky app session files: + - macOS: `~/Library/Application Support/xyz.blueskyweb.app/` + - Linux: `~/.config/xyz.blueskyweb.app/` + - Windows: `%APPDATA%\xyz.blueskyweb.app\` + +2. 
Parse session JSON for DID and tokens + +3. Fallback to manual entry if not found + +4. Validate by fetching account info from current PDS + +### Cloudflare Authentication + +**OAuth flow:** + +1. Check for existing credentials (env, wrangler config, project) +2. If none, initiate OAuth: + - Generate PKCE challenge + - Open browser to Cloudflare OAuth endpoint + - Start local HTTP server for callback + - Exchange code for token +3. Verify permissions (Workers, R2, DNS) + +### Domain Selection + +**Detection:** + +``` +GET /zones → List domains in account +``` + +**Presentation:** + +``` +Choose your PDS domain: + +→ alice.com (recommended) ⭐ + • Active on Cloudflare + • Can use as your handle: @alice.com + +→ example.com + • Alternative option + +→ Use Workers.dev subdomain + • Free: alice-pds.alice.workers.dev + • Cannot use as handle + +Which domain? (1) +``` + +**DNS automation:** +- If domain on Cloudflare: Create DNS records via API +- If external DNS: Provide instructions + +### Resource Provisioning + +**Worker:** +``` +POST /accounts/{account_id}/workers/scripts/{script_name} +``` + +**Naming:** +- Staging: `{project-name}-staging` +- Production: `{project-name}` + +**R2 Bucket:** +``` +POST /accounts/{account_id}/r2/buckets +{ "name": "{project-name}-blobs" } +``` + +**DNS (if Cloudflare domain):** +``` +POST /zones/{zone_id}/dns_records +[ + { type: "CNAME", name: domain, content: worker-url }, + { type: "TXT", name: "_atproto", content: "did=..." } +] +``` + +**Cost Estimation:** + +Before creating, show: + +``` +Monthly cost estimate: + +Workers (unlimited requests) Free +R2 Storage (2.4 MB) $0.00 +R2 Operations (~10k/mo) $0.01 +──────────────────────────────────── +Total ~$0.01/month + +99% of users stay on free tier! 
🎉 +``` + +### Data Export + +**Repository:** +``` +GET /xrpc/com.atproto.sync.getRepo?did={did} +→ Save to .pds/cache/{did}/repo.car +``` + +**Blobs:** +``` +GET /xrpc/com.atproto.sync.listBlobs?did={did} +→ Get CID list + +For each CID: + GET /xrpc/com.atproto.sync.getBlob?did={did}&cid={cid} + → Save to .pds/cache/{did}/blobs/{cid} +``` + +**Parallel download:** +- Up to 5 blobs concurrently +- Resume support (track completed CIDs) +- Progress bars with speed and ETA + +**Cache manifest:** +```json +{ + "did": "did:plc:abc123", + "exportedAt": "2024-01-15T10:30:00Z", + "repo": { + "file": "repo.car", + "size": 1234567, + "sha256": "..." + }, + "blobs": [ + { + "cid": "bafyxxx", + "file": "blobs/bafyxxx", + "size": 124567, + "mimeType": "image/jpeg" + } + ] +} +``` + +### Data Import + +**Repository:** +``` +POST /xrpc/com.atproto.repo.importRepo +Content-Type: application/vnd.ipld.car +Authorization: Bearer {token} + +{CAR file bytes} +``` + +**Blobs:** +``` +POST /xrpc/com.atproto.repo.uploadBlob +Content-Type: {mime-type} +Authorization: Bearer {token} + +{blob bytes} +``` + +**Parallel upload:** +- Up to 3 blobs concurrently +- Track uploaded CIDs in state + +**Post-import validation:** +``` +1. describeRepo - verify collections +2. listRecords - count records per collection +3. getRecord - sample records +4. getBlob - sample blobs +5. Check firehose operational +``` + +### PLC Directory Operations + +**Current limitation:** Requires rotation keys + +**Email challenge flow (official):** +``` +1. GET /xrpc/com.atproto.identity.getRecommendedDidCredentials +2. POST /xrpc/com.atproto.identity.requestPlcOperationSignature + → Email sent with token +3. POST /xrpc/com.atproto.identity.signPlcOperation + → Sign with email token +4. 
POST /xrpc/com.atproto.identity.submitPlcOperation + → Submit to plc.directory +``` + +**Implementation:** +- Prompt user to check email for token +- Sign operation with token +- Submit to PLC directory +- Poll for propagation (up to 60s) + +### Automated Validation Suite + +**`pds test` command:** + +``` +Running PDS Tests +━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ + +Identity + ✓ DID document served + ✓ Handle resolves correctly + ✓ Keys match expected values + +Repository + ✓ describeRepo returns correct collections + ✓ Sample records accessible (5/5) + ✓ Record count matches export (142) + +Blobs + ✓ Blob storage configured + ✓ Sample blobs accessible (5/5) + ✓ All blob CIDs present (23/23) + +Federation + ✓ Firehose subscription works + ✓ Can receive commit events + +All tests passed! ✓ +``` + +### Manual Testing Guide + +**Provided after staging deployment:** + +``` +TEST MODE ACTIVE + +Your staging PDS: https://alice-pds-staging.workers.dev + +Try it: + +1. API test: + curl https://alice-pds-staging.workers.dev/xrpc/com.atproto.repo.describeRepo?repo=did:plc:abc123 + +2. Test with Bluesky app (safe - won't affect main account): + • Open Bluesky settings + • Advanced → Custom PDS (debug mode) + • Enter: https://alice-pds-staging.workers.dev + • Browse posts, test posting + • Switch back when done + +3. Run automated tests: + pnpm pds test + +Take your time. When ready: pnpm pds cutover +``` + +## Code Changes Needed + +### 1. Add Force Flag to importRepo + +**Location:** `packages/pds/src/account-do.ts` + +```typescript +async rpcImportRepo( + carBytes: Uint8Array, + force = false +): Promise<{ did: string; rev: string }> { + const existingRoot = await this.storage!.getRoot(); + + if (existingRoot && !force) { + throw new Error("Repository exists. Use force=true to overwrite."); + } + + if (force && existingRoot) { + // Wipe and reimport + await this.storage!.destroy(); + await this.ensureStorageInitialized(); + } + + // ... 
rest of import logic +} +``` + +### 2. Add Blob Migration Helper + +**Location:** `packages/pds/src/account-do.ts` + +```typescript +async rpcImportBlobs( + oldPdsUrl: string, + did: string +): Promise<{ imported: number; failed: string[] }> { + // List blobs from old PDS (first page only; a full import would follow `cursor`) + const listUrl = `${oldPdsUrl}/xrpc/com.atproto.sync.listBlobs?did=${encodeURIComponent(did)}`; + const listRes = await fetch(listUrl); + if (!listRes.ok) { + throw new Error(`listBlobs failed with status ${listRes.status}`); + } + const { cids } = (await listRes.json()) as { cids: string[] }; + + const failed: string[] = []; + for (const cid of cids) { + try { + const blobUrl = `${oldPdsUrl}/xrpc/com.atproto.sync.getBlob?did=${encodeURIComponent(did)}&cid=${encodeURIComponent(cid)}`; + const blobRes = await fetch(blobUrl); + if (!blobRes.ok) { + // Don't store an error body as if it were blob content + throw new Error(`getBlob failed with status ${blobRes.status}`); + } + const bytes = new Uint8Array(await blobRes.arrayBuffer()); + const mimeType = blobRes.headers.get('content-type') || 'application/octet-stream'; + + await this.rpcUploadBlob(bytes, mimeType); + } catch (err) { + failed.push(cid); + } + } + + return { imported: cids.length - failed.length, failed }; +} +``` + +### 3. Add PLC Management Endpoints + +**New file:** `packages/pds/src/xrpc/identity.ts` + +Implement: +- `getRecommendedDidCredentials` +- `requestPlcOperationSignature` +- `signPlcOperation` +- `submitPlcOperation` + +Using `@atproto/identity` package + +### 4. Add Account Lifecycle Endpoints + +**Location:** `packages/pds/src/xrpc/server.ts` + +Implement: +- `createAccount` (deactivated state) +- `activateAccount` +- `deactivateAccount` +- `checkAccountStatus` (enhanced) + +### 5. 
Add listMissingBlobs Endpoint + +**Location:** `packages/pds/src/xrpc/repo.ts` + +```typescript +export async function listMissingBlobs(c: Context) { + const repo = c.req.query('repo'); + // Get all blob CIDs from records + // Check which ones are missing from R2 + // Return missing list +} +``` + +## Package Structure + +``` +packages/pds-migrate/ +├── src/ +│ ├── cli.ts # Main CLI entry +│ ├── commands/ +│ │ ├── migrate.ts # Main migration flow +│ │ ├── cutover.ts # Go live +│ │ ├── rollback.ts # Undo cutover +│ │ ├── test.ts # Validation suite +│ │ └── status.ts # Show progress +│ ├── steps/ +│ │ ├── detect-account.ts # Auto-detect Bluesky +│ │ ├── connect-cloudflare.ts # OAuth +│ │ ├── provision.ts # Create resources +│ │ ├── export.ts # Download from old PDS +│ │ ├── import.ts # Upload to new PDS +│ │ ├── validate.ts # Test everything +│ │ └── update-identity.ts # PLC operations +│ ├── lib/ +│ │ ├── cloudflare.ts # CF API wrapper +│ │ ├── atproto.ts # AT Protocol helpers +│ │ ├── plc.ts # PLC directory ops +│ │ ├── state.ts # Migration state +│ │ └── keys.ts # Crypto generation +│ └── ui/ +│ ├── prompts.ts # Interactive prompts +│ ├── progress.ts # Progress bars +│ └── errors.ts # Error formatting +└── package.json +``` + +## Non-Interactive Mode + +**For scripting:** + +```bash +pnpm pds migrate \ + --yes \ + --from alice.bsky.social \ + --to alice.com \ + --cf-account-id xxx \ + --cf-api-token yyy \ + --staging-only +``` + +**Exit codes:** +- 0 = Success +- 1 = User cancelled +- 2 = Validation failed +- 3 = Network error (retryable) +- 4 = Configuration error + +## Success Criteria + +1. ✅ One command migration (`pnpm pds migrate`) +2. ✅ Auto-detects Bluesky account +3. ✅ Provisions all infrastructure automatically +4. ✅ Exports and imports all data +5. ✅ Test mode before committing +6. ✅ Automated validation +7. ✅ Zero downtime cutover +8. ✅ 24-hour rollback window +9. ✅ Clear progress indicators +10. 
✅ Resumable from any point + +## Timeline + +**Minimal (Fix Current Issues):** 1 day +- Force flag, blob migration, validation + +**Good (Smooth but Manual):** 3-4 days +- + PLC endpoints, detailed guide + +**Great (Turnkey Solution):** 2 weeks +- + CLI wizard, automation + +**Amazing (Best Migration UX):** 3-4 weeks +- + Everything above + polish + +## References + +- [Account Migration - AT Protocol](https://atproto.com/guides/account-migration) +- [Account Migration Details](https://github.com/bluesky-social/atproto/discussions/3176) +- [Bluesky PDS Migration Docs](https://github.com/bluesky-social/pds/blob/main/ACCOUNT_MIGRATION.md) +- [Enabling Migration Back to Bluesky](https://docs.bsky.app/blog/incoming-migration) diff --git a/plans/todo/oauth-provider.md b/plans/todo/oauth-provider.md new file mode 100644 index 00000000..3ba5abc8 --- /dev/null +++ b/plans/todo/oauth-provider.md @@ -0,0 +1,450 @@ +# OAuth Provider Implementation Plan + +**Status:** 📋 Planning +**Priority:** P0 (Critical for ecosystem compatibility) + +## Overview + +Implement OAuth 2.1 provider with AT Protocol extensions to enable "Login with Bluesky" / "Login with AT Protocol" ecosystem compatibility. + +## Why This Matters + +**Ecosystem Reality:** +- Third-party Bluesky apps use OAuth with PKCE +- Growing "AT Protocol apps" ecosystem requires OAuth +- Without OAuth support, edge PDS users can't use ecosystem apps +- Makes edge PDS deployments second-class citizens + +**Apps Using OAuth:** +- Third-party Bluesky clients (Skeets, Graysky, etc.) 
+- Analytics tools +- Cross-posting services +- Bot platforms +- Schedule/automation tools + +## Approach: Extend Cloudflare's OAuth Provider + +**Base Library:** [@cloudflare/workers-oauth-provider](https://github.com/cloudflare/workers-oauth-provider) + +**What it provides:** +- ✅ OAuth 2.1 with PKCE (S256 and plain) +- ✅ Dynamic client registration (RFC 7591) +- ✅ Metadata discovery (RFC 8414) +- ✅ Token refresh +- ✅ KV storage integration +- ✅ Workers-native architecture +- ✅ ~3,500 lines of tested code + +**What we need to add:** +- ❌ DPoP (Demonstrating Proof of Possession) - RFC 9449 +- ❌ PAR (Pushed Authorization Requests) - RFC 9126 +- ❌ DID-based client discovery +- ❌ Durable Object storage adapter +- ❌ AT Protocol scope handling + +## Extension Points Needed + +### 1. Storage Adapter Interface + +**Current:** Hardcoded to KV +**Need:** Pluggable storage backend + +```typescript +export interface OAuthStorage { + // Grants (refresh tokens + metadata) + saveGrant(grantId: string, data: GrantData, ttl: number): Promise<void>; + getGrant(grantId: string): Promise<GrantData | null>; + deleteGrant(grantId: string): Promise<void>; + + // Authorization codes (short-lived) + saveAuthCode(code: string, data: AuthCodeData, ttl: number): Promise<void>; + getAuthCode(code: string): Promise<AuthCodeData | null>; + deleteAuthCode(code: string): Promise<void>; + + // PAR requests + savePAR(requestUri: string, data: PARData, ttl: number): Promise<void>; + getPAR(requestUri: string): Promise<PARData | null>; + + // Clients (if dynamic registration) + saveClient(clientId: string, data: ClientInfo): Promise<void>; + getClient(clientId: string): Promise<ClientInfo | null>; + + // DPoP nonces (for replay prevention); resolves to false if the nonce was already seen + checkAndSetNonce(nonce: string, ttl: number): Promise<boolean>; +} + +// KV implementation (current) +export class KVStorage implements OAuthStorage { ... 
} + +// Durable Object implementation (what we need) +export class DurableObjectStorage implements OAuthStorage { + // Uses DO SQL for transactions and complex queries +} +``` + +**Why:** AT Protocol needs SQL for complex queries, transactions, and multi-table operations + +### 2. Client Discovery Hook + +**Current:** Pre-registered clients or dynamic registration endpoint +**Need:** DID-based dynamic discovery + +```typescript +export interface ClientResolver { + resolveClient( + clientId: string, + options: { request: Request; env: any } + ): Promise<ClientInfo | null>; +} + +// Default (current behavior) +export class DefaultClientResolver implements ClientResolver { + // URL-based or pre-registered +} + +// AT Protocol DID-based +export class ATProtoClientResolver implements ClientResolver { + async resolveClient(clientId: string) { + // Client ID is a DID + // Resolve DID document + // Extract OAuth client metadata from DID document + } +} +``` + +**Why:** AT Protocol clients are identified by DID, with their OAuth metadata in the DID document + +### 3. DPoP Support (Standard OAuth Extension) + +**What:** Token binding to prevent theft +**Status:** Not in Cloudflare provider +**Spec:** RFC 9449 + +```typescript +export interface DpopConfig { + required?: boolean; + algorithms?: string[]; // Default: ['ES256', 'RS256'] + nonceExpiration?: number; // Default: 300 +} + +async function verifyDpopProof( + request: Request, + accessToken: string | null, + config: DpopConfig, + storage: OAuthStorage +): Promise<{ valid: boolean; jkt: string }> { + // Parse DPoP header (JWT) + // Verify signature + // Check HTM (HTTP method) matches + // Check HTU (HTTP URI) matches + // Check ATH (access token hash) if token provided + // Check JTI is unique (prevent replay) + // Return key thumbprint for binding +} +``` + +**Implementation:** +- Verify DPoP proof on token exchange +- Bind access token to key thumbprint +- Verify DPoP proof on every API request +- Return `token_type: 'DPoP'` instead of 'Bearer' + +### 4. 
PAR Support (Standard OAuth Extension) + +**What:** More secure authorization (params not in URL) +**Status:** Not in Cloudflare provider +**Spec:** RFC 9126 + +```typescript +// New endpoint: POST /oauth/par +async handlePARRequest(request: Request, env: any): Promise<Response> { + // Parse request body (auth params) + // Authenticate client + // Generate request_uri + // Store params for 90 seconds + // Return { request_uri, expires_in: 90 } +} + +// Modified: GET /oauth/authorize +async handleAuthorizeRequest(request: Request) { + const url = new URL(request.url); + const requestUri = url.searchParams.get('request_uri'); + + if (requestUri) { + // Load params from PAR + // Verify client_id matches + // Delete PAR (one-time use) + } else { + // Traditional query parameters + } +} +``` + +### 5. Token Payload Customization + +**Current:** Fixed token structure +**Need:** Custom claims for AT Protocol + +```typescript +export interface TokenPayloadBuilder { + buildAccessToken( + grant: GrantData, + options: { clientId: string; scope: string[]; jkt?: string } + ): Promise<Record<string, unknown>>; + + validateAccessToken( + payload: any, + options: { request: Request; requiredScope?: string[] } + ): Promise<boolean>; +} + +// AT Protocol implementation +export class ATProtoTokenPayloadBuilder implements TokenPayloadBuilder { + async buildAccessToken(grant, options) { + return { + sub: grant.userId, // DID + client_id: options.clientId, // Client DID + scope: 'atproto', // Single scope for AT Protocol + cnf: options.jkt ? { jkt: options.jkt } : undefined, // DPoP binding + iat: ..., + exp: ..., + }; + } +} +``` + +### 6. 
Metadata Customization + +**Need:** AT Protocol-specific discovery metadata + +```typescript +export interface OAuthProviderOptions { + additionalMetadata?: { + token_endpoint_auth_methods_supported?: string[]; + dpop_signing_alg_values_supported?: string[]; + [key: string]: any; + }; +} + +// Discovery endpoint includes: +{ + "issuer": "https://your-pds.com", + "authorization_endpoint": "...", + "token_endpoint": "...", + "pushed_authorization_request_endpoint": "...", // If PAR enabled + "dpop_signing_alg_values_supported": ["ES256"], // If DPoP enabled + ...additionalMetadata +} +``` + +## Implementation Phases + +### Phase 1: Core OAuth (Week 1) +- Token endpoints (authorization code flow) +- Basic PKCE support +- Simple consent UI +- DO SQL storage adapter + +### Phase 2: AT Protocol Extensions (Week 2) +- DPoP verification +- PAR support +- DID-based client discovery +- Proper metadata endpoint + +### Phase 3: Polish (Optional) +- Better authorization UI +- Scope management +- Token revocation +- Edge cases + +## Migration Path + +```typescript +// Old way (still works) +new OAuthProvider({ + kv: env.KV, + defaultHandler: myHandler, +}); + +// New way with extensions +new OAuthProvider({ + storage: new DurableObjectStorage(env.OAUTH_DO), + clientResolver: new ATProtoClientResolver(didResolver), + dpop: { required: true, algorithms: ['ES256'] }, + tokenPayloadBuilder: new ATProtoTokenPayloadBuilder(), + enablePAR: true, + defaultHandler: myHandler, +}); +``` + +## Upstream Contributions + +**Value to Cloudflare OAuth Provider:** + +1. **DPoP support** - Standard OAuth extension (RFC 9449), benefits all users +2. **PAR support** - Standard OAuth extension (RFC 9126), improves security +3. **Storage adapter pattern** - Enables DO + SQL use cases + +**Contribution Strategy:** +1. Implement for AT Protocol first +2. Extract into clean, reusable modules +3. Submit PRs to Cloudflare provider +4. 
Benefit: Code maintained by Cloudflare team + +## Authorization UI Design + +**Simple but functional:** + +```html + + + + Authorize App + + + +
+ + <h1>Authorize {client.name}?</h1> + + <p>This app wants to:</p> + <ul> + <li>Read your posts</li> + <li>Create new posts</li> + </ul>
+ +
+ + +
+ + <p>You can revoke access anytime in settings.</p>
+ + +``` + +## Storage Schema + +```sql +-- OAuth clients (DID-based) +CREATE TABLE oauth_clients ( + client_id TEXT PRIMARY KEY, -- DID + client_name TEXT, + client_uri TEXT, + logo_uri TEXT, + redirect_uris TEXT, -- JSON array + last_seen INTEGER +); + +-- Authorization codes (short-lived, 5 min) +CREATE TABLE oauth_codes ( + code TEXT PRIMARY KEY, + client_id TEXT, + redirect_uri TEXT, + scope TEXT, + code_challenge TEXT, + code_challenge_method TEXT, + expires_at INTEGER, + used INTEGER DEFAULT 0 +); + +-- Access tokens (DPoP-bound) +CREATE TABLE oauth_tokens ( + token TEXT PRIMARY KEY, + refresh_token TEXT, + client_id TEXT, + scope TEXT, + dpop_jkt TEXT, -- DPoP key thumbprint + issued_at INTEGER, + expires_at INTEGER, + revoked INTEGER DEFAULT 0 +); + +-- PAR requests (short-lived, 90 sec) +CREATE TABLE oauth_par ( + request_uri TEXT PRIMARY KEY, + client_id TEXT, + params TEXT, -- JSON blob + expires_at INTEGER +); + +-- DPoP nonces (replay prevention) +CREATE TABLE oauth_nonces ( + nonce TEXT PRIMARY KEY, + expires_at INTEGER +); +``` + +## Security Considerations + +### DPoP Implementation +- JWT signature verification with JWK from proof +- HTM/HTU matching (binds each proof to one HTTP method and URL) +- JTI uniqueness (prevent replay) +- ATH verification (token binding) +- Key thumbprint persistence + +### PAR Implementation +- Request URI one-time use +- 90-second expiration (RFC 9126 leaves the lifetime to the server and suggests a short one, e.g. 5-600 seconds) +- Client authentication required +- Parameters encrypted in storage + +### Token Security +- Short-lived access tokens (60 min) +- Long-lived refresh tokens (90 days) +- Refresh token rotation on use +- DPoP binding makes a stolen token unusable without the client's private key +- Revocation support + +## Testing Strategy + +### Unit Tests +- DPoP proof verification +- PAR request handling +- Token generation/validation +- Storage adapter operations + +### Integration Tests +- Full OAuth flow with PKCE +- DID-based client discovery +- Token refresh +- Revocation + +### Ecosystem Tests +- Test with real Bluesky apps +- 
Verify "Login with Bluesky" works +- Test multi-device scenarios +- Validate spec compliance + +## Timeline + +**Total Effort:** 2 weeks of focused work + +- Storage adapter: 2 days +- DPoP implementation: 2-3 days +- PAR implementation: 1-2 days +- Client discovery: 1 day +- Authorization UI: 1 day +- Testing: 2-3 days +- Documentation: 1 day + +## Success Criteria + +1. ✅ Users can log in to third-party apps with "Login with Bluesky" +2. ✅ OAuth flow is fully spec-compliant (DPoP, PAR, PKCE) +3. ✅ DID-based client discovery works +4. ✅ Tokens are DPoP-bound and secure +5. ✅ Compatible with Cloudflare provider architecture +6. ✅ Passes ecosystem integration tests + +## References + +- [AT Protocol OAuth Spec](https://atproto.com/specs/oauth) +- [OAuth 2.1 Draft](https://datatracker.ietf.org/doc/html/draft-ietf-oauth-v2-1) +- [RFC 9449: DPoP](https://www.rfc-editor.org/rfc/rfc9449.html) +- [RFC 9126: PAR](https://www.rfc-editor.org/rfc/rfc9126.html) +- [Cloudflare OAuth Provider](https://github.com/cloudflare/workers-oauth-provider) +- [AT Protocol OAuth Issues](https://github.com/bluesky-social/atproto/issues/3292)
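Two of the checks this plan lists, PKCE S256 validation of the `code_challenge` stored in `oauth_codes` and the DPoP `ath` (access token hash) check, reduce to the same operation: base64url-encode the SHA-256 digest of an ASCII string and compare it to a stored or claimed value. The following is a minimal sketch of that step, not the provider's actual code. The helper names (`sha256Base64Url`, `safeEqual`, `verifyPkceS256`, `verifyAth`) are hypothetical, and it assumes `node:crypto` is available (in a Worker that means the `nodejs_compat` flag; otherwise `crypto.subtle.digest` would be the equivalent).

```typescript
import { createHash, timingSafeEqual } from "node:crypto";

// base64url(SHA-256(ASCII(input))) with padding removed.
// RFC 7636 (PKCE S256) and RFC 9449 (DPoP "ath") both use this encoding.
function sha256Base64Url(input: string): string {
  return createHash("sha256")
    .update(input, "ascii")
    .digest("base64")
    .replace(/\+/g, "-")
    .replace(/\//g, "_")
    .replace(/=+$/, "");
}

// Constant-time comparison; timingSafeEqual throws on unequal lengths,
// so the length check runs first.
function safeEqual(a: string, b: string): boolean {
  const left = Buffer.from(a);
  const right = Buffer.from(b);
  return left.length === right.length && timingSafeEqual(left, right);
}

// Hypothetical helper: compare the verifier sent to the token endpoint
// with the challenge saved when the authorization code was issued.
function verifyPkceS256(codeVerifier: string, storedChallenge: string): boolean {
  // RFC 7636 section 4.1: 43-128 characters from the unreserved set.
  if (!/^[A-Za-z0-9\-._~]{43,128}$/.test(codeVerifier)) return false;
  return safeEqual(sha256Base64Url(codeVerifier), storedChallenge);
}

// Hypothetical helper: compare the "ath" claim of a DPoP proof with the
// hash of the access token that was actually presented.
function verifyAth(accessToken: string, athClaim: string): boolean {
  return safeEqual(sha256Base64Url(accessToken), athClaim);
}
```

The verifier and challenge in RFC 7636 Appendix B are a convenient fixture for the unit tests listed under Testing Strategy. The signature, `htm`/`htu`, and `jti` checks are separate steps and are not covered by this sketch.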