Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion demos/pds/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,13 @@
"DID": "did:web:pds.mk.gg",
// Account handle (e.g., "alice.example.com")
"HANDLE": "pds.mk.gg"
}
},
// Secrets (set via `pds init` or `pds secret <name>`):
// - AUTH_TOKEN: Bearer token for API write operations
// - SIGNING_KEY: Private signing key (secp256k1 JWK)
// - JWT_SECRET: Secret for signing session JWTs
// - PASSWORD_HASH: Bcrypt hash of account password (for Bluesky app login)
"observability": {
"enabled": true
}
}
2 changes: 1 addition & 1 deletion packages/create-pds/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ npm run dev

Your PDS will be running at http://localhost:5173

See the [@ascorbic/pds documentation](https://github.com/ascorbic/atproto-worker/tree/main/packages/pds) for configuration and deployment instructions.
See the [@ascorbic/pds documentation](https://github.com/ascorbic/atproto-worker/tree/main/packages/pds) for configuration and deployment instructions.
1 change: 1 addition & 0 deletions packages/pds/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"dependencies": {
"@atproto/common-web": "^0.4.7",
"@atproto/crypto": "^0.4.5",
"@atproto/identity": "^0.4.10",
"@atproto/lex-cbor": "^0.0.3",
"@atproto/lex-data": "^0.0.3",
"@atproto/lexicon": "^0.6.0",
Expand Down
95 changes: 95 additions & 0 deletions packages/pds/src/did-cache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/**
* DID cache using Cloudflare Workers Cache API
*/

import type { DidCache, CacheResult, DidDocument } from "@atproto/identity";
import { check, didDocument } from "@atproto/common-web";
import { waitUntil } from "cloudflare:workers";

const STALE_TTL = 60 * 60 * 1000; // 1 hour - serve from cache but refresh in background
const MAX_TTL = 24 * 60 * 60 * 1000; // 24 hours - must refresh

export class WorkersDidCache implements DidCache {
private cache: Cache;

constructor() {
this.cache = caches.default;
}

private getCacheKey(did: string): string {
// Use a stable URL format for cache keys
return `https://did-cache.internal/${encodeURIComponent(did)}`;
}

async cacheDid(
did: string,
doc: DidDocument,
_prevResult?: CacheResult,
): Promise<void> {
const cacheKey = this.getCacheKey(did);
const response = new Response(JSON.stringify(doc), {
headers: {
"Content-Type": "application/json",
"Cache-Control": "max-age=86400", // 24 hours
"X-Cached-At": Date.now().toString(),
},
});

await this.cache.put(cacheKey, response);
}

async checkCache(did: string): Promise<CacheResult | null> {
const cacheKey = this.getCacheKey(did);
const response = await this.cache.match(cacheKey);

if (!response) {
return null;
}

const cachedAt = parseInt(response.headers.get("X-Cached-At") || "0", 10);
const now = Date.now();
const age = now - cachedAt;

const doc = await response.json();

// Validate cached document schema
if (!check.is(doc, didDocument) || doc.id !== did) {
await this.clearEntry(did);
return null;
}

return {
did,
doc,
updatedAt: cachedAt,
stale: age > STALE_TTL,
expired: age > MAX_TTL,
};
}

async refreshCache(
did: string,
getDoc: () => Promise<DidDocument | null>,
_prevResult?: CacheResult,
): Promise<void> {
// Background refresh using waitUntil to ensure it completes after response
waitUntil(
getDoc().then((doc) => {
if (doc) {
return this.cacheDid(did, doc);
}
}),
);
}

async clearEntry(did: string): Promise<void> {
const cacheKey = this.getCacheKey(did);
await this.cache.delete(cacheKey);
}

async clear(): Promise<void> {
// Cache API doesn't have a clear-all method
// Would need to track keys separately if needed
// For now, entries will expire naturally
}
}
154 changes: 154 additions & 0 deletions packages/pds/src/did-resolver.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/**
* DID resolution for Cloudflare Workers
*
* We can't use @atproto/identity directly because it uses `redirect: "error"`
* which Cloudflare Workers doesn't support. This is a simple implementation
* that's compatible with Workers.
*/

import { check, didDocument, type DidDocument } from "@atproto/common-web";
import type { DidCache } from "@atproto/identity";

const PLC_DIRECTORY = "https://plc.directory";
const TIMEOUT_MS = 3000;

export interface DidResolverOpts {
plcUrl?: string;
timeout?: number;
didCache?: DidCache;
}

export class DidResolver {
private plcUrl: string;
private timeout: number;
private cache?: DidCache;

constructor(opts: DidResolverOpts = {}) {
this.plcUrl = opts.plcUrl ?? PLC_DIRECTORY;
this.timeout = opts.timeout ?? TIMEOUT_MS;
this.cache = opts.didCache;
}

async resolve(did: string): Promise<DidDocument | null> {
// Check cache first
if (this.cache) {
const cached = await this.cache.checkCache(did);
if (cached && !cached.expired) {
// Trigger background refresh if stale
if (cached.stale) {
this.cache.refreshCache(did, () => this.resolveNoCache(did), cached);
}
return cached.doc;
}
}

const doc = await this.resolveNoCache(did);

// Update cache
if (doc && this.cache) {
await this.cache.cacheDid(did, doc);
} else if (!doc && this.cache) {
await this.cache.clearEntry(did);
}

return doc;
}

private async resolveNoCache(did: string): Promise<DidDocument | null> {
if (did.startsWith("did:web:")) {
return this.resolveDidWeb(did);
}
if (did.startsWith("did:plc:")) {
return this.resolveDidPlc(did);
}
throw new Error(`Unsupported DID method: ${did}`);
}

private async resolveDidWeb(did: string): Promise<DidDocument | null> {
const parts = did.split(":").slice(2);
if (parts.length === 0) {
throw new Error(`Invalid did:web format: ${did}`);
}

// Only support simple did:web without paths (like @atproto/identity)
if (parts.length > 1) {
throw new Error(`Unsupported did:web with path: ${did}`);
}

const domain = decodeURIComponent(parts[0]!);
const url = new URL(`https://${domain}/.well-known/did.json`);

// Use http for localhost
if (url.hostname === "localhost") {
url.protocol = "http:";
}

const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeout);

try {
const res = await fetch(url.toString(), {
signal: controller.signal,
redirect: "manual", // Workers doesn't support "error"
headers: { accept: "application/did+ld+json,application/json" },
});

// Check for redirect (we don't follow them for security)
if (res.status >= 300 && res.status < 400) {
return null;
}

if (!res.ok) {
return null;
}

const doc = await res.json();
return this.validateDidDoc(did, doc);
} finally {
clearTimeout(timeoutId);
}
}

private async resolveDidPlc(did: string): Promise<DidDocument | null> {
const url = new URL(`/${encodeURIComponent(did)}`, this.plcUrl);

const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), this.timeout);

try {
const res = await fetch(url.toString(), {
signal: controller.signal,
redirect: "manual", // Workers doesn't support "error"
headers: { accept: "application/did+ld+json,application/json" },
});

// Check for redirect (we don't follow them for security)
if (res.status >= 300 && res.status < 400) {
return null;
}

if (res.status === 404) {
return null;
}

if (!res.ok) {
throw new Error(`PLC directory error: ${res.status} ${res.statusText}`);
}

const doc = (await res.json()) as DidDocument;
return this.validateDidDoc(did, doc);
} finally {
clearTimeout(timeoutId);
}
}

private validateDidDoc(did: string, doc: unknown): DidDocument | null {
if (!check.is(doc, didDocument)) {
return null;
}
if (doc.id !== did) {
return null;
}
return doc;
}
}
88 changes: 11 additions & 77 deletions packages/pds/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ import { env as _env } from "cloudflare:workers";
import { Secp256k1Keypair } from "@atproto/crypto";
import { ensureValidDid, ensureValidHandle } from "@atproto/syntax";
import { requireAuth } from "./middleware/auth";
import { createServiceJwt } from "./service-auth";
import { verifyAccessToken } from "./session";
import { DidResolver } from "./did-resolver";
import { WorkersDidCache } from "./did-cache";
import { handleXrpcProxy } from "./xrpc-proxy";
import * as sync from "./xrpc/sync";
import * as repo from "./xrpc/repo";
import * as server from "./xrpc/server";
Expand Down Expand Up @@ -48,9 +49,11 @@ try {
);
}

