diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0a9e1bfa..d28b2964 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -61,6 +61,7 @@ jobs: early-return empty hello-world + langsmith mutex nestjs-exchange-rates sleep-for-days diff --git a/.scripts/list-of-samples.json b/.scripts/list-of-samples.json index 80d55b91..3dc56c38 100644 --- a/.scripts/list-of-samples.json +++ b/.scripts/list-of-samples.json @@ -25,6 +25,7 @@ "hello-world-mtls", "interceptors-opentelemetry", "lambda-worker", + "langsmith", "monorepo-folders", "mutex", "nestjs-exchange-rates", diff --git a/README.md b/README.md index 571103cc..4b12fad9 100644 --- a/README.md +++ b/README.md @@ -155,6 +155,7 @@ and you'll be given the list of sample options. - [**OpenTelemetry**](./interceptors-opentelemetry): Use the Interceptors feature to add OpenTelemetry metrics reporting to your workflows. - [**Query Subscriptions**](./query-subscriptions): Use Redis Streams, Immer, and SDK Interceptors to subscribe to Workflow state. - [**gRPC calls**](./grpc-calls): Make raw gRPC calls for advanced queries not covered by the WorkflowClient API. +- [**LangSmith**](./langsmith): Trace Workflows and Activities to [LangSmith](https://www.langchain.com/langsmith) using the `@temporalio/langsmith` plugin. #### Test APIs diff --git a/langsmith/.eslintignore b/langsmith/.eslintignore new file mode 100644 index 00000000..699794d0 --- /dev/null +++ b/langsmith/.eslintignore @@ -0,0 +1,3 @@ +node_modules +lib +.eslintrc.js diff --git a/langsmith/.eslintrc.js b/langsmith/.eslintrc.js new file mode 100644 index 00000000..b8251a06 --- /dev/null +++ b/langsmith/.eslintrc.js @@ -0,0 +1,48 @@ +const { builtinModules } = require('module'); + +const ALLOWED_NODE_BUILTINS = new Set(['assert']); + +module.exports = { + root: true, + parser: '@typescript-eslint/parser', + parserOptions: { + project: './tsconfig.json', + tsconfigRootDir: __dirname, + }, + plugins: ['@typescript-eslint', 'deprecation'], + extends: [ + 'eslint:recommended', + 'plugin:@typescript-eslint/eslint-recommended', + 'plugin:@typescript-eslint/recommended', + 'prettier', + ], + rules: { + // recommended for safety + '@typescript-eslint/no-floating-promises': 'error', // forgetting to await Activities and Workflow APIs is bad + 'deprecation/deprecation': 'warn', + + // code style preference + 'object-shorthand': ['error', 'always'], + + // relaxed rules, for convenience + '@typescript-eslint/no-unused-vars': [ + 'warn', + { + argsIgnorePattern: '^_', + varsIgnorePattern: '^_', + }, + ], + '@typescript-eslint/no-explicit-any': 'off', + }, + overrides: [ + { + files: ['src/workflows.ts', 'src/workflows-*.ts', 'src/workflows/*.ts'], + rules: { + 'no-restricted-imports': [ + 'error', + ...builtinModules.filter((m) => !ALLOWED_NODE_BUILTINS.has(m)).flatMap((m) => [m, `node:${m}`]), + ], + }, + }, + ], +}; diff --git a/langsmith/.gitignore b/langsmith/.gitignore new file mode 100644 index 00000000..3063f07d --- /dev/null +++ b/langsmith/.gitignore @@ -0,0 +1,2 @@ +lib +node_modules diff --git a/langsmith/.npmrc b/langsmith/.npmrc new file mode 100644 index 00000000..43c97e71 --- /dev/null +++ b/langsmith/.npmrc @@ -0,0 +1 @@ +package-lock=false diff --git a/langsmith/.nvmrc b/langsmith/.nvmrc new file mode 100644 index 00000000..2bd5a0a9 --- /dev/null +++ b/langsmith/.nvmrc @@ -0,0 +1 @@ +22 diff --git a/langsmith/.post-create b/langsmith/.post-create new file mode 100644 index 00000000..ee569a08 --- /dev/null +++ b/langsmith/.post-create @@ -0,0 +1,20 @@ +To begin development, install the Temporal CLI: + +Mac: {cyan brew install temporal} +Other: Download and extract the latest release from https://github.com/temporalio/cli/releases/latest + +Start Temporal Server: + +{cyan temporal server start-dev} + +Use Node version 18+ (v22.x is recommended): + +Mac: {cyan brew install node@22} +Other: https://nodejs.org/en/download/ + +To see live traces in LangSmith, enable tracing: + +{cyan export LANGSMITH_TRACING=true} +{cyan export LANGSMITH_API_KEY=...} + +Then run a scenario by path (see each scenario's README). diff --git a/langsmith/.prettierignore b/langsmith/.prettierignore new file mode 100644 index 00000000..a65b4177 --- /dev/null +++ b/langsmith/.prettierignore @@ -0,0 +1 @@ +lib diff --git a/langsmith/.prettierrc b/langsmith/.prettierrc new file mode 100644 index 00000000..965d50bf --- /dev/null +++ b/langsmith/.prettierrc @@ -0,0 +1,2 @@ +printWidth: 120 +singleQuote: true diff --git a/langsmith/README.md b/langsmith/README.md new file mode 100644 index 00000000..88cd00e7 --- /dev/null +++ b/langsmith/README.md @@ -0,0 +1,31 @@ +# LangSmith Tracing for Temporal + +These samples use the `@temporalio/langsmith` integration to add [LangSmith](https://docs.smith.langchain.com/) tracing to Temporal Workflows and Activities. Code you already instrument with LangSmith's native `traceable` keeps working unchanged when it runs inside a Workflow or Activity body — you only add the plugin to your Temporal Client and Worker, and the plugin threads a single trace across the `workflow → activity → child-workflow` boundaries. + +This is a single project: one `package.json` and one set of configs at the `langsmith/` root, with each scenario in its own subdirectory. Run `npm install` once here, then run any scenario by path (see each scenario's README). The integration package itself is documented in the [`@temporalio/langsmith` README](https://github.com/temporalio/sdk-typescript/tree/main/contrib/langsmith). + +## Prerequisites + +These apply to every sample in this directory: + +- A running Temporal dev server: `temporal server start-dev`. +- Node 22 or later. +- Dependencies installed once at the `langsmith/` root: `npm install`. + +Tracing is **off by default**, matching the `langsmith` library. To see live traces in LangSmith, enable it in the Worker and Client process environment: + +```bash +export LANGSMITH_TRACING=true +export LANGSMITH_API_KEY=... +``` + +With tracing off the plugin is a no-op. The tests enable it in-process and assert against an in-memory LangSmith Client, so they need no API key. + +## Samples + +| Sample | Demonstrates | +| :-------------------------------------- | :------------------------------------------------------------------------------------ | +| [`activity-tracing`](./activity-tracing) | A `traceable` model call inside an Activity, nested under the Workflow and Activity runs. | +| [`workflow-tracing`](./workflow-tracing) | Replay-safe `traceable` calls in a Workflow body — emitted once, never duplicated on replay. | +| [`agent-pipeline`](./agent-pipeline) | A multi-step agent whose trace threads through Activities and a child Workflow. | +| [`message-handlers`](./message-handlers) | `traceable` calls inside Signal and Update handlers, nested under each handler's run. | diff --git a/langsmith/activity-tracing/README.md b/langsmith/activity-tracing/README.md new file mode 100644 index 00000000..7dc83bcc --- /dev/null +++ b/langsmith/activity-tracing/README.md @@ -0,0 +1,34 @@ +# Activity Tracing + +A `traceable` model call runs inside an Activity body. The Client starts the Workflow from inside its own `traceable` (`user_pipeline`), so the whole call nests under one trace. With `addTemporalRuns: true` the plugin also emits the Temporal scaffolding runs (`StartWorkflow:`, `RunActivity:`, …). + +## Run + +Run these from the `langsmith/` root (run `npm install` there once first). To see live traces, `export LANGSMITH_TRACING=true` and `export LANGSMITH_API_KEY=...` in each terminal. + +```bash +# In one terminal, start the Worker (requires a local Temporal server): +npx ts-node activity-tracing/src/worker.ts + +# In another terminal, run the scenario: +npx ts-node activity-tracing/src/client.ts +``` + +## Test + +```bash +npx mocha --exit --require ts-node/register --require source-map-support/register "activity-tracing/src/mocha/*.test.ts" +``` + +The test runs a real Worker against `TestWorkflowEnvironment` with an in-memory LangSmith Client, so no API key is required. + +## Expected trace + +``` +user_pipeline + StartWorkflow:GreetingWorkflow + RunWorkflow:GreetingWorkflow + StartActivity:answer + RunActivity:answer + inner_llm_call +``` diff --git a/langsmith/activity-tracing/src/activities.ts b/langsmith/activity-tracing/src/activities.ts new file mode 100644 index 00000000..7c0e21b3 --- /dev/null +++ b/langsmith/activity-tracing/src/activities.ts @@ -0,0 +1,12 @@ +import { traceable } from 'langsmith/traceable'; + +const callModel = traceable( + async (prompt: string): Promise => { + return `answer to: ${prompt}`; + }, + { name: 'inner_llm_call' } +); + +export async function answer(prompt: string): Promise { + return callModel(prompt); +} diff --git a/langsmith/activity-tracing/src/client.ts b/langsmith/activity-tracing/src/client.ts new file mode 100644 index 00000000..dbb92a48 --- /dev/null +++ b/langsmith/activity-tracing/src/client.ts @@ -0,0 +1,32 @@ +import { Connection, Client } from '@temporalio/client'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import { traceable } from 'langsmith/traceable'; +import { nanoid } from 'nanoid'; +import { GreetingWorkflow } from './workflows'; + +async function run() { + const connection = await Connection.connect(); + const langsmith = new LangSmithClient(); + const plugin = new LangSmithPlugin({ client: langsmith, addTemporalRuns: true }); + const client = new Client({ connection, plugins: [plugin] }); + + const pipeline = traceable( + async () => { + return client.workflow.execute(GreetingWorkflow, { + taskQueue: 'langsmith-activity-tracing', + workflowId: 'langsmith-activity-tracing-' + nanoid(), + args: ['hello'], + }); + }, + { name: 'user_pipeline' } + ); + + const result = await pipeline(); + console.log(result); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/langsmith/activity-tracing/src/mocha/workflows.test.ts b/langsmith/activity-tracing/src/mocha/workflows.test.ts new file mode 100644 index 00000000..4c303783 --- /dev/null +++ b/langsmith/activity-tracing/src/mocha/workflows.test.ts @@ -0,0 +1,110 @@ +// Tracing is off by default; enable it before the plugin is constructed. +process.env.LANGSMITH_TRACING = 'true'; + +import { TestWorkflowEnvironment } from '@temporalio/testing'; +import { Worker } from '@temporalio/worker'; +import { Client } from '@temporalio/client'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import { after, before, describe, it } from 'mocha'; +import assert from 'assert'; +import * as activities from '../activities'; +import { GreetingWorkflow } from '../workflows'; + +interface CollectedRun { + id: string; + name: string; + parent_run_id?: string; +} + +class InMemoryRunCollector { + readonly createOrder: string[] = []; + readonly byId = new Map(); + + createRun = async (run: Record): Promise => { + const id = String(run.id); + if (!this.byId.has(id)) { + this.createOrder.push(id); + this.byId.set(id, { id, name: String(run.name) }); + } + this.byId.set(id, { ...this.byId.get(id)!, ...(run as Partial), id }); + }; + + updateRun = async (id: string, update: Record): Promise => { + const existing = this.byId.get(id); + if (existing) { + this.byId.set(id, { ...existing, ...(update as Partial), id }); + } + }; + + awaitPendingTraceBatches = async (): Promise => {}; + + byName(name: string): CollectedRun | undefined { + for (const id of this.createOrder) { + const run = this.byId.get(id)!; + if (run.name === name) { + return run; + } + } + return undefined; + } + + parentNameOf(name: string): string | undefined { + const run = this.byName(name); + if (!run?.parent_run_id) { + return undefined; + } + return this.byId.get(run.parent_run_id)?.name; + } + + asClient(): LangSmithClient { + return this as unknown as LangSmithClient; + } +} + +describe('langsmith/activity-tracing', function () { + this.timeout(30_000); + + let testEnv: TestWorkflowEnvironment; + + before(async () => { + testEnv = await TestWorkflowEnvironment.createLocal(); + }); + + after(async () => { + await testEnv?.teardown(); + }); + + it('traces inner_llm_call under the activity run, nested below the workflow', async () => { + const collector = new InMemoryRunCollector(); + const plugin = new LangSmithPlugin({ client: collector.asClient(), addTemporalRuns: true }); + const taskQueue = 'test-langsmith-activity-tracing'; + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue, + workflowsPath: require.resolve('../workflows'), + activities, + plugins: [plugin], + }); + + const client = new Client({ + connection: testEnv.connection, + namespace: testEnv.namespace, + plugins: [plugin], + }); + + const result = await worker.runUntil( + client.workflow.execute(GreetingWorkflow, { + args: ['hello'], + workflowId: 'test-langsmith-activity-tracing-' + Date.now(), + taskQueue, + }) + ); + + assert.strictEqual(result, 'answer to: hello'); + + assert.strictEqual(collector.parentNameOf('inner_llm_call'), 'RunActivity:answer'); + assert.strictEqual(collector.parentNameOf('RunActivity:answer'), 'RunWorkflow:GreetingWorkflow'); + }); +}); diff --git a/langsmith/activity-tracing/src/worker.ts b/langsmith/activity-tracing/src/worker.ts new file mode 100644 index 00000000..e7a33322 --- /dev/null +++ b/langsmith/activity-tracing/src/worker.ts @@ -0,0 +1,28 @@ +import { NativeConnection, Worker } from '@temporalio/worker'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import * as activities from './activities'; + +async function run() { + const connection = await NativeConnection.connect({ address: 'localhost:7233' }); + try { + const langsmith = new LangSmithClient(); + const plugin = new LangSmithPlugin({ client: langsmith, addTemporalRuns: true }); + + const worker = await Worker.create({ + connection, + taskQueue: 'langsmith-activity-tracing', + workflowsPath: require.resolve('./workflows'), + activities, + plugins: [plugin], + }); + await worker.run(); + } finally { + await connection.close(); + } +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/langsmith/activity-tracing/src/workflows.ts b/langsmith/activity-tracing/src/workflows.ts new file mode 100644 index 00000000..87718719 --- /dev/null +++ b/langsmith/activity-tracing/src/workflows.ts @@ -0,0 +1,10 @@ +import { proxyActivities } from '@temporalio/workflow'; +import type * as activities from './activities'; + +const { answer } = proxyActivities({ + startToCloseTimeout: '1 minute', +}); + +export async function GreetingWorkflow(prompt: string): Promise { + return answer(prompt); +} diff --git a/langsmith/agent-pipeline/README.md b/langsmith/agent-pipeline/README.md new file mode 100644 index 00000000..e45dcbaa --- /dev/null +++ b/langsmith/agent-pipeline/README.md @@ -0,0 +1,42 @@ +# Agent Pipeline + +A multi-step research agent. The parent `ResearchWorkflow` calls two Activities (`gatherFacts`, `writeReport`) in sequence and then runs a child Workflow (`ReviewWorkflow`) for the review step. Each Activity wraps a `traceable` model call. The trace started on the Client (`research_pipeline`) threads across every boundary — Workflow, Activity, and child Workflow — so the runs nest the way you expect instead of fragmenting into disconnected roots. `addTemporalRuns: true` makes the Temporal scaffolding runs visible. + +## Run + +Run these from the `langsmith/` root (run `npm install` there once first). To see live traces, `export LANGSMITH_TRACING=true` and `export LANGSMITH_API_KEY=...` in each terminal. + +```bash +# In one terminal, start the Worker (requires a local Temporal server): +npx ts-node agent-pipeline/src/worker.ts + +# In another terminal, run the scenario: +npx ts-node agent-pipeline/src/client.ts +``` + +## Test + +```bash +npx mocha --exit --require ts-node/register --require source-map-support/register "agent-pipeline/src/mocha/*.test.ts" +``` + +The test runs a real Worker against `TestWorkflowEnvironment` with an in-memory LangSmith Client, so no API key is required. + +## Expected trace + +``` +research_pipeline + StartWorkflow:ResearchWorkflow + RunWorkflow:ResearchWorkflow + StartActivity:gatherFacts + RunActivity:gatherFacts + gather_llm_call + StartActivity:writeReport + RunActivity:writeReport + write_llm_call + StartChildWorkflow:ReviewWorkflow + RunWorkflow:ReviewWorkflow + StartActivity:reviewReport + RunActivity:reviewReport + review_llm_call +``` diff --git a/langsmith/agent-pipeline/src/activities.ts b/langsmith/agent-pipeline/src/activities.ts new file mode 100644 index 00000000..f1edb7ab --- /dev/null +++ b/langsmith/agent-pipeline/src/activities.ts @@ -0,0 +1,25 @@ +import { traceable } from 'langsmith/traceable'; + +const runGatherModel = traceable(async (topic: string): Promise => `facts about ${topic}`, { + name: 'gather_llm_call', +}); + +const runWriteModel = traceable(async (facts: string): Promise => `report based on: ${facts}`, { + name: 'write_llm_call', +}); + +const runReviewModel = traceable(async (report: string): Promise => `reviewed: ${report}`, { + name: 'review_llm_call', +}); + +export async function gatherFacts(topic: string): Promise { + return runGatherModel(topic); +} + +export async function writeReport(facts: string): Promise { + return runWriteModel(facts); +} + +export async function reviewReport(report: string): Promise { + return runReviewModel(report); +} diff --git a/langsmith/agent-pipeline/src/client.ts b/langsmith/agent-pipeline/src/client.ts new file mode 100644 index 00000000..2c537e30 --- /dev/null +++ b/langsmith/agent-pipeline/src/client.ts @@ -0,0 +1,32 @@ +import { Connection, Client } from '@temporalio/client'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import { traceable } from 'langsmith/traceable'; +import { nanoid } from 'nanoid'; +import { ResearchWorkflow } from './workflows'; + +async function run() { + const connection = await Connection.connect(); + const langsmith = new LangSmithClient(); + const plugin = new LangSmithPlugin({ client: langsmith, addTemporalRuns: true }); + const client = new Client({ connection, plugins: [plugin] }); + + const pipeline = traceable( + async () => { + return client.workflow.execute(ResearchWorkflow, { + taskQueue: 'langsmith-agent-pipeline', + workflowId: 'langsmith-agent-pipeline-' + nanoid(), + args: ['durable execution'], + }); + }, + { name: 'research_pipeline' } + ); + + const result = await pipeline(); + console.log(result); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/langsmith/agent-pipeline/src/mocha/workflows.test.ts b/langsmith/agent-pipeline/src/mocha/workflows.test.ts new file mode 100644 index 00000000..b42f76d1 --- /dev/null +++ b/langsmith/agent-pipeline/src/mocha/workflows.test.ts @@ -0,0 +1,120 @@ +// Tracing is off by default; enable it before the plugin is constructed. +process.env.LANGSMITH_TRACING = 'true'; + +import { TestWorkflowEnvironment } from '@temporalio/testing'; +import { Worker } from '@temporalio/worker'; +import { Client } from '@temporalio/client'; +import { Client as LangSmithClient } from 'langsmith'; +import { traceable } from 'langsmith/traceable'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import { after, before, describe, it } from 'mocha'; +import assert from 'assert'; +import * as activities from '../activities'; +import { ResearchWorkflow } from '../workflows'; + +interface CollectedRun { + id: string; + name: string; + parent_run_id?: string; +} + +class InMemoryRunCollector { + readonly createOrder: string[] = []; + readonly byId = new Map(); + + createRun = async (run: Record): Promise => { + const id = String(run.id); + if (!this.byId.has(id)) { + this.createOrder.push(id); + this.byId.set(id, { id, name: String(run.name) }); + } + this.byId.set(id, { ...this.byId.get(id)!, ...(run as Partial), id }); + }; + + updateRun = async (id: string, update: Record): Promise => { + const existing = this.byId.get(id); + if (existing) { + this.byId.set(id, { ...existing, ...(update as Partial), id }); + } + }; + + awaitPendingTraceBatches = async (): Promise => {}; + + byName(name: string): CollectedRun | undefined { + for (const id of this.createOrder) { + const run = this.byId.get(id)!; + if (run.name === name) { + return run; + } + } + return undefined; + } + + parentNameOf(name: string): string | undefined { + const run = this.byName(name); + if (!run?.parent_run_id) { + return undefined; + } + return this.byId.get(run.parent_run_id)?.name; + } + + asClient(): LangSmithClient { + return this as unknown as LangSmithClient; + } +} + +describe('langsmith/agent-pipeline', function () { + this.timeout(30_000); + + let testEnv: TestWorkflowEnvironment; + + before(async () => { + testEnv = await TestWorkflowEnvironment.createLocal(); + }); + + after(async () => { + await testEnv?.teardown(); + }); + + it('threads the trace through workflow, activities, and a child workflow', async () => { + const collector = new InMemoryRunCollector(); + const plugin = new LangSmithPlugin({ client: collector.asClient(), addTemporalRuns: true }); + const taskQueue = 'test-langsmith-agent-pipeline'; + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue, + workflowsPath: require.resolve('../workflows'), + activities, + plugins: [plugin], + }); + + const client = new Client({ + connection: testEnv.connection, + namespace: testEnv.namespace, + plugins: [plugin], + }); + + const pipeline = traceable( + async () => + client.workflow.execute(ResearchWorkflow, { + args: ['durable execution'], + workflowId: 'test-langsmith-agent-pipeline-' + Date.now(), + taskQueue, + }), + { name: 'research_pipeline' } + ); + + const result = await worker.runUntil(pipeline()); + + assert.strictEqual(result, 'reviewed: report based on: facts about durable execution'); + + assert.strictEqual(collector.parentNameOf('RunWorkflow:ResearchWorkflow'), 'research_pipeline'); + assert.strictEqual(collector.parentNameOf('RunActivity:gatherFacts'), 'RunWorkflow:ResearchWorkflow'); + assert.strictEqual(collector.parentNameOf('gather_llm_call'), 'RunActivity:gatherFacts'); + assert.strictEqual(collector.parentNameOf('write_llm_call'), 'RunActivity:writeReport'); + assert.strictEqual(collector.parentNameOf('RunWorkflow:ReviewWorkflow'), 'RunWorkflow:ResearchWorkflow'); + assert.strictEqual(collector.parentNameOf('review_llm_call'), 'RunActivity:reviewReport'); + assert.strictEqual(collector.parentNameOf('RunActivity:reviewReport'), 'RunWorkflow:ReviewWorkflow'); + }); +}); diff --git a/langsmith/agent-pipeline/src/worker.ts b/langsmith/agent-pipeline/src/worker.ts new file mode 100644 index 00000000..665642c5 --- /dev/null +++ b/langsmith/agent-pipeline/src/worker.ts @@ -0,0 +1,28 @@ +import { NativeConnection, Worker } from '@temporalio/worker'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import * as activities from './activities'; + +async function run() { + const connection = await NativeConnection.connect({ address: 'localhost:7233' }); + try { + const langsmith = new LangSmithClient(); + const plugin = new LangSmithPlugin({ client: langsmith, addTemporalRuns: true }); + + const worker = await Worker.create({ + connection, + taskQueue: 'langsmith-agent-pipeline', + workflowsPath: require.resolve('./workflows'), + activities, + plugins: [plugin], + }); + await worker.run(); + } finally { + await connection.close(); + } +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/langsmith/agent-pipeline/src/workflows.ts b/langsmith/agent-pipeline/src/workflows.ts new file mode 100644 index 00000000..3d7d6f2c --- /dev/null +++ b/langsmith/agent-pipeline/src/workflows.ts @@ -0,0 +1,19 @@ +import { executeChild, proxyActivities, workflowInfo } from '@temporalio/workflow'; +import type * as activities from './activities'; + +const { gatherFacts, writeReport, reviewReport } = proxyActivities({ + startToCloseTimeout: '1 minute', +}); + +export async function ReviewWorkflow(report: string): Promise { + return reviewReport(report); +} + +export async function ResearchWorkflow(topic: string): Promise { + const facts = await gatherFacts(topic); + const report = await writeReport(facts); + return executeChild(ReviewWorkflow, { + args: [report], + workflowId: `${workflowInfo().workflowId}-review`, + }); +} diff --git a/langsmith/message-handlers/README.md b/langsmith/message-handlers/README.md new file mode 100644 index 00000000..618a9002 --- /dev/null +++ b/langsmith/message-handlers/README.md @@ -0,0 +1,32 @@ +# Message Handlers + +`traceable` calls run inside a Signal handler and an Update handler. Handler-body `traceable` runs use Workflow-body semantics and nest under the handler's own run (`HandleSignal:` / `HandleUpdate:`). Temporal-internal queries (`__temporal*`, `__stack_trace`) are never traced. The Workflow stays alive with `condition` until a completing Signal arrives. `addTemporalRuns: true` makes the handler runs visible. + +## Run + +Run these from the `langsmith/` root (run `npm install` there once first). To see live traces, `export LANGSMITH_TRACING=true` and `export LANGSMITH_API_KEY=...` in each terminal. + +```bash +# In one terminal, start the Worker (requires a local Temporal server): +npx ts-node message-handlers/src/worker.ts + +# In another terminal, run the scenario: +npx ts-node message-handlers/src/client.ts +``` + +## Test + +```bash +npx mocha --exit --require ts-node/register --require source-map-support/register "message-handlers/src/mocha/*.test.ts" +``` + +The test runs a real Worker against `TestWorkflowEnvironment` with an in-memory LangSmith Client, so no API key is required. + +## Expected trace + +``` +HandleSignal:handle_message + classify_intent +HandleUpdate:compose_reply + draft_reply +``` diff --git a/langsmith/message-handlers/src/client.ts b/langsmith/message-handlers/src/client.ts new file mode 100644 index 00000000..22328ec3 --- /dev/null +++ b/langsmith/message-handlers/src/client.ts @@ -0,0 +1,29 @@ +import { Connection, Client } from '@temporalio/client'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import { nanoid } from 'nanoid'; +import { ConversationWorkflow, complete, composeReply, handleMessage } from './workflows'; + +async function run() { + const connection = await Connection.connect(); + const langsmith = new LangSmithClient(); + const plugin = new LangSmithPlugin({ client: langsmith, addTemporalRuns: true }); + const client = new Client({ connection, plugins: [plugin] }); + + const handle = await client.workflow.start(ConversationWorkflow, { + taskQueue: 'langsmith-message-handlers', + workflowId: 'langsmith-message-handlers-' + nanoid(), + }); + + await handle.signal(handleMessage, 'when is my order arriving?'); + const reply = await handle.executeUpdate(composeReply, { args: ['please send my tracking number'] }); + console.log(reply); + + await handle.signal(complete); + console.log(await handle.result()); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/langsmith/message-handlers/src/mocha/workflows.test.ts b/langsmith/message-handlers/src/mocha/workflows.test.ts new file mode 100644 index 00000000..818c52bd --- /dev/null +++ b/langsmith/message-handlers/src/mocha/workflows.test.ts @@ -0,0 +1,112 @@ +// Tracing is off by default; enable it before the plugin is constructed. +process.env.LANGSMITH_TRACING = 'true'; + +import { TestWorkflowEnvironment } from '@temporalio/testing'; +import { Worker } from '@temporalio/worker'; +import { Client } from '@temporalio/client'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import { after, before, describe, it } from 'mocha'; +import assert from 'assert'; +import { ConversationWorkflow, complete, composeReply, handleMessage } from '../workflows'; + +interface CollectedRun { + id: string; + name: string; + parent_run_id?: string; +} + +class InMemoryRunCollector { + readonly createOrder: string[] = []; + readonly byId = new Map(); + + createRun = async (run: Record): Promise => { + const id = String(run.id); + if (!this.byId.has(id)) { + this.createOrder.push(id); + this.byId.set(id, { id, name: String(run.name) }); + } + this.byId.set(id, { ...this.byId.get(id)!, ...(run as Partial), id }); + }; + + updateRun = async (id: string, update: Record): Promise => { + const existing = this.byId.get(id); + if (existing) { + this.byId.set(id, { ...existing, ...(update as Partial), id }); + } + }; + + awaitPendingTraceBatches = async (): Promise => {}; + + byName(name: string): CollectedRun | undefined { + for (const id of this.createOrder) { + const run = this.byId.get(id)!; + if (run.name === name) { + return run; + } + } + return undefined; + } + + parentNameOf(name: string): string | undefined { + const run = this.byName(name); + if (!run?.parent_run_id) { + return undefined; + } + return this.byId.get(run.parent_run_id)?.name; + } + + asClient(): LangSmithClient { + return this as unknown as LangSmithClient; + } +} + +describe('langsmith/message-handlers', function () { + this.timeout(30_000); + + let testEnv: TestWorkflowEnvironment; + + before(async () => { + testEnv = await TestWorkflowEnvironment.createLocal(); + }); + + after(async () => { + await testEnv?.teardown(); + }); + + it('nests handler-body traceables under the signal and update handler runs', async () => { + const collector = new InMemoryRunCollector(); + const plugin = new LangSmithPlugin({ client: collector.asClient(), addTemporalRuns: true }); + const taskQueue = 'test-langsmith-message-handlers'; + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue, + workflowsPath: require.resolve('../workflows'), + plugins: [plugin], + }); + + const client = new Client({ + connection: testEnv.connection, + namespace: testEnv.namespace, + plugins: [plugin], + }); + + const log = await worker.runUntil(async () => { + const handle = await client.workflow.start(ConversationWorkflow, { + workflowId: 'test-langsmith-message-handlers-' + Date.now(), + taskQueue, + }); + await handle.signal(handleMessage, 'when is my order arriving?'); + await handle.executeUpdate(composeReply, { args: ['please send my tracking number'] }); + await handle.signal(complete); + return handle.result(); + }); + + assert.deepStrictEqual(log, ['intent:when is my order arriving?', 'reply:please send my tracking number']); + + assert.strictEqual(collector.parentNameOf('classify_intent'), 'HandleSignal:handle_message'); + assert.strictEqual(collector.parentNameOf('draft_reply'), 'HandleUpdate:compose_reply'); + assert.strictEqual(collector.byName('HandleQuery:__stack_trace'), undefined); + }); +}); diff --git a/langsmith/message-handlers/src/worker.ts b/langsmith/message-handlers/src/worker.ts new file mode 100644 index 00000000..3fd21554 --- /dev/null +++ b/langsmith/message-handlers/src/worker.ts @@ -0,0 +1,26 @@ +import { NativeConnection, Worker } from '@temporalio/worker'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; + +async function run() { + const connection = await NativeConnection.connect({ address: 'localhost:7233' }); + try { + const langsmith = new LangSmithClient(); + const plugin = new LangSmithPlugin({ client: langsmith, addTemporalRuns: true }); + + const worker = await Worker.create({ + connection, + taskQueue: 'langsmith-message-handlers', + workflowsPath: require.resolve('./workflows'), + plugins: [plugin], + }); + await worker.run(); + } finally { + await connection.close(); + } +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/langsmith/message-handlers/src/workflows.ts b/langsmith/message-handlers/src/workflows.ts new file mode 100644 index 00000000..6f68811c --- /dev/null +++ b/langsmith/message-handlers/src/workflows.ts @@ -0,0 +1,36 @@ +import { traceable } from 'langsmith/traceable'; +import { allHandlersFinished, condition, defineSignal, defineUpdate, setHandler } from '@temporalio/workflow'; + +const classifyMessage = traceable(async (text: string): Promise => `intent:${text}`, { + name: 'classify_intent', +}); + +const draftReply = traceable(async (text: string): Promise => `reply:${text}`, { + name: 'draft_reply', +}); + +export const handleMessage = defineSignal<[string]>('handle_message'); +export const composeReply = defineUpdate('compose_reply'); +export const complete = defineSignal('complete'); + +export async function ConversationWorkflow(): Promise { + const log: string[] = []; + let done = false; + + setHandler(handleMessage, async (text: string) => { + log.push(await classifyMessage(text)); + }); + + setHandler(composeReply, async (text: string) => { + const reply = await draftReply(text); + log.push(reply); + return reply; + }); + + setHandler(complete, () => { + done = true; + }); + + await condition(() => done && allHandlersFinished()); + return log; +} diff --git a/langsmith/package.json b/langsmith/package.json new file mode 100644 index 00000000..f9a64fed --- /dev/null +++ b/langsmith/package.json @@ -0,0 +1,38 @@ +{ + "name": "temporal-langsmith", + "version": "0.1.0", + "private": true, + "scripts": { + "build": "tsc --build", + "build.watch": "tsc --build --watch", + "format": "prettier --write .", + "format:check": "prettier --check .", + "lint": "eslint .", + "test": "mocha --exit --require ts-node/register --require source-map-support/register \"*/src/mocha/*.test.ts\"" + }, + "dependencies": { + "@temporalio/activity": "^1.18.0", + "@temporalio/client": "^1.18.0", + "@temporalio/langsmith": "^1.18.0", + "@temporalio/worker": "^1.18.0", + "@temporalio/workflow": "^1.18.0", + "langsmith": "^0.7.9", + "nanoid": "3.x" + }, + "devDependencies": { + "@temporalio/testing": "^1.18.0", + "@tsconfig/node22": "^22.0.0", + "@types/mocha": "8.x", + "@types/node": "^22.9.1", + "@typescript-eslint/eslint-plugin": "^8.18.0", + "@typescript-eslint/parser": "^8.18.0", + "eslint": "^8.57.1", + "eslint-config-prettier": "^9.1.0", + "eslint-plugin-deprecation": "^3.0.0", + "mocha": "8.x", + "prettier": "^3.4.2", + "ts-node": "^10.9.2", + "typescript": "^5.6.3", + "source-map-support": "^0.5.21" + } +} diff --git a/langsmith/tsconfig.json b/langsmith/tsconfig.json new file mode 100644 index 00000000..da8a0e08 --- /dev/null +++ b/langsmith/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "@tsconfig/node22/tsconfig.json", + "compilerOptions": { + "lib": ["es2021"], + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "rootDir": ".", + "outDir": "./lib" + }, + "include": ["*/src/**/*.ts"] +} diff --git a/langsmith/workflow-tracing/README.md b/langsmith/workflow-tracing/README.md new file mode 100644 index 00000000..c0d01bf6 --- /dev/null +++ b/langsmith/workflow-tracing/README.md @@ -0,0 +1,33 @@ +# Workflow Tracing + +Two `traceable` calls run directly in a Workflow body. A naive tracer re-emits every run on every history replay and floods the project with duplicates; this plugin makes Workflow-body `traceable` calls **replay-safe**: the runs get deterministic IDs, are emitted out-of-isolate via a Temporal Sink, and are suppressed during replay. The Client starts the Workflow from inside its own `traceable` (`user_pipeline`) so the two Workflow-body runs nest under it. + +Sequential `await`ed `traceable` calls in a Workflow body parent under the propagated run exactly; see the integration README for the concurrency caveat around `Promise.all` fan-out. + +## Run + +Run these from the `langsmith/` root (run `npm install` there once first). To see live traces, `export LANGSMITH_TRACING=true` and `export LANGSMITH_API_KEY=...` in each terminal. + +```bash +# In one terminal, start the Worker (requires a local Temporal server): +npx ts-node workflow-tracing/src/worker.ts + +# In another terminal, run the scenario: +npx ts-node workflow-tracing/src/client.ts +``` + +## Test + +```bash +npx mocha --exit --require ts-node/register --require source-map-support/register "workflow-tracing/src/mocha/*.test.ts" +``` + +The test forces replay on every Workflow Task (`maxCachedWorkflows: 0`) and asserts each Workflow-body run is emitted exactly once, so it exercises the replay-safety guarantee directly. It uses an in-memory LangSmith Client, so no API key is required. + +## Expected trace + +``` +user_pipeline + extract_key_points + summarize +``` diff --git a/langsmith/workflow-tracing/src/client.ts b/langsmith/workflow-tracing/src/client.ts new file mode 100644 index 00000000..12b62d07 --- /dev/null +++ b/langsmith/workflow-tracing/src/client.ts @@ -0,0 +1,32 @@ +import { Connection, Client } from '@temporalio/client'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import { traceable } from 'langsmith/traceable'; +import { nanoid } from 'nanoid'; +import { SummarizeWorkflow } from './workflows'; + +async function run() { + const connection = await Connection.connect(); + const langsmith = new LangSmithClient(); + const plugin = new LangSmithPlugin({ client: langsmith }); + const client = new Client({ connection, plugins: [plugin] }); + + const pipeline = traceable( + async () => { + return client.workflow.execute(SummarizeWorkflow, { + taskQueue: 'langsmith-workflow-tracing', + workflowId: 'langsmith-workflow-tracing-' + nanoid(), + args: ['the meeting notes'], + }); + }, + { name: 'user_pipeline' } + ); + + const result = await pipeline(); + console.log(result); +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/langsmith/workflow-tracing/src/mocha/workflows.test.ts b/langsmith/workflow-tracing/src/mocha/workflows.test.ts new file mode 100644 index 00000000..f9aedc98 --- /dev/null +++ b/langsmith/workflow-tracing/src/mocha/workflows.test.ts @@ -0,0 +1,127 @@ +// Tracing is off by default; enable it before the plugin is constructed. +process.env.LANGSMITH_TRACING = 'true'; + +import { TestWorkflowEnvironment } from '@temporalio/testing'; +import { Worker } from '@temporalio/worker'; +import { Client } from '@temporalio/client'; +import { Client as LangSmithClient } from 'langsmith'; +import { traceable } from 'langsmith/traceable'; +import { LangSmithPlugin } from '@temporalio/langsmith'; +import { after, before, describe, it } from 'mocha'; +import assert from 'assert'; +import { SummarizeWorkflow } from '../workflows'; + +interface CollectedRun { + id: string; + name: string; + parent_run_id?: string; +} + +class InMemoryRunCollector { + readonly createOrder: string[] = []; + readonly byId = new Map(); + + createRun = async (run: Record): Promise => { + const id = String(run.id); + if (!this.byId.has(id)) { + this.createOrder.push(id); + this.byId.set(id, { id, name: String(run.name) }); + } + this.byId.set(id, { ...this.byId.get(id)!, ...(run as Partial), id }); + }; + + updateRun = async (id: string, update: Record): Promise => { + const existing = this.byId.get(id); + if (existing) { + this.byId.set(id, { ...existing, ...(update as Partial), id }); + } + }; + + awaitPendingTraceBatches = async (): Promise => {}; + + countByName(name: string): number { + let n = 0; + for (const id of this.createOrder) { + if (this.byId.get(id)!.name === name) { + n += 1; + } + } + return n; + } + + byName(name: string): CollectedRun | undefined { + for (const id of this.createOrder) { + const run = this.byId.get(id)!; + if (run.name === name) { + return run; + } + } + return undefined; + } + + parentNameOf(name: string): string | undefined { + const run = this.byName(name); + if (!run?.parent_run_id) { + return undefined; + } + return this.byId.get(run.parent_run_id)?.name; + } + + asClient(): LangSmithClient { + return this as unknown as LangSmithClient; + } +} + +describe('langsmith/workflow-tracing', function () { + this.timeout(30_000); + + let testEnv: TestWorkflowEnvironment; + + before(async () => { + testEnv = await TestWorkflowEnvironment.createLocal(); + }); + + after(async () => { + await testEnv?.teardown(); + }); + + it('emits each workflow-body traceable exactly once under replay, nesting sequentially', async () => { + const collector = new InMemoryRunCollector(); + const plugin = new LangSmithPlugin({ client: collector.asClient() }); + const taskQueue = 'test-langsmith-workflow-tracing'; + + const worker = await Worker.create({ + connection: testEnv.nativeConnection, + taskQueue, + workflowsPath: require.resolve('../workflows'), + plugins: [plugin], + // Force replay on every Workflow Task so the replay-safety guarantee is exercised. + maxCachedWorkflows: 0, + }); + + const client = new Client({ + connection: testEnv.connection, + namespace: testEnv.namespace, + plugins: [plugin], + }); + + const pipeline = traceable( + async () => + client.workflow.execute(SummarizeWorkflow, { + args: ['the meeting notes'], + workflowId: 'test-langsmith-workflow-tracing-' + Date.now(), + taskQueue, + }), + { name: 'user_pipeline' } + ); + + const result = await worker.runUntil(pipeline()); + + assert.strictEqual(result, 'summary:points:the meeting notes'); + + assert.strictEqual(collector.countByName('extract_key_points'), 1); + assert.strictEqual(collector.countByName('summarize'), 1); + assert.strictEqual(collector.parentNameOf('extract_key_points'), 'user_pipeline'); + assert.strictEqual(collector.parentNameOf('summarize'), 'user_pipeline'); + }); +}); diff --git a/langsmith/workflow-tracing/src/worker.ts b/langsmith/workflow-tracing/src/worker.ts new file mode 100644 index 00000000..914f53ab --- /dev/null +++ b/langsmith/workflow-tracing/src/worker.ts @@ -0,0 +1,26 @@ +import { NativeConnection, Worker } from '@temporalio/worker'; +import { Client as LangSmithClient } from 'langsmith'; +import { LangSmithPlugin } from '@temporalio/langsmith'; + +async function run() { + const connection = await NativeConnection.connect({ address: 'localhost:7233' }); + try { + const langsmith = new LangSmithClient(); + const plugin = new LangSmithPlugin({ client: langsmith }); + + const worker = await Worker.create({ + connection, + taskQueue: 'langsmith-workflow-tracing', + workflowsPath: require.resolve('./workflows'), + plugins: [plugin], + }); + await worker.run(); + } finally { + await connection.close(); + } +} + +run().catch((err) => { + console.error(err); + process.exit(1); +}); diff --git a/langsmith/workflow-tracing/src/workflows.ts b/langsmith/workflow-tracing/src/workflows.ts new file mode 100644 index 00000000..1e5b86af --- /dev/null +++ b/langsmith/workflow-tracing/src/workflows.ts @@ -0,0 +1,14 @@ +import { traceable } from 'langsmith/traceable'; + +const extractKeyPoints = traceable(async (text: string): Promise => `points:${text}`, { + name: 'extract_key_points', +}); + +const summarize = traceable(async (points: string): Promise => `summary:${points}`, { + name: 'summarize', +}); + +export async function SummarizeWorkflow(text: string): Promise { + const points = await extractKeyPoints(text); + return summarize(points); +}