Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions services/kiloclaw-inbound-email/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# kiloclaw-inbound-email

Cloudflare Email Routing handler for `kiloclaw.ai`. Receives mail addressed to `<alias>@kiloclaw.ai`, looks up the alias in `kiloclaw_inbound_email_aliases`, parses the message, and enqueues a delivery for the consumer to forward to the platform worker's `/api/platform/inbound-email` endpoint.

## Pipeline

```
Cloudflare Email Routing
→ email() handler (this worker)
→ resolveRecipient → lookupInstanceIdByAlias
→ parseRawEmail
→ INBOUND_EMAIL_QUEUE.send({ instanceId, alias, from, subject, text, ... })
→ queue consumer (this worker)
→ POST kiloclaw worker /api/platform/inbound-email
→ POST instance controller /_kilo/hooks/email
→ OpenClaw /hooks/email
```

## Observability

To make logs reach Axiom, the worker needs **both** flags in `wrangler.jsonc`:

```jsonc
"observability": { "enabled": true },
"logpush": true,
```

The account-level Logpush job is set to "all logs", but Cloudflare still requires each worker to opt in via `logpush: true`. `observability.enabled` alone isn't enough: without `logpush: true`, the worker's trace events stay inside Cloudflare and never reach the `cloudflare-logpush` Axiom dataset. Check `ScriptName == "<your-worker>"` in Axiom after deploy to confirm.
1 change: 1 addition & 0 deletions services/kiloclaw-inbound-email/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"compatibility_flags": ["nodejs_compat"],
"dev": { "port": 8810 },
"observability": { "enabled": true },
"logpush": true,
"placement": { "mode": "smart" },
"hyperdrive": [
{
Expand Down
51 changes: 51 additions & 0 deletions services/kiloclaw/controller/src/bootstrap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -982,6 +982,57 @@ describe('runOnboardOrDoctor', () => {
expect(preDoctorConfig.plugins?.entries).not.toHaveProperty('openclaw-channel-streamchat');
});

it('back-fills hooks.allowRequestSessionKey on existing configs before doctor', () => {
const harness = fakeDeps();
harness.setConfigExists(true);
(harness.deps.readFileSync as ReturnType<typeof vi.fn>).mockReturnValue(
JSON.stringify({
hooks: {
enabled: true,
token: 'existing-token',
path: '/hooks',
mappings: [
{
id: 'cloudflare-email-inbound',
match: { path: 'email' },
action: 'agent',
wakeMode: 'now',
sessionKey: '{{payload.sessionKey}}',
messageTemplate: 'From: {{payload.from}}',
deliver: false,
},
],
},
})
);

runOnboardOrDoctor(
{
KILOCODE_API_KEY: 'test-key',
OPENCLAW_GATEWAY_TOKEN: 'test-token',
AUTO_APPROVE_DEVICES: 'true',
},
harness.deps
);

const doctorCallIndex = (
harness.deps.execFileSync as ReturnType<typeof vi.fn>
).mock.calls.findIndex(([_cmd, args]) => Array.isArray(args) && args.includes('doctor'));
expect(doctorCallIndex).not.toBe(-1);
const doctorCallOrder = (harness.deps.execFileSync as ReturnType<typeof vi.fn>).mock
.invocationCallOrder[doctorCallIndex];

const preDoctorWriteIndex = (
harness.deps.writeFileSync as ReturnType<typeof vi.fn>
).mock.invocationCallOrder.findIndex(order => order < doctorCallOrder);
expect(preDoctorWriteIndex).not.toBe(-1);

const preDoctorConfig = JSON.parse(harness.writeCalls[preDoctorWriteIndex].data) as {
hooks?: { allowRequestSessionKey?: boolean };
};
expect(preDoctorConfig.hooks?.allowRequestSessionKey).toBe(true);
});

it('migrates legacy plaintext kilocode key in auth-profiles.json to a keyRef', () => {
// Integration check: runOnboardOrDoctor must drive the auth-profiles
// migration. On a legacy doctor boot, a plaintext key in
Expand Down
19 changes: 14 additions & 5 deletions services/kiloclaw/controller/src/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import path from 'node:path';
import { execFileSync as nodeExecFileSync } from 'node:child_process';
import {
generateBaseConfig,
ensureInboundEmailHookFlags,
sanitizeLegacyStreamChatConfig,
writeBaseConfig,
writeMcporterConfig,
Expand Down Expand Up @@ -727,16 +728,24 @@ function sanitizeExistingConfigBeforeDoctor(deps: BootstrapDeps): void {
return;
}

const before = JSON.stringify(parsed);
const initial = JSON.stringify(parsed);
const applied: string[] = [];

sanitizeLegacyStreamChatConfig(parsed);
const serialized = JSON.stringify(parsed, null, 2);
if (JSON.stringify(parsed) === before) {
let snapshot = JSON.stringify(parsed);
if (snapshot !== initial) applied.push('streamChat');

ensureInboundEmailHookFlags(parsed);
const final = JSON.stringify(parsed);
if (final !== snapshot) applied.push('inboundEmailFlags');

if (applied.length === 0) {
return;
}

atomicWrite(
CONFIG_PATH,
serialized,
JSON.stringify(parsed, null, 2),
{
writeFileSync: deps.writeFileSync,
renameSync: deps.renameSync,
Expand All @@ -745,7 +754,7 @@ function sanitizeExistingConfigBeforeDoctor(deps: BootstrapDeps): void {
},
{ mode: 0o600 }
);
console.log('Removed legacy Stream Chat config before doctor');
console.log(`Sanitized existing config before doctor: [${applied.join(', ')}]`);
}

export function runOnboardOrDoctor(env: EnvLike, deps: BootstrapDeps = defaultDeps): void {
Expand Down
35 changes: 35 additions & 0 deletions services/kiloclaw/controller/src/config-writer.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { describe, it, expect, vi } from 'vitest';
import {
backupConfigFile,
ensureInboundEmailHookFlags,
generateBaseConfig,
setNestedValue,
writeBaseConfig,
Expand Down Expand Up @@ -974,6 +975,7 @@ describe('generateBaseConfig', () => {
expect(config.hooks.enabled).toBe(true);
expect(config.hooks.token).toBe('test-hooks-token');
expect(config.hooks.path).toBe('/hooks');
expect(config.hooks.allowRequestSessionKey).toBe(true);
expect(config.hooks.allowedSessionKeyPrefixes).toEqual(['hook:', 'inbound-email:']);
expect(config.hooks.presets).toBeUndefined();
expect(config.hooks.mappings).toContainEqual({
Expand Down Expand Up @@ -1289,6 +1291,39 @@ describe('generateBaseConfig', () => {
});
});

describe('ensureInboundEmailHookFlags', () => {
it('sets allowRequestSessionKey on a hooks object that lacks it', () => {
const config = { hooks: { enabled: true, token: 'tok' } };
ensureInboundEmailHookFlags(config);
expect(config.hooks).toMatchObject({
enabled: true,
token: 'tok',
allowRequestSessionKey: true,
});
});

it('is a no-op when hooks block is absent (instance has no inbound hooks)', () => {
const config: Record<string, unknown> = { gateway: {} };
ensureInboundEmailHookFlags(config);
expect(config.hooks).toBeUndefined();
});

it('is idempotent when allowRequestSessionKey is already true', () => {
const config = { hooks: { allowRequestSessionKey: true } };
ensureInboundEmailHookFlags(config);
expect(config.hooks.allowRequestSessionKey).toBe(true);
});

it('overrides allowRequestSessionKey: false back to true', () => {
// Canonical-config policy: the inbound-email mapping is force-installed
// on every run, so the flag it requires must converge to true alongside
// it. An explicit `false` is treated as drift, not as admin intent.
const config = { hooks: { allowRequestSessionKey: false } };
ensureInboundEmailHookFlags(config);
expect(config.hooks.allowRequestSessionKey).toBe(true);
});
});

describe('setNestedValue', () => {
it('sets a value at a simple path', () => {
const obj: Record<string, unknown> = {};
Expand Down
19 changes: 19 additions & 0 deletions services/kiloclaw/controller/src/config-writer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,24 @@ type ConfigObject = Record<string, any>;

type EnvLike = Record<string, string | undefined>;

// OpenClaw refuses hook mappings that derive `sessionKey` from a request
// payload unless this flag is set. The inbound-email mapping uses
// `{{payload.sessionKey}}` so the platform worker can pre-compute a stable
// key like `inbound-email:YYYY-MM-DD-<slug>` that coalesces emails on the
// same thread into one agent session.
//
// Force-overrides any prior value (including `false`) on purpose: the
// inbound-email mapping is canonical config — generateBaseConfig
// unconditionally installs/overwrites it on every run — so the flag it
// requires must converge to true alongside the mapping. If an admin needs
// to disable inbound email handling, the right lever is the
// kiloclaw_instances.inbound_email_enabled column, not flipping this flag.
export function ensureInboundEmailHookFlags(config: ConfigObject): void {
if (config.hooks && typeof config.hooks === 'object' && !Array.isArray(config.hooks)) {
config.hooks.allowRequestSessionKey = true;
}
}

export function sanitizeLegacyStreamChatConfig(config: ConfigObject): void {
if (config.channels && typeof config.channels === 'object' && !Array.isArray(config.channels)) {
delete config.channels.streamchat;
Expand Down Expand Up @@ -551,6 +569,7 @@ export function generateBaseConfig(
config.hooks.enabled = true;
config.hooks.token = env.KILOCLAW_HOOKS_TOKEN;
config.hooks.path = '/hooks';
ensureInboundEmailHookFlags(config);
config.hooks.allowedSessionKeyPrefixes = Array.isArray(config.hooks.allowedSessionKeyPrefixes)
? config.hooks.allowedSessionKeyPrefixes
: [];
Expand Down
13 changes: 12 additions & 1 deletion services/kiloclaw/src/routes/platform.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3034,12 +3034,23 @@ platform.post('/inbound-email', async c => {
}

const error = await response.text().catch(() => '');
let controllerErrorMessage: string | undefined;
try {
const parsed: unknown = JSON.parse(error);
if (parsed && typeof parsed === 'object' && 'error' in parsed) {
const candidate = (parsed as { error: unknown }).error;
if (typeof candidate === 'string') controllerErrorMessage = candidate;
}
} catch {
// body wasn't JSON; the raw `error` field below preserves it
}
const controllerFailure = {
...logContext,
userId: instance.userId,
doKey,
status: response.status,
error: error.slice(0, 500),
error: error.slice(0, 2000),
controllerErrorMessage,
durationMs: performance.now() - startedAt,
};
if (response.status >= 500) {
Expand Down