// Bluesky service DIDs for service auth
const APPVIEW_DID = "did:web:api.bsky.app";
const CHAT_DID = "did:web:api.bsky.chat";
const didResolver = new DidResolver({
didCache: new WorkersDidCache(),
timeout: 3000, // 3 second timeout for DID resolution
plcUrl: "https://plc.directory",
});

// Lazy-loaded keypair for service auth
let keypairPromise: Promise<Secp256k1Keypair> | null = null;
Expand Down Expand Up @@ -252,77 +255,8 @@ app.post("/admin/emit-identity", requireAuth, async (c) => {
return c.json(result);
});

// Proxy unhandled XRPC requests to Bluesky services
app.all("/xrpc/*", async (c) => {
const url = new URL(c.req.url);
url.protocol = "https:";

// Extract XRPC method name from path (e.g., "app.bsky.feed.getTimeline")
const lxm = url.pathname.replace("/xrpc/", "");

// Route to appropriate service based on lexicon namespace
const isChat = lxm.startsWith("chat.bsky.");
url.host = isChat ? "api.bsky.chat" : "api.bsky.app";
const audienceDid = isChat ? CHAT_DID : APPVIEW_DID;

// Check for authorization header
const auth = c.req.header("Authorization");
let headers: Record<string, string> = {};

if (auth?.startsWith("Bearer ")) {
const token = auth.slice(7);
const serviceDid = `did:web:${c.env.PDS_HOSTNAME}`;

// Try to verify the token - if valid, create a service JWT
try {
// Check static token first
let userDid: string;
if (token === c.env.AUTH_TOKEN) {
userDid = c.env.DID;
} else {
// Verify JWT
const payload = await verifyAccessToken(
token,
c.env.JWT_SECRET,
serviceDid,
);
userDid = payload.sub;
}

// Create service JWT for target service
const keypair = await getKeypair();
const serviceJwt = await createServiceJwt({
iss: userDid,
aud: audienceDid,
lxm,
keypair,
});
headers["Authorization"] = `Bearer ${serviceJwt}`;
} catch {
// Token verification failed - forward without auth
// Target service will return appropriate error
}
}

// Forward request with potentially replaced auth header
// Remove original authorization header to prevent conflicts
const originalHeaders = Object.fromEntries(c.req.raw.headers);
delete originalHeaders["authorization"];

const reqInit: RequestInit = {
method: c.req.method,
headers: {
...originalHeaders,
...headers,
},
};

// Include body for non-GET requests
if (c.req.method !== "GET" && c.req.method !== "HEAD") {
reqInit.body = c.req.raw.body;
}

return fetch(url.toString(), reqInit);
});
// Proxy unhandled XRPC requests to services specified via atproto-proxy header
// or fall back to Bluesky services for backward compatibility
app.all("/xrpc/*", (c) => handleXrpcProxy(c, didResolver, getKeypair));

export default app;
Loading