mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-20 16:23:55 +02:00
fix(feishu): keep comment replay closed after generic failures
This commit is contained in:
@@ -662,6 +662,11 @@ function registerEventHandlers(
|
||||
if (syntheticMessageId) {
|
||||
await recordProcessedFeishuMessage(syntheticMessageId, accountId, log);
|
||||
}
|
||||
} catch (err) {
|
||||
if (syntheticMessageId && !isFeishuRetryableSyntheticEventError(err)) {
|
||||
await recordProcessedFeishuMessage(syntheticMessageId, accountId, log);
|
||||
}
|
||||
throw err;
|
||||
} finally {
|
||||
if (syntheticMessageId) {
|
||||
releaseFeishuMessageProcessing(syntheticMessageId, accountId);
|
||||
|
||||
@@ -4,7 +4,7 @@ import {
|
||||
resolveInboundDebounceMs,
|
||||
} from "openclaw/plugin-sdk/reply-runtime";
|
||||
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
|
||||
import { createNonExitingTypedRuntimeEnv } from "../../../test/helpers/plugins/runtime-env.js";
|
||||
import { createNonExitingRuntimeEnv } from "../../../test/helpers/plugins/runtime-env.js";
|
||||
import type { ClawdbotConfig, PluginRuntime, RuntimeEnv } from "../runtime-api.js";
|
||||
import * as dedup from "./dedup.js";
|
||||
import { monitorSingleAccount } from "./monitor.account.js";
|
||||
@@ -23,6 +23,7 @@ const monitorWebhookMock = vi.hoisted(() => vi.fn(async () => {}));
|
||||
const createFeishuThreadBindingManagerMock = vi.hoisted(() => vi.fn(() => ({ stop: vi.fn() })));
|
||||
|
||||
let handlers: Record<string, (data: unknown) => Promise<void>> = {};
|
||||
let lastRuntime: ReturnType<typeof createNonExitingRuntimeEnv> | null = null;
|
||||
const TEST_DOC_TOKEN = "ZsJfdxrBFo0RwuxteOLc1Ekvneb";
|
||||
const TEST_WIKI_TOKEN = "OtYpd5pKOoMeQzxrzkocv9KIn4H";
|
||||
|
||||
@@ -253,11 +254,12 @@ async function setupCommentMonitorHandler(): Promise<(data: unknown) => Promise<
|
||||
handlers = registered;
|
||||
});
|
||||
createEventDispatcherMock.mockReturnValue({ register });
|
||||
lastRuntime = createNonExitingRuntimeEnv();
|
||||
|
||||
await monitorSingleAccount({
|
||||
cfg: buildMonitorConfig(),
|
||||
account: buildMonitorAccount(),
|
||||
runtime: createNonExitingTypedRuntimeEnv<RuntimeEnv>(),
|
||||
runtime: lastRuntime as RuntimeEnv,
|
||||
botOpenIdSource: {
|
||||
kind: "prefetched",
|
||||
botOpenId: "ou_bot",
|
||||
@@ -870,6 +872,7 @@ describe("resolveDriveCommentEventTurn", () => {
|
||||
describe("drive.notice.comment_add_v1 monitor handler", () => {
|
||||
beforeEach(() => {
|
||||
handlers = {};
|
||||
lastRuntime = null;
|
||||
handleFeishuCommentEventMock.mockClear();
|
||||
createEventDispatcherMock.mockReset();
|
||||
createFeishuClientMock.mockReset().mockReturnValue(makeOpenApiClient({}) as never);
|
||||
@@ -879,6 +882,7 @@ describe("drive.notice.comment_add_v1 monitor handler", () => {
|
||||
vi.spyOn(dedup, "claimUnprocessedFeishuMessage").mockResolvedValue("claimed");
|
||||
vi.spyOn(dedup, "tryBeginFeishuMessageProcessing").mockReturnValue(true);
|
||||
vi.spyOn(dedup, "recordProcessedFeishuMessage").mockResolvedValue(true);
|
||||
vi.spyOn(dedup, "releaseFeishuMessageProcessing").mockImplementation(() => {});
|
||||
vi.spyOn(dedup, "hasProcessedFeishuMessage").mockResolvedValue(false);
|
||||
setFeishuRuntime(createFeishuMonitorRuntime());
|
||||
});
|
||||
@@ -959,4 +963,50 @@ describe("drive.notice.comment_add_v1 monitor handler", () => {
|
||||
|
||||
expect(handleFeishuCommentEventMock).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("records generic comment-handler failures so replay stays closed", async () => {
|
||||
const onComment = await setupCommentMonitorHandler();
|
||||
handleFeishuCommentEventMock.mockRejectedValueOnce(new Error("post-send failure"));
|
||||
|
||||
await onComment(makeDriveCommentEvent());
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(dedup.recordProcessedFeishuMessage).toHaveBeenCalledWith(
|
||||
"drive-comment:10d9d60b990db39f96a4c2fd357fb877",
|
||||
"default",
|
||||
expect.any(Function),
|
||||
);
|
||||
expect(dedup.releaseFeishuMessageProcessing).toHaveBeenCalledWith(
|
||||
"drive-comment:10d9d60b990db39f96a4c2fd357fb877",
|
||||
"default",
|
||||
);
|
||||
expect(lastRuntime?.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining("error handling drive comment notice: Error: post-send failure"),
|
||||
);
|
||||
});
|
||||
});
|
||||
|
||||
it("releases comment replay without recording when failure is explicitly retryable", async () => {
|
||||
const onComment = await setupCommentMonitorHandler();
|
||||
handleFeishuCommentEventMock.mockRejectedValueOnce(
|
||||
Object.assign(new Error("retry me"), {
|
||||
name: "FeishuRetryableSyntheticEventError",
|
||||
}),
|
||||
);
|
||||
|
||||
await onComment(makeDriveCommentEvent());
|
||||
|
||||
await vi.waitFor(() => {
|
||||
expect(dedup.recordProcessedFeishuMessage).not.toHaveBeenCalled();
|
||||
expect(dedup.releaseFeishuMessageProcessing).toHaveBeenCalledWith(
|
||||
"drive-comment:10d9d60b990db39f96a4c2fd357fb877",
|
||||
"default",
|
||||
);
|
||||
expect(lastRuntime?.error).toHaveBeenCalledWith(
|
||||
expect.stringContaining(
|
||||
"error handling drive comment notice: FeishuRetryableSyntheticEventError: retry me",
|
||||
),
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -61,4 +61,32 @@ describe("createSequentialQueue", () => {
|
||||
expect(order).toContain("chat-1:end");
|
||||
expect(order).toContain("btw:end");
|
||||
});
|
||||
|
||||
it("does not leak unhandled rejections when a queued task fails", async () => {
|
||||
const enqueue = createSequentialQueue();
|
||||
const unhandled: unknown[] = [];
|
||||
const onUnhandledRejection = (reason: unknown) => {
|
||||
unhandled.push(reason);
|
||||
};
|
||||
process.on("unhandledRejection", onUnhandledRejection);
|
||||
|
||||
try {
|
||||
await expect(
|
||||
enqueue("feishu:default:chat-1", async () => {
|
||||
throw new Error("boom");
|
||||
}),
|
||||
).rejects.toThrow("boom");
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
expect(unhandled).toEqual([]);
|
||||
|
||||
await expect(
|
||||
enqueue("feishu:default:chat-1", async () => {
|
||||
return;
|
||||
}),
|
||||
).resolves.toBeUndefined();
|
||||
} finally {
|
||||
process.off("unhandledRejection", onUnhandledRejection);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -5,11 +5,12 @@ export function createSequentialQueue() {
|
||||
const previous = queues.get(key) ?? Promise.resolve();
|
||||
const next = previous.then(task, task);
|
||||
queues.set(key, next);
|
||||
void next.finally(() => {
|
||||
const cleanup = () => {
|
||||
if (queues.get(key) === next) {
|
||||
queues.delete(key);
|
||||
}
|
||||
});
|
||||
};
|
||||
next.then(cleanup, cleanup);
|
||||
return next;
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user