From ad181b2361ba417acc7f295b0c199aec02588f2a Mon Sep 17 00:00:00 2001 From: Ayaan Zaidi Date: Mon, 13 Apr 2026 23:48:55 +0530 Subject: [PATCH] fix(telegram): persist topic-name cache --- .../telegram/src/bot-message-context.ts | 57 +++--- extensions/telegram/src/topic-name-cache.ts | 186 +++++++++++++++--- 2 files changed, 196 insertions(+), 47 deletions(-) diff --git a/extensions/telegram/src/bot-message-context.ts b/extensions/telegram/src/bot-message-context.ts index f8f6d166b33..5f7bcb16006 100644 --- a/extensions/telegram/src/bot-message-context.ts +++ b/extensions/telegram/src/bot-message-context.ts @@ -34,7 +34,7 @@ import { resolveTelegramReactionVariant, resolveTelegramStatusReactionEmojis, } from "./status-reaction-variants.js"; -import { getTopicName, updateTopicName } from "./topic-name-cache.js"; +import { getTopicName, resolveTopicNameCachePath, updateTopicName } from "./topic-name-cache.js"; export type { BuildTelegramMessageContextParams, @@ -149,40 +149,51 @@ export const buildTelegramMessageContext = async ({ const resolvedThreadId = threadSpec.scope === "forum" ? threadSpec.id : undefined; const replyThreadId = threadSpec.id; const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined; + const topicNameCachePath = resolveTopicNameCachePath( + sessionRuntime.resolveStorePath(cfg.session?.store, { agentId: account.accountId }), + ); let topicName: string | undefined; if (isForum && resolvedThreadId != null) { const ftCreated = msg.forum_topic_created; const ftEdited = msg.forum_topic_edited; const ftClosed = msg.forum_topic_closed; const ftReopened = msg.forum_topic_reopened; + const topicPatch = ftCreated?.name + ? { + name: ftCreated.name, + iconColor: ftCreated.icon_color, + iconCustomEmojiId: ftCreated.icon_custom_emoji_id, + closed: false, + } + : ftEdited?.name + ? { + name: ftEdited.name, + iconCustomEmojiId: ftEdited.icon_custom_emoji_id, + } + : ftClosed + ? { closed: true } + : ftReopened + ? { closed: false } + : undefined; - if (ftCreated?.name) { - updateTopicName(chatId, resolvedThreadId, { - name: ftCreated.name, - iconColor: ftCreated.icon_color, - iconCustomEmojiId: ftCreated.icon_custom_emoji_id, - closed: false, - }); - } else if (ftEdited?.name) { - updateTopicName(chatId, resolvedThreadId, { - name: ftEdited.name, - iconCustomEmojiId: ftEdited.icon_custom_emoji_id, - }); - } else if (ftClosed) { - updateTopicName(chatId, resolvedThreadId, { closed: true }); - } else if (ftReopened) { - updateTopicName(chatId, resolvedThreadId, { closed: false }); + if (topicPatch) { + updateTopicName(chatId, resolvedThreadId, topicPatch, topicNameCachePath); } - topicName = getTopicName(chatId, resolvedThreadId); + topicName = getTopicName(chatId, resolvedThreadId, topicNameCachePath); if (!topicName) { const replyFtCreated = msg.reply_to_message?.forum_topic_created; if (replyFtCreated?.name) { - updateTopicName(chatId, resolvedThreadId, { - name: replyFtCreated.name, - iconColor: replyFtCreated.icon_color, - iconCustomEmojiId: replyFtCreated.icon_custom_emoji_id, - }); + updateTopicName( + chatId, + resolvedThreadId, + { + name: replyFtCreated.name, + iconColor: replyFtCreated.icon_color, + iconCustomEmojiId: replyFtCreated.icon_custom_emoji_id, + }, + topicNameCachePath, + ); topicName = replyFtCreated.name; } } diff --git a/extensions/telegram/src/topic-name-cache.ts b/extensions/telegram/src/topic-name-cache.ts index dcadda49cb2..ec647fa4c7e 100644 --- a/extensions/telegram/src/topic-name-cache.ts +++ b/extensions/telegram/src/topic-name-cache.ts @@ -1,4 +1,10 @@ +import fs from "node:fs"; +import path from "node:path"; +import { logVerbose } from "openclaw/plugin-sdk/runtime-env"; + const MAX_ENTRIES = 2_048; +const TOPIC_NAME_CACHE_STATE_KEY = Symbol.for("openclaw.telegramTopicNameCacheState"); +const DEFAULT_TOPIC_NAME_CACHE_KEY = "__default__"; export type TopicEntry = { name: string; @@ -8,27 +14,151 @@ export type TopicEntry = { updatedAt: number; }; -const cache = new Map(); +type TopicNameStore = Map; + +type TopicNameStoreState = { + lastUpdatedAt: number; + store: TopicNameStore; +}; + +type TopicNameCacheState = { + stores: Map; +}; + +function createTopicNameStore(): TopicNameStore { + return new Map(); +} + +function createTopicNameStoreState(): TopicNameStoreState { + return { + lastUpdatedAt: 0, + store: createTopicNameStore(), + }; +} + +function getTopicNameCacheState(): TopicNameCacheState { + const globalStore = globalThis as Record; + const existing = globalStore[TOPIC_NAME_CACHE_STATE_KEY] as TopicNameCacheState | undefined; + if (existing) { + return existing; + } + const state: TopicNameCacheState = { stores: new Map() }; + globalStore[TOPIC_NAME_CACHE_STATE_KEY] = state; + return state; +} function cacheKey(chatId: number | string, threadId: number | string): string { return `${chatId}:${threadId}`; } -function evictOldest(): void { - while (cache.size > MAX_ENTRIES) { - const oldestKey = cache.keys().next().value; - if (!oldestKey) { - return; - } - cache.delete(oldestKey); +export function resolveTopicNameCachePath(storePath: string): string { + return `${storePath}.telegram-topic-names.json`; +} + +function evictOldest(store: TopicNameStore): void { + if (store.size <= MAX_ENTRIES) { + return; } + let oldestKey: string | undefined; + let oldestTime = Infinity; + for (const [key, entry] of store) { + if (entry.updatedAt < oldestTime) { + oldestTime = entry.updatedAt; + oldestKey = key; + } + } + if (oldestKey) { + store.delete(oldestKey); + } +} + +function isTopicEntry(value: unknown): value is TopicEntry { + if (!value || typeof value !== "object") { + return false; + } + const entry = value as Partial; + return ( + typeof entry.name === "string" && + entry.name.length > 0 && + typeof entry.updatedAt === "number" && + Number.isFinite(entry.updatedAt) + ); +} + +function readPersistedTopicNames(persistedPath: string): TopicNameStore { + if (!fs.existsSync(persistedPath)) { + return createTopicNameStore(); + } + try { + const raw = fs.readFileSync(persistedPath, "utf-8"); + const parsed = JSON.parse(raw) as Record; + const entries = Object.entries(parsed) + .filter((entry): entry is [string, TopicEntry] => isTopicEntry(entry[1])) + .toSorted(([, left], [, right]) => right.updatedAt - left.updatedAt) + .slice(0, MAX_ENTRIES); + return new Map(entries); + } catch (error) { + logVerbose(`telegram: failed to read topic-name cache: ${String(error)}`); + return createTopicNameStore(); + } +} + +function getTopicStoreState(persistedPath?: string): TopicNameStoreState { + const state = getTopicNameCacheState(); + const stateKey = persistedPath ?? DEFAULT_TOPIC_NAME_CACHE_KEY; + const existing = state.stores.get(stateKey); + if (existing) { + return existing; + } + const next = persistedPath + ? { + lastUpdatedAt: 0, + store: readPersistedTopicNames(persistedPath), + } + : createTopicNameStoreState(); + next.lastUpdatedAt = Math.max(0, ...next.store.values().map((entry) => entry.updatedAt)); + state.stores.set(stateKey, next); + return next; +} + +function getTopicStore(persistedPath?: string): TopicNameStore { + return getTopicStoreState(persistedPath).store; +} + +function nextUpdatedAt(persistedPath?: string): number { + const state = getTopicStoreState(persistedPath); + const now = Date.now(); + state.lastUpdatedAt = now > state.lastUpdatedAt ? now : state.lastUpdatedAt + 1; + return state.lastUpdatedAt; +} + +function removeTopicStore(persistedPath?: string): void { + const state = getTopicNameCacheState(); + const stateKey = persistedPath ?? DEFAULT_TOPIC_NAME_CACHE_KEY; + if (persistedPath) { + fs.rmSync(persistedPath, { force: true }); + } + state.stores.delete(stateKey); +} + +function persistTopicStore(persistedPath: string, store: TopicNameStore): void { + if (store.size === 0) { + fs.rmSync(persistedPath, { force: true }); + return; + } + fs.mkdirSync(path.dirname(persistedPath), { recursive: true }); + const tempPath = `${persistedPath}.${process.pid}.tmp`; + fs.writeFileSync(tempPath, JSON.stringify(Object.fromEntries(store)), "utf-8"); + fs.renameSync(tempPath, persistedPath); } export function updateTopicName( chatId: number | string, threadId: number | string, patch: Partial>, + persistedPath?: string, ): void { + const cache = getTopicStore(persistedPath); const key = cacheKey(chatId, threadId); const existing = cache.get(key); const merged: TopicEntry = { @@ -36,45 +166,53 @@ export function updateTopicName( iconColor: patch.iconColor ?? existing?.iconColor, iconCustomEmojiId: patch.iconCustomEmojiId ?? existing?.iconCustomEmojiId, closed: patch.closed ?? existing?.closed, - updatedAt: Date.now(), + updatedAt: nextUpdatedAt(persistedPath), }; if (!merged.name) { return; } - cache.delete(key); cache.set(key, merged); - evictOldest(); + evictOldest(cache); + if (persistedPath) { + try { + persistTopicStore(persistedPath, cache); + } catch (error) { + logVerbose(`telegram: failed to persist topic-name cache: ${String(error)}`); + } + } } export function getTopicName( chatId: number | string, threadId: number | string, + persistedPath?: string, ): string | undefined { - const key = cacheKey(chatId, threadId); - const entry = cache.get(key); + const entry = getTopicStore(persistedPath).get(cacheKey(chatId, threadId)); if (entry) { - const refreshedEntry: TopicEntry = { - ...entry, - updatedAt: Date.now(), - }; - cache.delete(key); - cache.set(key, refreshedEntry); - return refreshedEntry.name; + entry.updatedAt = nextUpdatedAt(persistedPath); } - return undefined; + return entry?.name; } export function getTopicEntry( chatId: number | string, threadId: number | string, + persistedPath?: string, ): TopicEntry | undefined { - return cache.get(cacheKey(chatId, threadId)); + return getTopicStore(persistedPath).get(cacheKey(chatId, threadId)); } export function clearTopicNameCache(): void { - cache.clear(); + const state = getTopicNameCacheState(); + for (const stateKey of state.stores.keys()) { + removeTopicStore(stateKey === DEFAULT_TOPIC_NAME_CACHE_KEY ? undefined : stateKey); + } } export function topicNameCacheSize(): number { - return cache.size; + return getTopicStore().size; +} + +export function resetTopicNameCacheForTest(): void { + getTopicNameCacheState().stores.clear(); }