fix(auto-reply): release inbound dedupe after dispatch errors

This commit is contained in:
Vincent Koc
2026-04-13 16:34:28 +01:00
parent 66ea85f9d4
commit 7c91d0dbc9
4 changed files with 137 additions and 2 deletions

View File

@@ -2692,6 +2692,54 @@ describe("dispatchReplyFromConfig", () => {
);
});
it("releases inbound dedupe when dispatch fails before completion", async () => {
setNoAbort();
const cfg = { diagnostics: { enabled: true } } as OpenClawConfig;
const ctx = buildTestCtx({
Provider: "whatsapp",
OriginatingChannel: "whatsapp",
OriginatingTo: "whatsapp:+15555550124",
To: "whatsapp:+15555550124",
AccountId: "default",
MessageSid: "msg-dup-error",
SessionKey: "agent:main:whatsapp:direct:+15555550124",
CommandBody: "hello",
RawBody: "hello",
Body: "hello",
});
const replyResolver = vi
.fn<
(_ctx: MsgContext, _opts?: GetReplyOptions, _cfg?: OpenClawConfig) => Promise<ReplyPayload>
>()
.mockRejectedValueOnce(new Error("dispatch failed"))
.mockResolvedValueOnce({ text: "retry succeeds" });
await expect(
dispatchReplyFromConfig({
ctx,
cfg,
dispatcher: createDispatcher(),
replyResolver,
}),
).rejects.toThrow("dispatch failed");
await dispatchReplyFromConfig({
ctx,
cfg,
dispatcher: createDispatcher(),
replyResolver,
});
expect(replyResolver).toHaveBeenCalledTimes(2);
expect(diagnosticMocks.logMessageProcessed).toHaveBeenCalledWith(
expect.objectContaining({
channel: "whatsapp",
outcome: "error",
error: "Error: dispatch failed",
}),
);
});
it("passes configOverride to replyResolver when provided", async () => {
setNoAbort();
const cfg = emptyConfig;

View File

@@ -63,7 +63,7 @@ import type {
DispatchFromConfigParams,
DispatchFromConfigResult,
} from "./dispatch-from-config.types.js";
import { shouldSkipDuplicateInbound } from "./inbound-dedupe.js";
import { claimInboundDedupe, commitInboundDedupe, releaseInboundDedupe } from "./inbound-dedupe.js";
import { resolveReplyRoutingDecision } from "./routing-policy.js";
import { resolveRunTypingPolicy } from "./typing-policy.js";
@@ -255,7 +255,8 @@ export async function dispatchReplyFromConfig(
});
};
if (shouldSkipDuplicateInbound(ctx)) {
const inboundDedupeClaim = claimInboundDedupe(ctx);
if (inboundDedupeClaim.status === "duplicate" || inboundDedupeClaim.status === "inflight") {
recordProcessed("skipped", { reason: "duplicate" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
@@ -1032,6 +1033,9 @@ export async function dispatchReplyFromConfig(
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
if (inboundDedupeClaim.status === "claimed") {
commitInboundDedupe(inboundDedupeClaim.key);
}
recordProcessed(
"completed",
pluginFallbackReason ? { reason: pluginFallbackReason } : undefined,
@@ -1039,6 +1043,9 @@ export async function dispatchReplyFromConfig(
markIdle("message_completed");
return { queuedFinal, counts };
} catch (err) {
if (inboundDedupeClaim.status === "claimed") {
releaseInboundDedupe(inboundDedupeClaim.key);
}
recordProcessed("error", { error: String(err) });
markIdle("message_error");
throw err;

View File

@@ -40,4 +40,36 @@ describe("inbound dedupe", () => {
inboundB.resetInboundDedupe();
}
});
it("shares claim/release state across distinct module instances", async () => {
const inboundA = await importFreshModule<typeof import("./inbound-dedupe.js")>(
import.meta.url,
"./inbound-dedupe.js?scope=claim-a",
);
const inboundB = await importFreshModule<typeof import("./inbound-dedupe.js")>(
import.meta.url,
"./inbound-dedupe.js?scope=claim-b",
);
inboundA.resetInboundDedupe();
inboundB.resetInboundDedupe();
try {
const firstClaim = inboundA.claimInboundDedupe(sharedInboundContext);
expect(firstClaim).toMatchObject({ status: "claimed" });
expect(inboundB.claimInboundDedupe(sharedInboundContext)).toMatchObject({
status: "inflight",
});
if (firstClaim.status !== "claimed") {
throw new Error("expected claimed inbound dedupe result");
}
inboundB.releaseInboundDedupe(firstClaim.key);
expect(inboundA.claimInboundDedupe(sharedInboundContext)).toMatchObject({
status: "claimed",
});
} finally {
inboundA.resetInboundDedupe();
inboundB.resetInboundDedupe();
}
});
});

View File

@@ -1,6 +1,7 @@
import { logVerbose, shouldLogVerbose } from "../../globals.js";
import { resolveGlobalDedupeCache, type DedupeCache } from "../../infra/dedupe.js";
import { parseAgentSessionKey } from "../../sessions/session-key-utils.js";
import { resolveGlobalSingleton } from "../../shared/global-singleton.js";
import {
normalizeOptionalLowercaseString,
normalizeOptionalString,
@@ -15,11 +16,22 @@ const DEFAULT_INBOUND_DEDUPE_MAX = 5000;
* message cannot bypass dedupe by entering through a different chunk copy.
*/
const INBOUND_DEDUPE_CACHE_KEY = Symbol.for("openclaw.inboundDedupeCache");
const INBOUND_DEDUPE_INFLIGHT_KEY = Symbol.for("openclaw.inboundDedupeInflight");
const inboundDedupeCache: DedupeCache = resolveGlobalDedupeCache(INBOUND_DEDUPE_CACHE_KEY, {
ttlMs: DEFAULT_INBOUND_DEDUPE_TTL_MS,
maxSize: DEFAULT_INBOUND_DEDUPE_MAX,
});
const inboundDedupeInFlight = resolveGlobalSingleton(
INBOUND_DEDUPE_INFLIGHT_KEY,
() => new Set<string>(),
);
export type InboundDedupeClaimResult =
| { status: "invalid" }
| { status: "duplicate"; key: string }
| { status: "inflight"; key: string }
| { status: "claimed"; key: string };
const resolveInboundPeerId = (ctx: MsgContext) =>
ctx.OriginatingTo ?? ctx.To ?? ctx.From ?? ctx.SessionKey;
@@ -79,6 +91,42 @@ export function shouldSkipDuplicateInbound(
return skipped;
}
export function claimInboundDedupe(
ctx: MsgContext,
opts?: { cache?: DedupeCache; now?: number; inFlight?: Set<string> },
): InboundDedupeClaimResult {
const key = buildInboundDedupeKey(ctx);
if (!key) {
return { status: "invalid" };
}
const cache = opts?.cache ?? inboundDedupeCache;
if (cache.peek(key, opts?.now)) {
return { status: "duplicate", key };
}
const inFlight = opts?.inFlight ?? inboundDedupeInFlight;
if (inFlight.has(key)) {
return { status: "inflight", key };
}
inFlight.add(key);
return { status: "claimed", key };
}
export function commitInboundDedupe(
key: string,
opts?: { cache?: DedupeCache; now?: number; inFlight?: Set<string> },
): void {
const cache = opts?.cache ?? inboundDedupeCache;
cache.check(key, opts?.now);
const inFlight = opts?.inFlight ?? inboundDedupeInFlight;
inFlight.delete(key);
}
export function releaseInboundDedupe(key: string, opts?: { inFlight?: Set<string> }): void {
const inFlight = opts?.inFlight ?? inboundDedupeInFlight;
inFlight.delete(key);
}
export function resetInboundDedupe(): void {
inboundDedupeCache.clear();
inboundDedupeInFlight.clear();
}