From 48aae82bbc19ba8b0741e61a08063eb0d1df464e Mon Sep 17 00:00:00 2001 From: Agustin Rivera <31522568+eleqtrizit@users.noreply.github.com> Date: Mon, 13 Apr 2026 12:30:42 -0700 Subject: [PATCH] fix(outbound): replay queued session context (#66025) * fix(outbound): preserve replay session context * fix(outbound): remove user work log * changelog: note outbound session-context replay fix (#66025) --------- Co-authored-by: Devin Robison --- CHANGELOG.md | 1 + src/infra/outbound/deliver.ts | 1 + src/infra/outbound/delivery-queue-recovery.ts | 1 + src/infra/outbound/delivery-queue-storage.ts | 4 ++ .../outbound/delivery-queue.recovery.test.ts | 18 ++++++++ .../outbound/delivery-queue.storage.test.ts | 43 +++++++++++++++++++ 6 files changed, 68 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd31ba2fb7b..493b0062bc0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -26,6 +26,7 @@ Docs: https://docs.openclaw.ai - Browser/CDP: let local attach-only `manual-cdp` profiles reuse the local loopback CDP control plane under strict default policy and remote-class probe timeouts, so tabs/snapshot stop falsely reporting a live local browser session as not running. (#65611, #66080) Thanks @mbelinky. - Cron/scheduler: stop inventing short retries when cron next-run calculation returns no valid future slot, and keep a maintenance wake armed so enabled unscheduled jobs recover without entering a refire loop. (#66019, #66083) Thanks @mbelinky. - Cron/scheduler: preserve the active error-backoff floor when maintenance repair recomputes a missing cron next-run, so recurring errored jobs do not resume early after a transient next-run resolution failure. (#66019, #66083, #66113) Thanks @mbelinky. +- Outbound/delivery-queue: persist the originating outbound `session` context on queued delivery entries and replay it during recovery, so write-ahead-queued sends keep their original outbound media policy context after restart instead of evaluating against a missing session. (#66025) Thanks @eleqtrizit. ## 2026.4.12 diff --git a/src/infra/outbound/deliver.ts b/src/infra/outbound/deliver.ts index 9cc15881639..1855e738d20 100644 --- a/src/infra/outbound/deliver.ts +++ b/src/infra/outbound/deliver.ts @@ -507,6 +507,7 @@ export async function deliverOutboundPayloads( forceDocument: params.forceDocument, silent: params.silent, mirror: params.mirror, + session: params.session, gatewayClientScopes: params.gatewayClientScopes, }).catch(() => null); // Best-effort — don't block delivery if queue write fails. diff --git a/src/infra/outbound/delivery-queue-recovery.ts b/src/infra/outbound/delivery-queue-recovery.ts index 61a1fca3e11..2b49783eb5e 100644 --- a/src/infra/outbound/delivery-queue-recovery.ts +++ b/src/infra/outbound/delivery-queue-recovery.ts @@ -104,6 +104,7 @@ function buildRecoveryDeliverParams(entry: QueuedDelivery, cfg: OpenClawConfig) forceDocument: entry.forceDocument, silent: entry.silent, mirror: entry.mirror, + session: entry.session, gatewayClientScopes: entry.gatewayClientScopes, skipQueue: true, // Prevent re-enqueueing during recovery. } satisfies Parameters[0]; diff --git a/src/infra/outbound/delivery-queue-storage.ts b/src/infra/outbound/delivery-queue-storage.ts index 59a5ed0b890..83d23b173be 100644 --- a/src/infra/outbound/delivery-queue-storage.ts +++ b/src/infra/outbound/delivery-queue-storage.ts @@ -4,6 +4,7 @@ import type { ReplyPayload } from "../../auto-reply/types.js"; import { resolveStateDir } from "../../config/paths.js"; import { generateSecureUuid } from "../secure-random.js"; import type { OutboundMirror } from "./mirror.js"; +import type { OutboundSessionContext } from "./session-context.js"; import type { OutboundChannel } from "./targets.js"; const QUEUE_DIRNAME = "delivery-queue"; @@ -26,6 +27,8 @@ export type QueuedDeliveryPayload = { forceDocument?: boolean; silent?: boolean; mirror?: OutboundMirror; + /** Session context needed to preserve outbound media policy on recovery. */ + session?: OutboundSessionContext; /** Gateway caller scopes at enqueue time, preserved for recovery replay. */ gatewayClientScopes?: readonly string[]; }; @@ -144,6 +147,7 @@ export async function enqueueDelivery( forceDocument: params.forceDocument, silent: params.silent, mirror: params.mirror, + session: params.session, gatewayClientScopes: params.gatewayClientScopes, retryCount: 0, }); diff --git a/src/infra/outbound/delivery-queue.recovery.test.ts b/src/infra/outbound/delivery-queue.recovery.test.ts index a8dbc8b7617..592935cb80c 100644 --- a/src/infra/outbound/delivery-queue.recovery.test.ts +++ b/src/infra/outbound/delivery-queue.recovery.test.ts @@ -165,6 +165,15 @@ describe("delivery-queue recovery", () => { text: "a", mediaUrls: ["https://example.com/a.png"], }, + session: { + key: "agent:main:main", + agentId: "agent-main", + requesterAccountId: "acct-1", + requesterSenderId: "sender-1", + requesterSenderName: "Sender One", + requesterSenderUsername: "sender.one", + requesterSenderE164: "+15551234567", + }, }, tmpDir(), ); @@ -183,6 +192,15 @@ describe("delivery-queue recovery", () => { text: "a", mediaUrls: ["https://example.com/a.png"], }, + session: { + key: "agent:main:main", + agentId: "agent-main", + requesterAccountId: "acct-1", + requesterSenderId: "sender-1", + requesterSenderName: "Sender One", + requesterSenderUsername: "sender.one", + requesterSenderE164: "+15551234567", + }, }), ); }); diff --git a/src/infra/outbound/delivery-queue.storage.test.ts b/src/infra/outbound/delivery-queue.storage.test.ts index 676c3d49d19..feed95131cd 100644 --- a/src/infra/outbound/delivery-queue.storage.test.ts +++ b/src/infra/outbound/delivery-queue.storage.test.ts @@ -33,6 +33,12 @@ describe("delivery-queue storage", () => { text: "hello", mediaUrls: ["https://example.com/file.png"], }, + session: { + key: "agent:main:main", + agentId: "agent-main", + requesterAccountId: "acct-1", + requesterSenderId: "sender-1", + }, }, tmpDir(), ); @@ -53,6 +59,12 @@ describe("delivery-queue storage", () => { text: "hello", mediaUrls: ["https://example.com/file.png"], }, + session: { + key: "agent:main:main", + agentId: "agent-main", + requesterAccountId: "acct-1", + requesterSenderId: "sender-1", + }, retryCount: 0, }); expect(entry.payloads).toEqual([{ text: "hello" }]); @@ -169,6 +181,37 @@ describe("delivery-queue storage", () => { expect(entry.gatewayClientScopes).toEqual(["operator.write"]); }); + it("persists session context for recovery replay", async () => { + const id = await enqueueTextDelivery( + { + channel: "telegram", + to: "2", + payloads: [{ text: "b" }], + session: { + key: "agent:main:main", + agentId: "agent-main", + requesterAccountId: "acct-1", + requesterSenderId: "sender-1", + requesterSenderName: "Sender One", + requesterSenderUsername: "sender.one", + requesterSenderE164: "+15551234567", + }, + }, + tmpDir(), + ); + + const entry = readQueuedEntry(tmpDir(), id); + expect(entry.session).toEqual({ + key: "agent:main:main", + agentId: "agent-main", + requesterAccountId: "acct-1", + requesterSenderId: "sender-1", + requesterSenderName: "Sender One", + requesterSenderUsername: "sender.one", + requesterSenderE164: "+15551234567", + }); + }); + it("backfills lastAttemptAt for legacy retry entries during load", async () => { const id = await enqueueTextDelivery({ channel: "whatsapp",