diff --git a/src/auto-reply/reply/dispatch-from-config.test.ts b/src/auto-reply/reply/dispatch-from-config.test.ts index c3c6a25b46f..4e48b95a541 100644 --- a/src/auto-reply/reply/dispatch-from-config.test.ts +++ b/src/auto-reply/reply/dispatch-from-config.test.ts @@ -2692,6 +2692,54 @@ describe("dispatchReplyFromConfig", () => { ); }); + it("releases inbound dedupe when dispatch fails before completion", async () => { + setNoAbort(); + const cfg = { diagnostics: { enabled: true } } as OpenClawConfig; + const ctx = buildTestCtx({ + Provider: "whatsapp", + OriginatingChannel: "whatsapp", + OriginatingTo: "whatsapp:+15555550124", + To: "whatsapp:+15555550124", + AccountId: "default", + MessageSid: "msg-dup-error", + SessionKey: "agent:main:whatsapp:direct:+15555550124", + CommandBody: "hello", + RawBody: "hello", + Body: "hello", + }); + const replyResolver = vi + .fn< + (_ctx: MsgContext, _opts?: GetReplyOptions, _cfg?: OpenClawConfig) => Promise + >() + .mockRejectedValueOnce(new Error("dispatch failed")) + .mockResolvedValueOnce({ text: "retry succeeds" }); + + await expect( + dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher: createDispatcher(), + replyResolver, + }), + ).rejects.toThrow("dispatch failed"); + + await dispatchReplyFromConfig({ + ctx, + cfg, + dispatcher: createDispatcher(), + replyResolver, + }); + + expect(replyResolver).toHaveBeenCalledTimes(2); + expect(diagnosticMocks.logMessageProcessed).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "whatsapp", + outcome: "error", + error: "Error: dispatch failed", + }), + ); + }); + it("passes configOverride to replyResolver when provided", async () => { setNoAbort(); const cfg = emptyConfig; diff --git a/src/auto-reply/reply/dispatch-from-config.ts b/src/auto-reply/reply/dispatch-from-config.ts index 7370606f2c5..dcfcdab25f9 100644 --- a/src/auto-reply/reply/dispatch-from-config.ts +++ b/src/auto-reply/reply/dispatch-from-config.ts @@ -63,7 +63,7 @@ import type { DispatchFromConfigParams, DispatchFromConfigResult, } from "./dispatch-from-config.types.js"; -import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js"; +import { claimInboundDedupe, commitInboundDedupe, releaseInboundDedupe } from "./inbound-dedupe.js"; import { resolveReplyRoutingDecision } from "./routing-policy.js"; import { resolveRunTypingPolicy } from "./typing-policy.js"; @@ -255,7 +255,8 @@ export async function dispatchReplyFromConfig( }); }; - if (shouldSkipDuplicateInbound(ctx)) { + const inboundDedupeClaim = claimInboundDedupe(ctx); + if (inboundDedupeClaim.status === "duplicate" || inboundDedupeClaim.status === "inflight") { recordProcessed("skipped", { reason: "duplicate" }); return { queuedFinal: false, counts: dispatcher.getQueuedCounts() }; } @@ -1032,6 +1033,9 @@ export async function dispatchReplyFromConfig( const counts = dispatcher.getQueuedCounts(); counts.final += routedFinalCount; + if (inboundDedupeClaim.status === "claimed") { + commitInboundDedupe(inboundDedupeClaim.key); + } recordProcessed( "completed", pluginFallbackReason ? { reason: pluginFallbackReason } : undefined, @@ -1039,6 +1043,9 @@ export async function dispatchReplyFromConfig( markIdle("message_completed"); return { queuedFinal, counts }; } catch (err) { + if (inboundDedupeClaim.status === "claimed") { + releaseInboundDedupe(inboundDedupeClaim.key); + } recordProcessed("error", { error: String(err) }); markIdle("message_error"); throw err; diff --git a/src/auto-reply/reply/inbound-dedupe.test.ts b/src/auto-reply/reply/inbound-dedupe.test.ts index c71aeb598dd..f73a8a9edb6 100644 --- a/src/auto-reply/reply/inbound-dedupe.test.ts +++ b/src/auto-reply/reply/inbound-dedupe.test.ts @@ -40,4 +40,36 @@ describe("inbound dedupe", () => { inboundB.resetInboundDedupe(); } }); + + it("shares claim/release state across distinct module instances", async () => { + const inboundA = await importFreshModule( + import.meta.url, + "./inbound-dedupe.js?scope=claim-a", + ); + const inboundB = await importFreshModule( + import.meta.url, + "./inbound-dedupe.js?scope=claim-b", + ); + + inboundA.resetInboundDedupe(); + inboundB.resetInboundDedupe(); + + try { + const firstClaim = inboundA.claimInboundDedupe(sharedInboundContext); + expect(firstClaim).toMatchObject({ status: "claimed" }); + expect(inboundB.claimInboundDedupe(sharedInboundContext)).toMatchObject({ + status: "inflight", + }); + if (firstClaim.status !== "claimed") { + throw new Error("expected claimed inbound dedupe result"); + } + inboundB.releaseInboundDedupe(firstClaim.key); + expect(inboundA.claimInboundDedupe(sharedInboundContext)).toMatchObject({ + status: "claimed", + }); + } finally { + inboundA.resetInboundDedupe(); + inboundB.resetInboundDedupe(); + } + }); }); diff --git a/src/auto-reply/reply/inbound-dedupe.ts b/src/auto-reply/reply/inbound-dedupe.ts index b85cf73d5d4..90fd48f25cf 100644 --- a/src/auto-reply/reply/inbound-dedupe.ts +++ b/src/auto-reply/reply/inbound-dedupe.ts @@ -1,6 +1,7 @@ import { logVerbose, shouldLogVerbose } from "../../globals.js"; import { resolveGlobalDedupeCache, type DedupeCache } from "../../infra/dedupe.js"; import { parseAgentSessionKey } from "../../sessions/session-key-utils.js"; +import { resolveGlobalSingleton } from "../../shared/global-singleton.js"; import { normalizeOptionalLowercaseString, normalizeOptionalString, @@ -15,11 +16,22 @@ const DEFAULT_INBOUND_DEDUPE_MAX = 5000; * message cannot bypass dedupe by entering through a different chunk copy. */ const INBOUND_DEDUPE_CACHE_KEY = Symbol.for("openclaw.inboundDedupeCache"); +const INBOUND_DEDUPE_INFLIGHT_KEY = Symbol.for("openclaw.inboundDedupeInflight"); const inboundDedupeCache: DedupeCache = resolveGlobalDedupeCache(INBOUND_DEDUPE_CACHE_KEY, { ttlMs: DEFAULT_INBOUND_DEDUPE_TTL_MS, maxSize: DEFAULT_INBOUND_DEDUPE_MAX, }); +const inboundDedupeInFlight = resolveGlobalSingleton( + INBOUND_DEDUPE_INFLIGHT_KEY, + () => new Set(), +); + +export type InboundDedupeClaimResult = + | { status: "invalid" } + | { status: "duplicate"; key: string } + | { status: "inflight"; key: string } + | { status: "claimed"; key: string }; const resolveInboundPeerId = (ctx: MsgContext) => ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? ctx.SessionKey; @@ -79,6 +91,42 @@ export function shouldSkipDuplicateInbound( return skipped; } +export function claimInboundDedupe( + ctx: MsgContext, + opts?: { cache?: DedupeCache; now?: number; inFlight?: Set }, +): InboundDedupeClaimResult { + const key = buildInboundDedupeKey(ctx); + if (!key) { + return { status: "invalid" }; + } + const cache = opts?.cache ?? inboundDedupeCache; + if (cache.peek(key, opts?.now)) { + return { status: "duplicate", key }; + } + const inFlight = opts?.inFlight ?? inboundDedupeInFlight; + if (inFlight.has(key)) { + return { status: "inflight", key }; + } + inFlight.add(key); + return { status: "claimed", key }; +} + +export function commitInboundDedupe( + key: string, + opts?: { cache?: DedupeCache; now?: number; inFlight?: Set }, +): void { + const cache = opts?.cache ?? inboundDedupeCache; + cache.check(key, opts?.now); + const inFlight = opts?.inFlight ?? inboundDedupeInFlight; + inFlight.delete(key); +} + +export function releaseInboundDedupe(key: string, opts?: { inFlight?: Set }): void { + const inFlight = opts?.inFlight ?? inboundDedupeInFlight; + inFlight.delete(key); +} + export function resetInboundDedupe(): void { inboundDedupeCache.clear(); + inboundDedupeInFlight.clear(); }