diff --git a/src/agents/pi-embedded-runner/compact.ts b/src/agents/pi-embedded-runner/compact.ts index 88b7dea4d8a..0aabdadaf5f 100644 --- a/src/agents/pi-embedded-runner/compact.ts +++ b/src/agents/pi-embedded-runner/compact.ts @@ -15,7 +15,6 @@ import { ensureContextEnginesInitialized, resolveContextEngine, } from "../../context-engine/index.js"; -import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; import { getMachineDisplayName } from "../../infra/machine-name.js"; import { generateSecureToken } from "../../infra/secure-random.js"; import { resolveSignalReactionLevel } from "../../plugin-sdk/signal.js"; @@ -24,17 +23,15 @@ import { resolveTelegramReactionLevel, } from "../../plugin-sdk/telegram.js"; import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; -import { getActiveMemorySearchManager } from "../../plugins/memory-runtime.js"; import { prepareProviderRuntimeAuth } from "../../plugins/provider-runtime.js"; import { type enqueueCommand, enqueueCommandInLane } from "../../process/command-queue.js"; import { isCronSessionKey, isSubagentSessionKey } from "../../routing/session-key.js"; -import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; import { buildTtsSystemPromptHint } from "../../tts/tts.js"; import { resolveUserPath } from "../../utils.js"; import { normalizeMessageChannel } from "../../utils/message-channel.js"; import { isReasoningTagProvider } from "../../utils/provider-utils.js"; import { resolveOpenClawAgentDir } from "../agent-paths.js"; -import { resolveSessionAgentId, resolveSessionAgentIds } from "../agent-scope.js"; +import { resolveSessionAgentIds } from "../agent-scope.js"; import type { ExecElevatedDefaults } from "../bash-tools.js"; import { makeBootstrapWarn, resolveBootstrapContextForRun } from "../bootstrap-files.js"; import { listChannelSupportedActions, resolveChannelMessageToolHints } from "../channel-tools.js"; @@ -47,7 +44,6 @@ import { ensureCustomApiRegistered } from "../custom-api-registry.js"; import { formatUserTime, resolveUserTimeFormat, resolveUserTimezone } from "../date-time.js"; import { DEFAULT_CONTEXT_TOKENS, DEFAULT_MODEL, DEFAULT_PROVIDER } from "../defaults.js"; import { resolveOpenClawDocsPath } from "../docs-path.js"; -import { resolveMemorySearchConfig } from "../memory-search.js"; import { applyLocalNoAuthHeaderOverride, getApiKeyForModel, @@ -88,6 +84,14 @@ import { } from "../skills.js"; import { resolveTranscriptPolicy } from "../transcript-policy.js"; import { classifyCompactionReason, resolveCompactionFailureReason } from "./compact-reasons.js"; +import { + asCompactionHookRunner, + buildBeforeCompactionHookMetrics, + estimateTokensAfterCompaction, + runAfterCompactionHooks, + runBeforeCompactionHooks, + runPostCompactionSideEffects, +} from "./compaction-hooks.js"; import { compactWithSafetyTimeout, resolveCompactionTimeoutMs, @@ -258,311 +262,12 @@ function summarizeCompactionMessages(messages: AgentMessage[]): CompactionMessag }; } -function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "async" | "await" { - const mode = config?.agents?.defaults?.compaction?.postIndexSync; - if (mode === "off" || mode === "async" || mode === "await") { - return mode; - } - return "async"; -} - -async function runPostCompactionSessionMemorySync(params: { - config?: OpenClawConfig; - sessionKey?: string; - sessionFile: string; -}): Promise { - if (!params.config) { - return; - } - try { - const sessionFile = params.sessionFile.trim(); - if (!sessionFile) { - return; - } - const agentId = resolveSessionAgentId({ - sessionKey: params.sessionKey, - config: params.config, - }); - const resolvedMemory = resolveMemorySearchConfig(params.config, agentId); - if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) { - return; - } - if (!resolvedMemory.sync.sessions.postCompactionForce) { - return; - } - const { manager } = await getActiveMemorySearchManager({ - cfg: params.config, - agentId, - }); - if (!manager?.sync) { - return; - } - const syncTask = manager.sync({ - reason: "post-compaction", - sessionFiles: [sessionFile], - }); - await syncTask; - } catch (err) { - log.warn(`memory sync skipped (post-compaction): ${String(err)}`); - } -} - -function syncPostCompactionSessionMemory(params: { - config?: OpenClawConfig; - sessionKey?: string; - sessionFile: string; - mode: "off" | "async" | "await"; -}): Promise { - if (params.mode === "off" || !params.config) { - return Promise.resolve(); - } - - const syncTask = runPostCompactionSessionMemorySync({ - config: params.config, - sessionKey: params.sessionKey, - sessionFile: params.sessionFile, - }); - if (params.mode === "await") { - return syncTask; - } - void syncTask; - return Promise.resolve(); -} - -export async function runPostCompactionSideEffects(params: { - config?: OpenClawConfig; - sessionKey?: string; - sessionFile: string; -}): Promise { - const sessionFile = params.sessionFile.trim(); - if (!sessionFile) { - return; - } - emitSessionTranscriptUpdate(sessionFile); - await syncPostCompactionSessionMemory({ - config: params.config, - sessionKey: params.sessionKey, - sessionFile, - mode: resolvePostCompactionIndexSyncMode(params.config), - }); -} - -type CompactionHookRunner = { - hasHooks?: (hookName?: string) => boolean; - runBeforeCompaction?: ( - metrics: { messageCount: number; tokenCount?: number; sessionFile?: string }, - context: { - sessionId: string; - agentId: string; - sessionKey: string; - workspaceDir: string; - messageProvider?: string; - }, - ) => Promise | void; - runAfterCompaction?: ( - metrics: { - messageCount: number; - tokenCount?: number; - compactedCount: number; - sessionFile: string; - }, - context: { - sessionId: string; - agentId: string; - sessionKey: string; - workspaceDir: string; - messageProvider?: string; - }, - ) => Promise | void; -}; - -function asCompactionHookRunner( - hookRunner: ReturnType | null | undefined, -): CompactionHookRunner | null { - if (!hookRunner) { - return null; - } - return { - hasHooks: (hookName?: string) => hookRunner.hasHooks?.(hookName as never) ?? false, - runBeforeCompaction: hookRunner.runBeforeCompaction?.bind(hookRunner), - runAfterCompaction: hookRunner.runAfterCompaction?.bind(hookRunner), - }; -} - -function estimateTokenCountSafe( - messages: AgentMessage[], - estimateTokensFn: (message: AgentMessage) => number, -): number | undefined { - try { - let total = 0; - for (const message of messages) { - total += estimateTokensFn(message); - } - return total; - } catch { - return undefined; - } -} - -function buildBeforeCompactionHookMetrics(params: { - originalMessages: AgentMessage[]; - currentMessages: AgentMessage[]; - observedTokenCount?: number; - estimateTokensFn: (message: AgentMessage) => number; -}) { - return { - messageCountOriginal: params.originalMessages.length, - tokenCountOriginal: estimateTokenCountSafe(params.originalMessages, params.estimateTokensFn), - messageCountBefore: params.currentMessages.length, - tokenCountBefore: - params.observedTokenCount ?? - estimateTokenCountSafe(params.currentMessages, params.estimateTokensFn), - }; -} - -async function runBeforeCompactionHooks(params: { - hookRunner?: CompactionHookRunner | null; - sessionId: string; - sessionKey?: string; - sessionAgentId: string; - workspaceDir: string; - messageProvider?: string; - metrics: ReturnType; -}) { - const missingSessionKey = !params.sessionKey || !params.sessionKey.trim(); - const hookSessionKey = params.sessionKey?.trim() || params.sessionId; - try { - const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, { - sessionId: params.sessionId, - missingSessionKey, - messageCount: params.metrics.messageCountBefore, - tokenCount: params.metrics.tokenCountBefore, - messageCountOriginal: params.metrics.messageCountOriginal, - tokenCountOriginal: params.metrics.tokenCountOriginal, - }); - await triggerInternalHook(hookEvent); - } catch (err) { - log.warn("session:compact:before hook failed", { - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? err.stack : undefined, - }); - } - if (params.hookRunner?.hasHooks?.("before_compaction")) { - try { - await params.hookRunner.runBeforeCompaction?.( - { - messageCount: params.metrics.messageCountBefore, - tokenCount: params.metrics.tokenCountBefore, - }, - { - sessionId: params.sessionId, - agentId: params.sessionAgentId, - sessionKey: hookSessionKey, - workspaceDir: params.workspaceDir, - messageProvider: params.messageProvider, - }, - ); - } catch (err) { - log.warn("before_compaction hook failed", { - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? err.stack : undefined, - }); - } - } - return { - hookSessionKey, - missingSessionKey, - }; -} - function containsRealConversationMessages(messages: AgentMessage[]): boolean { return messages.some((message, index, allMessages) => hasRealConversationContent(message, allMessages, index), ); } -function estimateTokensAfterCompaction(params: { - messagesAfter: AgentMessage[]; - observedTokenCount?: number; - fullSessionTokensBefore: number; - estimateTokensFn: (message: AgentMessage) => number; -}) { - const tokensAfter = estimateTokenCountSafe(params.messagesAfter, params.estimateTokensFn); - if (tokensAfter === undefined) { - return undefined; - } - const sanityCheckBaseline = params.observedTokenCount ?? params.fullSessionTokensBefore; - if ( - sanityCheckBaseline > 0 && - tokensAfter > - (params.observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1) - ) { - return undefined; - } - return tokensAfter; -} - -async function runAfterCompactionHooks(params: { - hookRunner?: CompactionHookRunner | null; - sessionId: string; - sessionAgentId: string; - hookSessionKey: string; - missingSessionKey: boolean; - workspaceDir: string; - messageProvider?: string; - messageCountAfter: number; - tokensAfter?: number; - compactedCount: number; - sessionFile: string; - summaryLength?: number; - tokensBefore?: number; - firstKeptEntryId?: string; -}) { - try { - const hookEvent = createInternalHookEvent("session", "compact:after", params.hookSessionKey, { - sessionId: params.sessionId, - missingSessionKey: params.missingSessionKey, - messageCount: params.messageCountAfter, - tokenCount: params.tokensAfter, - compactedCount: params.compactedCount, - summaryLength: params.summaryLength, - tokensBefore: params.tokensBefore, - tokensAfter: params.tokensAfter, - firstKeptEntryId: params.firstKeptEntryId, - }); - await triggerInternalHook(hookEvent); - } catch (err) { - log.warn("session:compact:after hook failed", { - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? err.stack : undefined, - }); - } - if (params.hookRunner?.hasHooks?.("after_compaction")) { - try { - await params.hookRunner.runAfterCompaction?.( - { - messageCount: params.messageCountAfter, - tokenCount: params.tokensAfter, - compactedCount: params.compactedCount, - sessionFile: params.sessionFile, - }, - { - sessionId: params.sessionId, - agentId: params.sessionAgentId, - sessionKey: params.hookSessionKey, - workspaceDir: params.workspaceDir, - messageProvider: params.messageProvider, - }, - ); - } catch (err) { - log.warn("after_compaction hook failed", { - errorMessage: err instanceof Error ? err.message : String(err), - errorStack: err instanceof Error ? err.stack : undefined, - }); - } - } -} - /** * Core compaction logic without lane queueing. * Use this when already inside a session/global lane to avoid deadlocks. @@ -1399,3 +1104,5 @@ export const __testing = { runAfterCompactionHooks, runPostCompactionSideEffects, } as const; + +export { runPostCompactionSideEffects } from "./compaction-hooks.js"; diff --git a/src/agents/pi-embedded-runner/compaction-hooks.ts b/src/agents/pi-embedded-runner/compaction-hooks.ts new file mode 100644 index 00000000000..7ee674e377b --- /dev/null +++ b/src/agents/pi-embedded-runner/compaction-hooks.ts @@ -0,0 +1,307 @@ +import type { AgentMessage } from "@mariozechner/pi-agent-core"; +import type { OpenClawConfig } from "../../config/config.js"; +import { createInternalHookEvent, triggerInternalHook } from "../../hooks/internal-hooks.js"; +import { getGlobalHookRunner } from "../../plugins/hook-runner-global.js"; +import { getActiveMemorySearchManager } from "../../plugins/memory-runtime.js"; +import { emitSessionTranscriptUpdate } from "../../sessions/transcript-events.js"; +import { resolveSessionAgentId } from "../agent-scope.js"; +import { resolveMemorySearchConfig } from "../memory-search.js"; +import { log } from "./logger.js"; + +function resolvePostCompactionIndexSyncMode(config?: OpenClawConfig): "off" | "async" | "await" { + const mode = config?.agents?.defaults?.compaction?.postIndexSync; + if (mode === "off" || mode === "async" || mode === "await") { + return mode; + } + return "async"; +} + +async function runPostCompactionSessionMemorySync(params: { + config?: OpenClawConfig; + sessionKey?: string; + sessionFile: string; +}): Promise { + if (!params.config) { + return; + } + try { + const sessionFile = params.sessionFile.trim(); + if (!sessionFile) { + return; + } + const agentId = resolveSessionAgentId({ + sessionKey: params.sessionKey, + config: params.config, + }); + const resolvedMemory = resolveMemorySearchConfig(params.config, agentId); + if (!resolvedMemory || !resolvedMemory.sources.includes("sessions")) { + return; + } + if (!resolvedMemory.sync.sessions.postCompactionForce) { + return; + } + const { manager } = await getActiveMemorySearchManager({ + cfg: params.config, + agentId, + }); + if (!manager?.sync) { + return; + } + await manager.sync({ + reason: "post-compaction", + sessionFiles: [sessionFile], + }); + } catch (err) { + log.warn(`memory sync skipped (post-compaction): ${String(err)}`); + } +} + +function syncPostCompactionSessionMemory(params: { + config?: OpenClawConfig; + sessionKey?: string; + sessionFile: string; + mode: "off" | "async" | "await"; +}): Promise { + if (params.mode === "off" || !params.config) { + return Promise.resolve(); + } + + const syncTask = runPostCompactionSessionMemorySync({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile: params.sessionFile, + }); + if (params.mode === "await") { + return syncTask; + } + void syncTask; + return Promise.resolve(); +} + +export async function runPostCompactionSideEffects(params: { + config?: OpenClawConfig; + sessionKey?: string; + sessionFile: string; +}): Promise { + const sessionFile = params.sessionFile.trim(); + if (!sessionFile) { + return; + } + emitSessionTranscriptUpdate(sessionFile); + await syncPostCompactionSessionMemory({ + config: params.config, + sessionKey: params.sessionKey, + sessionFile, + mode: resolvePostCompactionIndexSyncMode(params.config), + }); +} + +export type CompactionHookRunner = { + hasHooks?: (hookName?: string) => boolean; + runBeforeCompaction?: ( + metrics: { messageCount: number; tokenCount?: number; sessionFile?: string }, + context: { + sessionId: string; + agentId: string; + sessionKey: string; + workspaceDir: string; + messageProvider?: string; + }, + ) => Promise | void; + runAfterCompaction?: ( + metrics: { + messageCount: number; + tokenCount?: number; + compactedCount: number; + sessionFile: string; + }, + context: { + sessionId: string; + agentId: string; + sessionKey: string; + workspaceDir: string; + messageProvider?: string; + }, + ) => Promise | void; +}; + +export function asCompactionHookRunner( + hookRunner: ReturnType | null | undefined, +): CompactionHookRunner | null { + if (!hookRunner) { + return null; + } + return { + hasHooks: (hookName?: string) => hookRunner.hasHooks?.(hookName as never) ?? false, + runBeforeCompaction: hookRunner.runBeforeCompaction?.bind(hookRunner), + runAfterCompaction: hookRunner.runAfterCompaction?.bind(hookRunner), + }; +} + +function estimateTokenCountSafe( + messages: AgentMessage[], + estimateTokensFn: (message: AgentMessage) => number, +): number | undefined { + try { + let total = 0; + for (const message of messages) { + total += estimateTokensFn(message); + } + return total; + } catch { + return undefined; + } +} + +export function buildBeforeCompactionHookMetrics(params: { + originalMessages: AgentMessage[]; + currentMessages: AgentMessage[]; + observedTokenCount?: number; + estimateTokensFn: (message: AgentMessage) => number; +}) { + return { + messageCountOriginal: params.originalMessages.length, + tokenCountOriginal: estimateTokenCountSafe(params.originalMessages, params.estimateTokensFn), + messageCountBefore: params.currentMessages.length, + tokenCountBefore: + params.observedTokenCount ?? + estimateTokenCountSafe(params.currentMessages, params.estimateTokensFn), + }; +} + +export async function runBeforeCompactionHooks(params: { + hookRunner?: CompactionHookRunner | null; + sessionId: string; + sessionKey?: string; + sessionAgentId: string; + workspaceDir: string; + messageProvider?: string; + metrics: ReturnType; +}) { + const missingSessionKey = !params.sessionKey || !params.sessionKey.trim(); + const hookSessionKey = params.sessionKey?.trim() || params.sessionId; + try { + const hookEvent = createInternalHookEvent("session", "compact:before", hookSessionKey, { + sessionId: params.sessionId, + missingSessionKey, + messageCount: params.metrics.messageCountBefore, + tokenCount: params.metrics.tokenCountBefore, + messageCountOriginal: params.metrics.messageCountOriginal, + tokenCountOriginal: params.metrics.tokenCountOriginal, + }); + await triggerInternalHook(hookEvent); + } catch (err) { + log.warn("session:compact:before hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + if (params.hookRunner?.hasHooks?.("before_compaction")) { + try { + await params.hookRunner.runBeforeCompaction?.( + { + messageCount: params.metrics.messageCountBefore, + tokenCount: params.metrics.tokenCountBefore, + }, + { + sessionId: params.sessionId, + agentId: params.sessionAgentId, + sessionKey: hookSessionKey, + workspaceDir: params.workspaceDir, + messageProvider: params.messageProvider, + }, + ); + } catch (err) { + log.warn("before_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + } + return { + hookSessionKey, + missingSessionKey, + }; +} + +export function estimateTokensAfterCompaction(params: { + messagesAfter: AgentMessage[]; + observedTokenCount?: number; + fullSessionTokensBefore: number; + estimateTokensFn: (message: AgentMessage) => number; +}) { + const tokensAfter = estimateTokenCountSafe(params.messagesAfter, params.estimateTokensFn); + if (tokensAfter === undefined) { + return undefined; + } + const sanityCheckBaseline = params.observedTokenCount ?? params.fullSessionTokensBefore; + if ( + sanityCheckBaseline > 0 && + tokensAfter > + (params.observedTokenCount !== undefined ? sanityCheckBaseline : sanityCheckBaseline * 1.1) + ) { + return undefined; + } + return tokensAfter; +} + +export async function runAfterCompactionHooks(params: { + hookRunner?: CompactionHookRunner | null; + sessionId: string; + sessionAgentId: string; + hookSessionKey: string; + missingSessionKey: boolean; + workspaceDir: string; + messageProvider?: string; + messageCountAfter: number; + tokensAfter?: number; + compactedCount: number; + sessionFile: string; + summaryLength?: number; + tokensBefore?: number; + firstKeptEntryId?: string; +}) { + try { + const hookEvent = createInternalHookEvent("session", "compact:after", params.hookSessionKey, { + sessionId: params.sessionId, + missingSessionKey: params.missingSessionKey, + messageCount: params.messageCountAfter, + tokenCount: params.tokensAfter, + compactedCount: params.compactedCount, + summaryLength: params.summaryLength, + tokensBefore: params.tokensBefore, + tokensAfter: params.tokensAfter, + firstKeptEntryId: params.firstKeptEntryId, + }); + await triggerInternalHook(hookEvent); + } catch (err) { + log.warn("session:compact:after hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + if (params.hookRunner?.hasHooks?.("after_compaction")) { + try { + await params.hookRunner.runAfterCompaction?.( + { + messageCount: params.messageCountAfter, + tokenCount: params.tokensAfter, + compactedCount: params.compactedCount, + sessionFile: params.sessionFile, + }, + { + sessionId: params.sessionId, + agentId: params.sessionAgentId, + sessionKey: params.hookSessionKey, + workspaceDir: params.workspaceDir, + messageProvider: params.messageProvider, + }, + ); + } catch (err) { + log.warn("after_compaction hook failed", { + errorMessage: err instanceof Error ? err.message : String(err), + errorStack: err instanceof Error ? err.stack : undefined, + }); + } + } +}