From a08b65a90a454fbfe2ea4025f5bcdab08640d983 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20Cuevas?= Date: Sun, 26 Apr 2026 18:44:30 -0400 Subject: [PATCH] fix(telegram): send fresh finals for stale previews (#72038) * fix(telegram): send fresh finals for stale previews * test(telegram): cover stale preview send fallback * fix(telegram): keep stale archived preview fallback * fix(telegram): clear stale active previews * fix(telegram): reset preview state after fresh finals --- CHANGELOG.md | 1 + docs/channels/telegram.md | 4 +- docs/concepts/streaming.md | 1 + .../telegram/src/bot-message-dispatch.ts | 2 + .../telegram/src/draft-stream.test-helpers.ts | 11 ++ extensions/telegram/src/draft-stream.test.ts | 40 ++++++ extensions/telegram/src/draft-stream.ts | 11 ++ .../src/lane-delivery-text-deliverer.ts | 48 +++++++ extensions/telegram/src/lane-delivery.test.ts | 126 +++++++++++++++++- 9 files changed, 236 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 81e1a04fcb9..b2184f65875 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -47,6 +47,7 @@ Docs: https://docs.openclaw.ai - Onboarding/models: keep skip-auth and provider-scoped model picker prompts off the full global model catalog path, and cache provider catalog hook resolution so setup no longer stalls after auth on large plugin registries. Thanks @shakkernerd. - Gateway/Bonjour: suppress known @homebridge/ciao cancellation and network assertion failures through scoped process handlers so malformed mDNS packets or restricted VPS networking disable/restart Bonjour instead of crashing the gateway. Fixes #67578. Thanks @zenassist26-create. - Discord: keep late clicks on already-resolved exec approval buttons quiet when elevated mode auto-resolved the request, while still surfacing real approval submission failures. Fixes #66906. Thanks @rlerikse. +- Telegram: send a fresh final message for long-lived preview-streamed replies so the visible Telegram timestamp reflects completion time instead of the preview creation time. Thanks @rubencu. ## 2026.4.25 diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index 065ce72d8ec..13564b36db1 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -298,8 +298,8 @@ curl "https://api.telegram.org/bot/getUpdates" For text-only replies: - - DM: OpenClaw keeps the same preview message and performs a final edit in place (no second message) - - group/topic: OpenClaw keeps the same preview message and performs a final edit in place (no second message) + - short DM/group/topic previews: OpenClaw keeps the same preview message and performs a final edit in place + - previews older than about one minute: OpenClaw sends the completed reply as a fresh final message and then cleans up the preview, so Telegram's visible timestamp reflects completion time instead of the preview creation time For complex replies (for example media payloads), OpenClaw falls back to normal final delivery and then cleans up the preview message. diff --git a/docs/concepts/streaming.md b/docs/concepts/streaming.md index 91c9fe65b8c..f7db2cc0de0 100644 --- a/docs/concepts/streaming.md +++ b/docs/concepts/streaming.md @@ -152,6 +152,7 @@ Legacy key migration: Telegram: - Uses `sendMessage` + `editMessageText` preview updates across DMs and group/topics. +- Sends a fresh final message instead of editing in place when a preview has been visible for about one minute, then cleans up the preview so Telegram's timestamp reflects reply completion. - Preview streaming is skipped when Telegram block streaming is explicitly enabled (to avoid double-streaming). - `/reasoning stream` can write reasoning to preview. diff --git a/extensions/telegram/src/bot-message-dispatch.ts b/extensions/telegram/src/bot-message-dispatch.ts index 44c2d95e858..25be7ba3ff4 100644 --- a/extensions/telegram/src/bot-message-dispatch.ts +++ b/extensions/telegram/src/bot-message-dispatch.ts @@ -433,6 +433,7 @@ export const dispatchTelegramMessage = async ({ archivedAnswerPreviews.push({ messageId: preview.messageId, textSnapshot: preview.textSnapshot, + visibleSinceMs: preview.visibleSinceMs, deleteIfUnused: true, }); } @@ -539,6 +540,7 @@ export const dispatchTelegramMessage = async ({ archivedAnswerPreviews.push({ messageId: previewMessageId, textSnapshot: answerLane.lastPartialText, + visibleSinceMs: answerLane.stream?.visibleSinceMs?.(), deleteIfUnused: false, }); } diff --git a/extensions/telegram/src/draft-stream.test-helpers.ts b/extensions/telegram/src/draft-stream.test-helpers.ts index 428b296efc9..9ef026fa2ee 100644 --- a/extensions/telegram/src/draft-stream.test-helpers.ts +++ b/extensions/telegram/src/draft-stream.test-helpers.ts @@ -6,6 +6,7 @@ export type TestDraftStream = { update: ReturnType void>>; flush: ReturnType Promise>>; messageId: ReturnType number | undefined>>; + visibleSinceMs: ReturnType number | undefined>>; previewMode: ReturnType DraftPreviewMode>>; previewRevision: ReturnType number>>; lastDeliveredText: ReturnType string>>; @@ -25,8 +26,10 @@ export function createTestDraftStream(params?: { onStop?: () => void | Promise; onDiscard?: () => void | Promise; clearMessageIdOnForceNew?: boolean; + visibleSinceMs?: number; }): TestDraftStream { let messageId = params?.messageId; + let visibleSinceMs = params?.visibleSinceMs; let previewRevision = 0; let lastDeliveredText = ""; return { @@ -37,6 +40,7 @@ export function createTestDraftStream(params?: { }), flush: vi.fn().mockResolvedValue(undefined), messageId: vi.fn().mockImplementation(() => messageId), + visibleSinceMs: vi.fn().mockImplementation(() => visibleSinceMs), previewMode: vi.fn().mockReturnValue(params?.previewMode ?? "message"), previewRevision: vi.fn().mockImplementation(() => previewRevision), lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText), @@ -52,16 +56,19 @@ export function createTestDraftStream(params?: { if (params?.clearMessageIdOnForceNew) { messageId = undefined; } + visibleSinceMs = undefined; }), sendMayHaveLanded: vi.fn().mockReturnValue(false), setMessageId: (value: number | undefined) => { messageId = value; + visibleSinceMs = value == null ? undefined : Date.now(); }, }; } export function createSequencedTestDraftStream(startMessageId = 1001): TestDraftStream { let activeMessageId: number | undefined; + let visibleSinceMs: number | undefined; let nextMessageId = startMessageId; let previewRevision = 0; let lastDeliveredText = ""; @@ -69,12 +76,14 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft update: vi.fn().mockImplementation((text: string) => { if (activeMessageId == null) { activeMessageId = nextMessageId++; + visibleSinceMs = Date.now(); } previewRevision += 1; lastDeliveredText = text.trimEnd(); }), flush: vi.fn().mockResolvedValue(undefined), messageId: vi.fn().mockImplementation(() => activeMessageId), + visibleSinceMs: vi.fn().mockImplementation(() => visibleSinceMs), previewMode: vi.fn().mockReturnValue("message"), previewRevision: vi.fn().mockImplementation(() => previewRevision), lastDeliveredText: vi.fn().mockImplementation(() => lastDeliveredText), @@ -84,10 +93,12 @@ export function createSequencedTestDraftStream(startMessageId = 1001): TestDraft materialize: vi.fn().mockImplementation(async () => activeMessageId), forceNewMessage: vi.fn().mockImplementation(() => { activeMessageId = undefined; + visibleSinceMs = undefined; }), sendMayHaveLanded: vi.fn().mockReturnValue(false), setMessageId: (value: number | undefined) => { activeMessageId = value; + visibleSinceMs = value == null ? undefined : Date.now(); }, }; } diff --git a/extensions/telegram/src/draft-stream.test.ts b/extensions/telegram/src/draft-stream.test.ts index 64d7245fe4e..cd82809cffe 100644 --- a/extensions/telegram/src/draft-stream.test.ts +++ b/extensions/telegram/src/draft-stream.test.ts @@ -161,6 +161,28 @@ describe("createTelegramDraftStream", () => { expect(api.sendMessageDraft).not.toHaveBeenCalled(); }); + it("tracks when a message preview first became visible", async () => { + vi.useFakeTimers(); + try { + vi.setSystemTime(new Date("2026-04-26T01:00:00.000Z")); + const api = createMockDraftApi(); + const stream = createDraftStream(api, { previewTransport: "message" }); + + stream.update("Hello"); + await stream.flush(); + + expect(stream.visibleSinceMs?.()).toBe(Date.parse("2026-04-26T01:00:00.000Z")); + + vi.setSystemTime(new Date("2026-04-26T01:01:00.000Z")); + stream.update("Hello again"); + await stream.flush(); + + expect(stream.visibleSinceMs?.()).toBe(Date.parse("2026-04-26T01:00:00.000Z")); + } finally { + vi.useRealTimers(); + } + }); + it("falls back to message transport when sendMessageDraft is unavailable", async () => { const api = createMockDraftApi(); delete (api as { sendMessageDraft?: unknown }).sendMessageDraft; @@ -436,6 +458,23 @@ describe("createTelegramDraftStream", () => { expect(api.sendMessage).toHaveBeenLastCalledWith(123, "After thinking", undefined); }); + it("creates new message after cleanup and forceNewMessage", async () => { + const { api, stream } = createForceNewMessageHarness(); + + stream.update("Stale preview"); + await stream.flush(); + + await stream.clear(); + expect(api.deleteMessage).toHaveBeenCalledWith(123, 17); + + stream.forceNewMessage(); + stream.update("Next preview"); + await stream.flush(); + + expect(api.sendMessage).toHaveBeenCalledTimes(2); + expect(api.sendMessage).toHaveBeenLastCalledWith(123, "Next preview", undefined); + }); + it("sends first update immediately after forceNewMessage within throttle window", async () => { vi.useFakeTimers(); try { @@ -487,6 +526,7 @@ describe("createTelegramDraftStream", () => { messageId: 17, textSnapshot: "Message A partial", parseMode: undefined, + visibleSinceMs: expect.any(Number), }); expect(api.sendMessage).toHaveBeenCalledTimes(2); expect(api.sendMessage).toHaveBeenNthCalledWith(2, 123, "Message B partial", undefined); diff --git a/extensions/telegram/src/draft-stream.ts b/extensions/telegram/src/draft-stream.ts index a2f88aae216..802442f74ba 100644 --- a/extensions/telegram/src/draft-stream.ts +++ b/extensions/telegram/src/draft-stream.ts @@ -94,6 +94,7 @@ export type TelegramDraftStream = { update: (text: string) => void; flush: () => Promise; messageId: () => number | undefined; + visibleSinceMs?: () => number | undefined; previewMode?: () => "message" | "draft"; previewRevision?: () => number; lastDeliveredText?: () => string; @@ -118,6 +119,7 @@ type SupersededTelegramPreview = { messageId: number; textSnapshot: string; parseMode?: "HTML"; + visibleSinceMs?: number; }; export function createTelegramDraftStream(params: { @@ -174,6 +176,7 @@ export function createTelegramDraftStream(params: { const streamState = { stopped: false, final: false }; let messageSendAttempted = false; let streamMessageId: number | undefined; + let streamVisibleSinceMs: number | undefined; let streamDraftId = usesDraftTransport ? allocateTelegramDraftId() : undefined; let previewTransport: "message" | "draft" = usesDraftTransport ? "draft" : "message"; let lastSentText = ""; @@ -226,6 +229,7 @@ export function createTelegramDraftStream(params: { sendGeneration, }: PreviewSendParams): Promise => { if (typeof streamMessageId === "number") { + streamVisibleSinceMs ??= Date.now(); if (renderedParseMode) { await params.api.editMessageText(chatId, streamMessageId, renderedText, { parse_mode: renderedParseMode, @@ -257,15 +261,18 @@ export function createTelegramDraftStream(params: { return false; } const normalizedMessageId = Math.trunc(sentMessageId); + const visibleSinceMs = Date.now(); if (sendGeneration !== generation) { params.onSupersededPreview?.({ messageId: normalizedMessageId, textSnapshot: renderedText, parseMode: renderedParseMode, + visibleSinceMs, }); return true; } streamMessageId = normalizedMessageId; + streamVisibleSinceMs = visibleSinceMs; return true; }; const sendDraftTransportPreview = async ({ @@ -397,10 +404,12 @@ export function createTelegramDraftStream(params: { }; const forceNewMessage = () => { + streamState.stopped = false; streamState.final = false; generation += 1; messageSendAttempted = false; streamMessageId = undefined; + streamVisibleSinceMs = undefined; if (previewTransport === "draft") { streamDraftId = allocateTelegramDraftId(); } @@ -430,6 +439,7 @@ export function createTelegramDraftStream(params: { const sentId = sent?.message_id; if (typeof sentId === "number" && Number.isFinite(sentId)) { streamMessageId = Math.trunc(sentId); + streamVisibleSinceMs = Date.now(); if (resolvedDraftApi != null && streamDraftId != null) { const clearDraftId = streamDraftId; const clearThreadParams = @@ -454,6 +464,7 @@ export function createTelegramDraftStream(params: { update, flush: loop.flush, messageId: () => streamMessageId, + visibleSinceMs: () => streamVisibleSinceMs, previewMode: () => previewTransport, previewRevision: () => previewRevision, lastDeliveredText: () => lastDeliveredText, diff --git a/extensions/telegram/src/lane-delivery-text-deliverer.ts b/extensions/telegram/src/lane-delivery-text-deliverer.ts index 72ca2d51edc..ae1d83c065f 100644 --- a/extensions/telegram/src/lane-delivery-text-deliverer.ts +++ b/extensions/telegram/src/lane-delivery-text-deliverer.ts @@ -12,6 +12,7 @@ const MESSAGE_NOT_MODIFIED_RE = /400:\s*Bad Request:\s*message is not modified|MESSAGE_NOT_MODIFIED/i; const MESSAGE_NOT_FOUND_RE = /400:\s*Bad Request:\s*message to edit not found|MESSAGE_ID_INVALID|message can't be edited/i; +const LONG_LIVED_PREVIEW_FRESH_FINAL_AFTER_MS = 60_000; function extractErrorText(err: unknown): string { return typeof err === "string" @@ -55,6 +56,7 @@ export type DraftLaneState = { export type ArchivedPreview = { messageId: number; textSnapshot: string; + visibleSinceMs?: number; // Boundary-finalized previews should remain visible even if no matching // final edit arrives; superseded previews can be safely deleted. deleteIfUnused?: boolean; @@ -92,6 +94,7 @@ type CreateLaneTextDelivererParams = { deletePreviewMessage: (messageId: number) => Promise; log: (message: string) => void; markDelivered: () => void; + now?: () => number; }; type DeliverLaneTextParams = { @@ -169,6 +172,14 @@ function shouldSkipRegressivePreviewUpdate(args: { ); } +function isLongLivedPreview(visibleSinceMs: number | undefined, nowMs: number): boolean { + return ( + typeof visibleSinceMs === "number" && + Number.isFinite(visibleSinceMs) && + nowMs - visibleSinceMs >= LONG_LIVED_PREVIEW_FRESH_FINAL_AFTER_MS + ); +} + function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTargetResolution { const lanePreviewMessageId = params.lane.stream?.messageId(); const previewMessageId = @@ -187,11 +198,27 @@ function resolvePreviewTarget(params: ResolvePreviewTargetParams): PreviewTarget export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { const getLanePreviewText = (lane: DraftLaneState) => lane.lastPartialText; + const readNow = () => params.now?.() ?? Date.now(); const markActivePreviewComplete = (laneName: LaneName) => { params.activePreviewLifecycleByLane[laneName] = "complete"; params.retainPreviewOnCleanupByLane[laneName] = true; }; const isDraftPreviewLane = (lane: DraftLaneState) => lane.stream?.previewMode?.() === "draft"; + const isMessagePreviewLane = (lane: DraftLaneState) => !isDraftPreviewLane(lane); + const shouldUseFreshFinalForLane = (lane: DraftLaneState) => + isMessagePreviewLane(lane) && isLongLivedPreview(lane.stream?.visibleSinceMs?.(), readNow()); + const shouldUseFreshFinalForPreview = (lane: DraftLaneState, visibleSinceMs?: number) => + isMessagePreviewLane(lane) && isLongLivedPreview(visibleSinceMs, readNow()); + const clearActivePreviewAfterFreshFinal = async (lane: DraftLaneState, laneName: LaneName) => { + try { + await lane.stream?.clear(); + } catch (err) { + params.log(`telegram: ${laneName} fresh final preview cleanup failed: ${String(err)}`); + } + lane.lastPartialText = ""; + lane.hasStreamedMessage = false; + lane.stream?.forceNewMessage(); + }; const canMaterializeDraftFinal = ( lane: DraftLaneState, previewButtons?: TelegramInlineButtons, @@ -444,6 +471,19 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { if (!archivedPreview) { return undefined; } + if (canEditViaPreview && shouldUseFreshFinalForPreview(lane, archivedPreview.visibleSinceMs)) { + const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + if (delivered) { + try { + await params.deletePreviewMessage(archivedPreview.messageId); + } catch (err) { + params.log( + `telegram: archived answer preview cleanup failed (${archivedPreview.messageId}): ${String(err)}`, + ); + } + return result("sent"); + } + } if (canEditViaPreview) { const finalized = await tryUpdatePreviewForLane({ lane, @@ -551,6 +591,14 @@ export function createLaneTextDeliverer(params: CreateLaneTextDelivererParams) { }); } } + if (shouldUseFreshFinalForLane(lane)) { + await params.stopDraftLane(lane); + const delivered = await params.sendPayload(params.applyTextToPayload(payload, text)); + if (delivered) { + await clearActivePreviewAfterFreshFinal(lane, laneName); + return result("sent"); + } + } const previewMessageId = lane.stream?.messageId(); const finalized = await tryUpdatePreviewForLane({ lane, diff --git a/extensions/telegram/src/lane-delivery.test.ts b/extensions/telegram/src/lane-delivery.test.ts index 36d62786e9a..174c73c9ddd 100644 --- a/extensions/telegram/src/lane-delivery.test.ts +++ b/extensions/telegram/src/lane-delivery.test.ts @@ -2,6 +2,7 @@ import type { ReplyPayload } from "openclaw/plugin-sdk/reply-runtime"; import { describe, expect, it, vi } from "vitest"; import { createTestDraftStream } from "./draft-stream.test-helpers.js"; import { + type ArchivedPreview, createLaneTextDeliverer, type DraftLaneState, type LaneDeliveryResult, @@ -17,9 +18,15 @@ function createHarness(params?: { answerStream?: DraftLaneState["stream"]; answerHasStreamedMessage?: boolean; answerLastPartialText?: string; + answerPreviewVisibleSinceMs?: number; + nowMs?: number; }) { const answer = - params?.answerStream ?? createTestDraftStream({ messageId: params?.answerMessageId }); + params?.answerStream ?? + createTestDraftStream({ + messageId: params?.answerMessageId, + visibleSinceMs: params?.answerPreviewVisibleSinceMs, + }); const reasoning = createTestDraftStream(); const lanes: Record = { answer: { @@ -51,11 +58,7 @@ function createHarness(params?: { const markDelivered = vi.fn(); const activePreviewLifecycleByLane = { answer: "transient", reasoning: "transient" } as const; const retainPreviewOnCleanupByLane = { answer: false, reasoning: false } as const; - const archivedAnswerPreviews: Array<{ - messageId: number; - textSnapshot: string; - deleteIfUnused?: boolean; - }> = []; + const archivedAnswerPreviews: ArchivedPreview[] = []; const deliverLaneText = createLaneTextDeliverer({ lanes, @@ -71,6 +74,7 @@ function createHarness(params?: { deletePreviewMessage, log, markDelivered, + now: params?.nowMs != null ? () => params.nowMs! : undefined, }); return { @@ -347,6 +351,116 @@ describe("createLaneTextDeliverer", () => { expect(harness.log).toHaveBeenCalledWith(expect.stringContaining("preview final too long")); }); + it("sends a fresh final when a message preview is long lived", async () => { + const visibleSinceMs = 10_000; + const harness = createHarness({ + answerMessageId: 999, + answerHasStreamedMessage: true, + answerLastPartialText: "Working...", + answerPreviewVisibleSinceMs: visibleSinceMs, + nowMs: visibleSinceMs + 60_000, + }); + + const result = await deliverFinalAnswer(harness, HELLO_FINAL); + + expect(result.kind).toBe("sent"); + expect(harness.stopDraftLane).toHaveBeenCalledTimes(1); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: HELLO_FINAL }), + ); + expect(harness.editPreview).not.toHaveBeenCalled(); + expect(harness.answer.stream?.clear).toHaveBeenCalledTimes(1); + expect(harness.answer.stream?.forceNewMessage).toHaveBeenCalledTimes(1); + expect(harness.lanes.answer.hasStreamedMessage).toBe(false); + expect(harness.lanes.answer.lastPartialText).toBe(""); + expect(harness.markDelivered).not.toHaveBeenCalled(); + }); + + it("falls back to editing a long-lived preview when fresh final send returns false", async () => { + const visibleSinceMs = 10_000; + const harness = createHarness({ + answerMessageId: 999, + answerHasStreamedMessage: true, + answerLastPartialText: "Working...", + answerPreviewVisibleSinceMs: visibleSinceMs, + nowMs: visibleSinceMs + 60_000, + }); + harness.sendPayload.mockResolvedValueOnce(false); + + const result = await deliverFinalAnswer(harness, HELLO_FINAL); + + expect(expectPreviewFinalized(result)).toEqual({ + content: HELLO_FINAL, + messageId: 999, + }); + expect(harness.stopDraftLane).toHaveBeenCalledTimes(2); + expect(harness.sendPayload).toHaveBeenCalledTimes(1); + expect(harness.editPreview).toHaveBeenCalledWith( + expect.objectContaining({ + messageId: 999, + text: HELLO_FINAL, + }), + ); + expect(harness.answer.stream?.clear).not.toHaveBeenCalled(); + expect(harness.markDelivered).toHaveBeenCalledTimes(1); + }); + + it("sends a fresh final for stale archived previews", async () => { + const visibleSinceMs = 10_000; + const harness = createHarness({ + answerMessageId: 1001, + answerPreviewVisibleSinceMs: visibleSinceMs, + nowMs: visibleSinceMs + 60_000, + }); + harness.archivedAnswerPreviews.push({ + messageId: 222, + textSnapshot: "Working...", + visibleSinceMs, + deleteIfUnused: true, + }); + + const result = await deliverFinalAnswer(harness, HELLO_FINAL); + + expect(result.kind).toBe("sent"); + expect(harness.sendPayload).toHaveBeenCalledWith( + expect.objectContaining({ text: HELLO_FINAL }), + ); + expect(harness.editPreview).not.toHaveBeenCalled(); + expect(harness.deletePreviewMessage).toHaveBeenCalledWith(222); + }); + + it("falls back to editing a stale archived preview when fresh final send returns false", async () => { + const visibleSinceMs = 10_000; + const harness = createHarness({ + answerMessageId: 1001, + answerPreviewVisibleSinceMs: visibleSinceMs, + nowMs: visibleSinceMs + 60_000, + }); + harness.archivedAnswerPreviews.push({ + messageId: 222, + textSnapshot: "Working...", + visibleSinceMs, + deleteIfUnused: true, + }); + harness.sendPayload.mockResolvedValueOnce(false); + + const result = await deliverFinalAnswer(harness, HELLO_FINAL); + + expect(expectPreviewFinalized(result)).toEqual({ + content: HELLO_FINAL, + messageId: 222, + }); + expect(harness.sendPayload).toHaveBeenCalledTimes(1); + expect(harness.editPreview).toHaveBeenCalledWith( + expect.objectContaining({ + messageId: 222, + text: HELLO_FINAL, + }), + ); + expect(harness.deletePreviewMessage).not.toHaveBeenCalled(); + expect(harness.markDelivered).toHaveBeenCalledTimes(1); + }); + it("materializes DM draft streaming final even when text is unchanged", async () => { const answerStream = createTestDraftStream({ previewMode: "draft", messageId: 321 }); answerStream.materialize.mockResolvedValue(321);