mirror of
https://github.com/openclaw/openclaw.git
synced 2026-04-17 14:53:24 +02:00
fix(telegram): persist topic-name cache
This commit is contained in:
@@ -34,7 +34,7 @@ import {
|
||||
resolveTelegramReactionVariant,
|
||||
resolveTelegramStatusReactionEmojis,
|
||||
} from "./status-reaction-variants.js";
|
||||
import { getTopicName, updateTopicName } from "./topic-name-cache.js";
|
||||
import { getTopicName, resolveTopicNameCachePath, updateTopicName } from "./topic-name-cache.js";
|
||||
|
||||
export type {
|
||||
BuildTelegramMessageContextParams,
|
||||
@@ -149,40 +149,51 @@ export const buildTelegramMessageContext = async ({
|
||||
const resolvedThreadId = threadSpec.scope === "forum" ? threadSpec.id : undefined;
|
||||
const replyThreadId = threadSpec.id;
|
||||
const dmThreadId = threadSpec.scope === "dm" ? threadSpec.id : undefined;
|
||||
const topicNameCachePath = resolveTopicNameCachePath(
|
||||
sessionRuntime.resolveStorePath(cfg.session?.store, { agentId: account.accountId }),
|
||||
);
|
||||
let topicName: string | undefined;
|
||||
if (isForum && resolvedThreadId != null) {
|
||||
const ftCreated = msg.forum_topic_created;
|
||||
const ftEdited = msg.forum_topic_edited;
|
||||
const ftClosed = msg.forum_topic_closed;
|
||||
const ftReopened = msg.forum_topic_reopened;
|
||||
const topicPatch = ftCreated?.name
|
||||
? {
|
||||
name: ftCreated.name,
|
||||
iconColor: ftCreated.icon_color,
|
||||
iconCustomEmojiId: ftCreated.icon_custom_emoji_id,
|
||||
closed: false,
|
||||
}
|
||||
: ftEdited?.name
|
||||
? {
|
||||
name: ftEdited.name,
|
||||
iconCustomEmojiId: ftEdited.icon_custom_emoji_id,
|
||||
}
|
||||
: ftClosed
|
||||
? { closed: true }
|
||||
: ftReopened
|
||||
? { closed: false }
|
||||
: undefined;
|
||||
|
||||
if (ftCreated?.name) {
|
||||
updateTopicName(chatId, resolvedThreadId, {
|
||||
name: ftCreated.name,
|
||||
iconColor: ftCreated.icon_color,
|
||||
iconCustomEmojiId: ftCreated.icon_custom_emoji_id,
|
||||
closed: false,
|
||||
});
|
||||
} else if (ftEdited?.name) {
|
||||
updateTopicName(chatId, resolvedThreadId, {
|
||||
name: ftEdited.name,
|
||||
iconCustomEmojiId: ftEdited.icon_custom_emoji_id,
|
||||
});
|
||||
} else if (ftClosed) {
|
||||
updateTopicName(chatId, resolvedThreadId, { closed: true });
|
||||
} else if (ftReopened) {
|
||||
updateTopicName(chatId, resolvedThreadId, { closed: false });
|
||||
if (topicPatch) {
|
||||
updateTopicName(chatId, resolvedThreadId, topicPatch, topicNameCachePath);
|
||||
}
|
||||
|
||||
topicName = getTopicName(chatId, resolvedThreadId);
|
||||
topicName = getTopicName(chatId, resolvedThreadId, topicNameCachePath);
|
||||
if (!topicName) {
|
||||
const replyFtCreated = msg.reply_to_message?.forum_topic_created;
|
||||
if (replyFtCreated?.name) {
|
||||
updateTopicName(chatId, resolvedThreadId, {
|
||||
name: replyFtCreated.name,
|
||||
iconColor: replyFtCreated.icon_color,
|
||||
iconCustomEmojiId: replyFtCreated.icon_custom_emoji_id,
|
||||
});
|
||||
updateTopicName(
|
||||
chatId,
|
||||
resolvedThreadId,
|
||||
{
|
||||
name: replyFtCreated.name,
|
||||
iconColor: replyFtCreated.icon_color,
|
||||
iconCustomEmojiId: replyFtCreated.icon_custom_emoji_id,
|
||||
},
|
||||
topicNameCachePath,
|
||||
);
|
||||
topicName = replyFtCreated.name;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,10 @@
|
||||
import fs from "node:fs";
|
||||
import path from "node:path";
|
||||
import { logVerbose } from "openclaw/plugin-sdk/runtime-env";
|
||||
|
||||
const MAX_ENTRIES = 2_048;
|
||||
const TOPIC_NAME_CACHE_STATE_KEY = Symbol.for("openclaw.telegramTopicNameCacheState");
|
||||
const DEFAULT_TOPIC_NAME_CACHE_KEY = "__default__";
|
||||
|
||||
export type TopicEntry = {
|
||||
name: string;
|
||||
@@ -8,27 +14,151 @@ export type TopicEntry = {
|
||||
updatedAt: number;
|
||||
};
|
||||
|
||||
const cache = new Map<string, TopicEntry>();
|
||||
type TopicNameStore = Map<string, TopicEntry>;
|
||||
|
||||
type TopicNameStoreState = {
|
||||
lastUpdatedAt: number;
|
||||
store: TopicNameStore;
|
||||
};
|
||||
|
||||
type TopicNameCacheState = {
|
||||
stores: Map<string, TopicNameStoreState>;
|
||||
};
|
||||
|
||||
function createTopicNameStore(): TopicNameStore {
|
||||
return new Map<string, TopicEntry>();
|
||||
}
|
||||
|
||||
function createTopicNameStoreState(): TopicNameStoreState {
|
||||
return {
|
||||
lastUpdatedAt: 0,
|
||||
store: createTopicNameStore(),
|
||||
};
|
||||
}
|
||||
|
||||
function getTopicNameCacheState(): TopicNameCacheState {
|
||||
const globalStore = globalThis as Record<PropertyKey, unknown>;
|
||||
const existing = globalStore[TOPIC_NAME_CACHE_STATE_KEY] as TopicNameCacheState | undefined;
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const state: TopicNameCacheState = { stores: new Map() };
|
||||
globalStore[TOPIC_NAME_CACHE_STATE_KEY] = state;
|
||||
return state;
|
||||
}
|
||||
|
||||
function cacheKey(chatId: number | string, threadId: number | string): string {
|
||||
return `${chatId}:${threadId}`;
|
||||
}
|
||||
|
||||
function evictOldest(): void {
|
||||
while (cache.size > MAX_ENTRIES) {
|
||||
const oldestKey = cache.keys().next().value;
|
||||
if (!oldestKey) {
|
||||
return;
|
||||
}
|
||||
cache.delete(oldestKey);
|
||||
export function resolveTopicNameCachePath(storePath: string): string {
|
||||
return `${storePath}.telegram-topic-names.json`;
|
||||
}
|
||||
|
||||
function evictOldest(store: TopicNameStore): void {
|
||||
if (store.size <= MAX_ENTRIES) {
|
||||
return;
|
||||
}
|
||||
let oldestKey: string | undefined;
|
||||
let oldestTime = Infinity;
|
||||
for (const [key, entry] of store) {
|
||||
if (entry.updatedAt < oldestTime) {
|
||||
oldestTime = entry.updatedAt;
|
||||
oldestKey = key;
|
||||
}
|
||||
}
|
||||
if (oldestKey) {
|
||||
store.delete(oldestKey);
|
||||
}
|
||||
}
|
||||
|
||||
function isTopicEntry(value: unknown): value is TopicEntry {
|
||||
if (!value || typeof value !== "object") {
|
||||
return false;
|
||||
}
|
||||
const entry = value as Partial<TopicEntry>;
|
||||
return (
|
||||
typeof entry.name === "string" &&
|
||||
entry.name.length > 0 &&
|
||||
typeof entry.updatedAt === "number" &&
|
||||
Number.isFinite(entry.updatedAt)
|
||||
);
|
||||
}
|
||||
|
||||
function readPersistedTopicNames(persistedPath: string): TopicNameStore {
|
||||
if (!fs.existsSync(persistedPath)) {
|
||||
return createTopicNameStore();
|
||||
}
|
||||
try {
|
||||
const raw = fs.readFileSync(persistedPath, "utf-8");
|
||||
const parsed = JSON.parse(raw) as Record<string, unknown>;
|
||||
const entries = Object.entries(parsed)
|
||||
.filter((entry): entry is [string, TopicEntry] => isTopicEntry(entry[1]))
|
||||
.toSorted(([, left], [, right]) => right.updatedAt - left.updatedAt)
|
||||
.slice(0, MAX_ENTRIES);
|
||||
return new Map(entries);
|
||||
} catch (error) {
|
||||
logVerbose(`telegram: failed to read topic-name cache: ${String(error)}`);
|
||||
return createTopicNameStore();
|
||||
}
|
||||
}
|
||||
|
||||
function getTopicStoreState(persistedPath?: string): TopicNameStoreState {
|
||||
const state = getTopicNameCacheState();
|
||||
const stateKey = persistedPath ?? DEFAULT_TOPIC_NAME_CACHE_KEY;
|
||||
const existing = state.stores.get(stateKey);
|
||||
if (existing) {
|
||||
return existing;
|
||||
}
|
||||
const next = persistedPath
|
||||
? {
|
||||
lastUpdatedAt: 0,
|
||||
store: readPersistedTopicNames(persistedPath),
|
||||
}
|
||||
: createTopicNameStoreState();
|
||||
next.lastUpdatedAt = Math.max(0, ...next.store.values().map((entry) => entry.updatedAt));
|
||||
state.stores.set(stateKey, next);
|
||||
return next;
|
||||
}
|
||||
|
||||
function getTopicStore(persistedPath?: string): TopicNameStore {
|
||||
return getTopicStoreState(persistedPath).store;
|
||||
}
|
||||
|
||||
function nextUpdatedAt(persistedPath?: string): number {
|
||||
const state = getTopicStoreState(persistedPath);
|
||||
const now = Date.now();
|
||||
state.lastUpdatedAt = now > state.lastUpdatedAt ? now : state.lastUpdatedAt + 1;
|
||||
return state.lastUpdatedAt;
|
||||
}
|
||||
|
||||
function removeTopicStore(persistedPath?: string): void {
|
||||
const state = getTopicNameCacheState();
|
||||
const stateKey = persistedPath ?? DEFAULT_TOPIC_NAME_CACHE_KEY;
|
||||
if (persistedPath) {
|
||||
fs.rmSync(persistedPath, { force: true });
|
||||
}
|
||||
state.stores.delete(stateKey);
|
||||
}
|
||||
|
||||
function persistTopicStore(persistedPath: string, store: TopicNameStore): void {
|
||||
if (store.size === 0) {
|
||||
fs.rmSync(persistedPath, { force: true });
|
||||
return;
|
||||
}
|
||||
fs.mkdirSync(path.dirname(persistedPath), { recursive: true });
|
||||
const tempPath = `${persistedPath}.${process.pid}.tmp`;
|
||||
fs.writeFileSync(tempPath, JSON.stringify(Object.fromEntries(store)), "utf-8");
|
||||
fs.renameSync(tempPath, persistedPath);
|
||||
}
|
||||
|
||||
export function updateTopicName(
|
||||
chatId: number | string,
|
||||
threadId: number | string,
|
||||
patch: Partial<Omit<TopicEntry, "updatedAt">>,
|
||||
persistedPath?: string,
|
||||
): void {
|
||||
const cache = getTopicStore(persistedPath);
|
||||
const key = cacheKey(chatId, threadId);
|
||||
const existing = cache.get(key);
|
||||
const merged: TopicEntry = {
|
||||
@@ -36,45 +166,53 @@ export function updateTopicName(
|
||||
iconColor: patch.iconColor ?? existing?.iconColor,
|
||||
iconCustomEmojiId: patch.iconCustomEmojiId ?? existing?.iconCustomEmojiId,
|
||||
closed: patch.closed ?? existing?.closed,
|
||||
updatedAt: Date.now(),
|
||||
updatedAt: nextUpdatedAt(persistedPath),
|
||||
};
|
||||
if (!merged.name) {
|
||||
return;
|
||||
}
|
||||
cache.delete(key);
|
||||
cache.set(key, merged);
|
||||
evictOldest();
|
||||
evictOldest(cache);
|
||||
if (persistedPath) {
|
||||
try {
|
||||
persistTopicStore(persistedPath, cache);
|
||||
} catch (error) {
|
||||
logVerbose(`telegram: failed to persist topic-name cache: ${String(error)}`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export function getTopicName(
|
||||
chatId: number | string,
|
||||
threadId: number | string,
|
||||
persistedPath?: string,
|
||||
): string | undefined {
|
||||
const key = cacheKey(chatId, threadId);
|
||||
const entry = cache.get(key);
|
||||
const entry = getTopicStore(persistedPath).get(cacheKey(chatId, threadId));
|
||||
if (entry) {
|
||||
const refreshedEntry: TopicEntry = {
|
||||
...entry,
|
||||
updatedAt: Date.now(),
|
||||
};
|
||||
cache.delete(key);
|
||||
cache.set(key, refreshedEntry);
|
||||
return refreshedEntry.name;
|
||||
entry.updatedAt = nextUpdatedAt(persistedPath);
|
||||
}
|
||||
return undefined;
|
||||
return entry?.name;
|
||||
}
|
||||
|
||||
export function getTopicEntry(
|
||||
chatId: number | string,
|
||||
threadId: number | string,
|
||||
persistedPath?: string,
|
||||
): TopicEntry | undefined {
|
||||
return cache.get(cacheKey(chatId, threadId));
|
||||
return getTopicStore(persistedPath).get(cacheKey(chatId, threadId));
|
||||
}
|
||||
|
||||
export function clearTopicNameCache(): void {
|
||||
cache.clear();
|
||||
const state = getTopicNameCacheState();
|
||||
for (const stateKey of state.stores.keys()) {
|
||||
removeTopicStore(stateKey === DEFAULT_TOPIC_NAME_CACHE_KEY ? undefined : stateKey);
|
||||
}
|
||||
}
|
||||
|
||||
export function topicNameCacheSize(): number {
|
||||
return cache.size;
|
||||
return getTopicStore().size;
|
||||
}
|
||||
|
||||
export function resetTopicNameCacheForTest(): void {
|
||||
getTopicNameCacheState().stores.clear();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user