From 6d38bd476893efe2dcde83c87e14fbb30260dfb2 Mon Sep 17 00:00:00 2001 From: Vincent Koc Date: Mon, 13 Apr 2026 16:22:21 +0100 Subject: [PATCH] fix(feishu): keep comment replay closed after generic failures --- extensions/feishu/src/monitor.account.ts | 5 ++ extensions/feishu/src/monitor.comment.test.ts | 54 ++++++++++++++++++- .../feishu/src/sequential-queue.test.ts | 28 ++++++++++ extensions/feishu/src/sequential-queue.ts | 5 +- 4 files changed, 88 insertions(+), 4 deletions(-) diff --git a/extensions/feishu/src/monitor.account.ts b/extensions/feishu/src/monitor.account.ts index 45885460480..4c8f84c25da 100644 --- a/extensions/feishu/src/monitor.account.ts +++ b/extensions/feishu/src/monitor.account.ts @@ -662,6 +662,11 @@ function registerEventHandlers( if (syntheticMessageId) { await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); } + } catch (err) { + if (syntheticMessageId && !isFeishuRetryableSyntheticEventError(err)) { + await recordProcessedFeishuMessage(syntheticMessageId, accountId, log); + } + throw err; } finally { if (syntheticMessageId) { releaseFeishuMessageProcessing(syntheticMessageId, accountId); diff --git a/extensions/feishu/src/monitor.comment.test.ts b/extensions/feishu/src/monitor.comment.test.ts index 7815f65866f..605404d2353 100644 --- a/extensions/feishu/src/monitor.comment.test.ts +++ b/extensions/feishu/src/monitor.comment.test.ts @@ -4,7 +4,7 @@ import { resolveInboundDebounceMs, } from "openclaw/plugin-sdk/reply-runtime"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; -import { createNonExitingTypedRuntimeEnv } from "../../../test/helpers/plugins/runtime-env.js"; +import { createNonExitingRuntimeEnv } from "../../../test/helpers/plugins/runtime-env.js"; import type { ClawdbotConfig, PluginRuntime, RuntimeEnv } from "../runtime-api.js"; import * as dedup from "./dedup.js"; import { monitorSingleAccount } from "./monitor.account.js"; @@ -23,6 +23,7 @@ const monitorWebhookMock = vi.hoisted(() => vi.fn(async () => {})); const createFeishuThreadBindingManagerMock = vi.hoisted(() => vi.fn(() => ({ stop: vi.fn() }))); let handlers: Record Promise> = {}; +let lastRuntime: ReturnType | null = null; const TEST_DOC_TOKEN = "ZsJfdxrBFo0RwuxteOLc1Ekvneb"; const TEST_WIKI_TOKEN = "OtYpd5pKOoMeQzxrzkocv9KIn4H"; @@ -253,11 +254,12 @@ async function setupCommentMonitorHandler(): Promise<(data: unknown) => Promise< handlers = registered; }); createEventDispatcherMock.mockReturnValue({ register }); + lastRuntime = createNonExitingRuntimeEnv(); await monitorSingleAccount({ cfg: buildMonitorConfig(), account: buildMonitorAccount(), - runtime: createNonExitingTypedRuntimeEnv(), + runtime: lastRuntime as RuntimeEnv, botOpenIdSource: { kind: "prefetched", botOpenId: "ou_bot", @@ -870,6 +872,7 @@ describe("resolveDriveCommentEventTurn", () => { describe("drive.notice.comment_add_v1 monitor handler", () => { beforeEach(() => { handlers = {}; + lastRuntime = null; handleFeishuCommentEventMock.mockClear(); createEventDispatcherMock.mockReset(); createFeishuClientMock.mockReset().mockReturnValue(makeOpenApiClient({}) as never); @@ -879,6 +882,7 @@ describe("drive.notice.comment_add_v1 monitor handler", () => { vi.spyOn(dedup, "claimUnprocessedFeishuMessage").mockResolvedValue("claimed"); vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true); vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true); + vi.spyOn(dedup, "releaseFeishuMessageProcessing").mockImplementation(() => {}); vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false); setFeishuRuntime(createFeishuMonitorRuntime()); }); @@ -959,4 +963,50 @@ describe("drive.notice.comment_add_v1 monitor handler", () => { expect(handleFeishuCommentEventMock).not.toHaveBeenCalled(); }); + + it("records generic comment-handler failures so replay stays closed", async () => { + const onComment = await setupCommentMonitorHandler(); + handleFeishuCommentEventMock.mockRejectedValueOnce(new Error("post-send failure")); + + await onComment(makeDriveCommentEvent()); + + await vi.waitFor(() => { + expect(dedup.recordProcessedFeishuMessage).toHaveBeenCalledWith( + "drive-comment:10d9d60b990db39f96a4c2fd357fb877", + "default", + expect.any(Function), + ); + expect(dedup.releaseFeishuMessageProcessing).toHaveBeenCalledWith( + "drive-comment:10d9d60b990db39f96a4c2fd357fb877", + "default", + ); + expect(lastRuntime?.error).toHaveBeenCalledWith( + expect.stringContaining("error handling drive comment notice: Error: post-send failure"), + ); + }); + }); + + it("releases comment replay without recording when failure is explicitly retryable", async () => { + const onComment = await setupCommentMonitorHandler(); + handleFeishuCommentEventMock.mockRejectedValueOnce( + Object.assign(new Error("retry me"), { + name: "FeishuRetryableSyntheticEventError", + }), + ); + + await onComment(makeDriveCommentEvent()); + + await vi.waitFor(() => { + expect(dedup.recordProcessedFeishuMessage).not.toHaveBeenCalled(); + expect(dedup.releaseFeishuMessageProcessing).toHaveBeenCalledWith( + "drive-comment:10d9d60b990db39f96a4c2fd357fb877", + "default", + ); + expect(lastRuntime?.error).toHaveBeenCalledWith( + expect.stringContaining( + "error handling drive comment notice: FeishuRetryableSyntheticEventError: retry me", + ), + ); + }); + }); }); diff --git a/extensions/feishu/src/sequential-queue.test.ts b/extensions/feishu/src/sequential-queue.test.ts index a3dfaec15f0..fa2cbaf2bd8 100644 --- a/extensions/feishu/src/sequential-queue.test.ts +++ b/extensions/feishu/src/sequential-queue.test.ts @@ -61,4 +61,32 @@ describe("createSequentialQueue", () => { expect(order).toContain("chat-1:end"); expect(order).toContain("btw:end"); }); + + it("does not leak unhandled rejections when a queued task fails", async () => { + const enqueue = createSequentialQueue(); + const unhandled: unknown[] = []; + const onUnhandledRejection = (reason: unknown) => { + unhandled.push(reason); + }; + process.on("unhandledRejection", onUnhandledRejection); + + try { + await expect( + enqueue("feishu:default:chat-1", async () => { + throw new Error("boom"); + }), + ).rejects.toThrow("boom"); + + await new Promise((resolve) => setTimeout(resolve, 0)); + expect(unhandled).toEqual([]); + + await expect( + enqueue("feishu:default:chat-1", async () => { + return; + }), + ).resolves.toBeUndefined(); + } finally { + process.off("unhandledRejection", onUnhandledRejection); + } + }); }); diff --git a/extensions/feishu/src/sequential-queue.ts b/extensions/feishu/src/sequential-queue.ts index 42ccadd200e..edaf2bf398c 100644 --- a/extensions/feishu/src/sequential-queue.ts +++ b/extensions/feishu/src/sequential-queue.ts @@ -5,11 +5,12 @@ export function createSequentialQueue() { const previous = queues.get(key) ?? Promise.resolve(); const next = previous.then(task, task); queues.set(key, next); - void next.finally(() => { + const cleanup = () => { if (queues.get(key) === next) { queues.delete(key); } - }); + }; + next.then(cleanup, cleanup); return next; }; }