diff --git a/CHANGELOG.md b/CHANGELOG.md index f4f9c07d20f..0fb2632fc49 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai - Cron/scheduler: preserve the active error-backoff floor when maintenance repair recomputes a missing cron next-run, so recurring errored jobs do not resume early after a transient next-run resolution failure. (#66019, #66083, #66113) Thanks @mbelinky. - Outbound/delivery-queue: persist the originating outbound `session` context on queued delivery entries and replay it during recovery, so write-ahead-queued sends keep their original outbound media policy context after restart instead of evaluating against a missing session. (#66025) Thanks @eleqtrizit. - Auto-reply/queue: split collect-mode followup drains into contiguous groups by per-message authorization context (sender id, owner status, exec/bash-elevated overrides), so queued items from different senders or exec configs no longer execute under the last queued run's owner-only and exec-approval context. (#66024) Thanks @eleqtrizit. +- Dreaming/memory-core: require a live queued Dreaming cron event before the heartbeat hook runs the sweep, so managed Dreaming no longer replays on later heartbeats after the scheduled run was already consumed. (#66139) Thanks @mbelinky. ## 2026.4.12 diff --git a/extensions/memory-core/src/dreaming.test.ts b/extensions/memory-core/src/dreaming.test.ts index 95784619faa..dd38bc10f1e 100644 --- a/extensions/memory-core/src/dreaming.test.ts +++ b/extensions/memory-core/src/dreaming.test.ts @@ -1,7 +1,8 @@ import fs from "node:fs/promises"; import path from "node:path"; +import { enqueueSystemEvent, resetSystemEventsForTest } from "openclaw/plugin-sdk/infra-runtime"; import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core"; -import { describe, expect, it, vi } from "vitest"; +import { afterEach, describe, expect, it, vi } from "vitest"; import { clearInternalHooks, createInternalHookEvent, @@ -21,6 +22,10 @@ import { createMemoryCoreTestHarness } from "./test-helpers.js"; const constants = __testing.constants; const { createTempWorkspace } = createMemoryCoreTestHarness(); +afterEach(() => { + resetSystemEventsForTest(); +}); + type CronParam = NonNullable[0]["cron"]>; type CronJobLike = Awaited>[number]; type CronAddInput = Parameters[0]; @@ -138,7 +143,7 @@ function getBeforeAgentReplyHandler( onMock: ReturnType, ): ( event: { cleanedBody: string }, - ctx: { trigger?: string; workspaceDir?: string }, + ctx: { trigger?: string; workspaceDir?: string; sessionKey?: string }, ) => Promise { const call = onMock.mock.calls.find(([eventName]) => eventName === "before_agent_reply"); if (!call) { @@ -146,7 +151,7 @@ function getBeforeAgentReplyHandler( } return call[1] as ( event: { cleanedBody: string }, - ctx: { trigger?: string; workspaceDir?: string }, + ctx: { trigger?: string; workspaceDir?: string; sessionKey?: string }, ) => Promise; } @@ -1111,6 +1116,130 @@ describe("gateway startup reconciliation", () => { clearInternalHooks(); } }); + + it("only triggers managed dreaming when the queued cron event is still pending", async () => { + clearInternalHooks(); + const logger = createLogger(); + const harness = createCronHarness(); + const onMock = vi.fn(); + const api: DreamingPluginApiTestDouble = { + config: { + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: false, + }, + }, + }, + }, + }, + } as OpenClawConfig, + pluginConfig: {}, + logger, + runtime: {}, + registerHook: (event: string, handler: Parameters[1]) => { + registerInternalHook(event, handler); + }, + on: onMock, + }; + + try { + registerShortTermPromotionDreamingForTest(api); + await triggerInternalHook( + createInternalHookEvent("gateway", "startup", "gateway:startup", { + cfg: api.config, + deps: { cron: harness.cron }, + }), + ); + + const sessionKey = "agent:main:main"; + enqueueSystemEvent(constants.DREAMING_SYSTEM_EVENT_TEXT, { + sessionKey, + contextKey: "cron:memory-dreaming", + }); + + const beforeAgentReply = getBeforeAgentReplyHandler(onMock); + const first = await beforeAgentReply( + { cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT }, + { trigger: "heartbeat", workspaceDir: ".", sessionKey }, + ); + + expect(first).toEqual({ + handled: true, + reason: "memory-core: short-term dreaming disabled", + }); + + resetSystemEventsForTest(); + + const second = await beforeAgentReply( + { cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT }, + { trigger: "heartbeat", workspaceDir: ".", sessionKey }, + ); + + expect(second).toBeUndefined(); + } finally { + clearInternalHooks(); + } + }); + + it("resolves queued managed dreaming cron events from the base session for isolated heartbeats", async () => { + clearInternalHooks(); + const logger = createLogger(); + const harness = createCronHarness(); + const onMock = vi.fn(); + const api: DreamingPluginApiTestDouble = { + config: { + plugins: { + entries: { + "memory-core": { + config: { + dreaming: { + enabled: false, + }, + }, + }, + }, + }, + } as OpenClawConfig, + pluginConfig: {}, + logger, + runtime: {}, + registerHook: (event: string, handler: Parameters[1]) => { + registerInternalHook(event, handler); + }, + on: onMock, + }; + + try { + registerShortTermPromotionDreamingForTest(api); + await triggerInternalHook( + createInternalHookEvent("gateway", "startup", "gateway:startup", { + cfg: api.config, + deps: { cron: harness.cron }, + }), + ); + + enqueueSystemEvent(constants.DREAMING_SYSTEM_EVENT_TEXT, { + sessionKey: "agent:main:main", + contextKey: "cron:memory-dreaming", + }); + + const beforeAgentReply = getBeforeAgentReplyHandler(onMock); + const result = await beforeAgentReply( + { cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT }, + { trigger: "heartbeat", workspaceDir: ".", sessionKey: "agent:main:main:heartbeat" }, + ); + + expect(result).toEqual({ + handled: true, + reason: "memory-core: short-term dreaming disabled", + }); + } finally { + clearInternalHooks(); + } + }); }); describe("short-term dreaming trigger", () => { diff --git a/extensions/memory-core/src/dreaming.ts b/extensions/memory-core/src/dreaming.ts index 09ce55438af..739b7f8cca5 100644 --- a/extensions/memory-core/src/dreaming.ts +++ b/extensions/memory-core/src/dreaming.ts @@ -1,3 +1,4 @@ +import { peekSystemEventEntries } from "openclaw/plugin-sdk/infra-runtime"; import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core"; import { DEFAULT_MEMORY_DREAMING_FREQUENCY as DEFAULT_MEMORY_DREAMING_CRON_EXPR, @@ -36,6 +37,7 @@ const LEGACY_REM_SLEEP_CRON_NAME = "Memory REM Dreaming"; const LEGACY_REM_SLEEP_CRON_TAG = "[managed-by=memory-core.dreaming.rem]"; const LEGACY_REM_SLEEP_EVENT_TEXT = "__openclaw_memory_core_rem_sleep__"; const RUNTIME_CRON_RECONCILE_INTERVAL_MS = 60_000; +const HEARTBEAT_ISOLATED_SESSION_SUFFIX = ":heartbeat"; type Logger = Pick; @@ -344,6 +346,35 @@ function resolveStartupConfigFromEvent(event: unknown, fallback: OpenClawConfig) return startupCfg as OpenClawConfig; } +function resolveDreamingTriggerSessionKeys(sessionKey?: string): string[] { + const normalized = normalizeTrimmedString(sessionKey); + if (!normalized) { + return []; + } + + const keys = [normalized]; + // Isolated heartbeat runs execute in a sibling `:heartbeat` session while cron + // system events stay queued on the base main session. + if (normalized.endsWith(HEARTBEAT_ISOLATED_SESSION_SUFFIX)) { + const baseSessionKey = normalized.slice(0, -HEARTBEAT_ISOLATED_SESSION_SUFFIX.length).trim(); + if (baseSessionKey) { + keys.push(baseSessionKey); + } + } + + return Array.from(new Set(keys)); +} + +function hasPendingManagedDreamingCronEvent(sessionKey?: string): boolean { + return resolveDreamingTriggerSessionKeys(sessionKey).some((candidateSessionKey) => + peekSystemEventEntries(candidateSessionKey).some( + (event) => + event.contextKey?.startsWith("cron:") === true && + normalizeTrimmedString(event.text) === DREAMING_SYSTEM_EVENT_TEXT, + ), + ); +} + export function resolveShortTermPromotionDreamingConfig(params: { pluginConfig?: Record; cfg?: OpenClawConfig; @@ -716,6 +747,12 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void const config = await reconcileManagedDreamingCron({ reason: "runtime", }); + if ( + !hasPendingManagedDreamingCronEvent(ctx.sessionKey) || + !includesSystemEventToken(event.cleanedBody, DREAMING_SYSTEM_EVENT_TEXT) + ) { + return undefined; + } return await runShortTermDreamingPromotionIfTriggered({ cleanedBody: event.cleanedBody, trigger: ctx.trigger,