fix(memory-core): run Dreaming once per cron schedule (#66139)

Merged via squash.

Prepared head SHA: 48229a24cb
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Co-authored-by: mbelinky <132747814+mbelinky@users.noreply.github.com>
Reviewed-by: @mbelinky
This commit is contained in:
Mariano
2026-04-13 21:50:32 +02:00
committed by GitHub
parent 99237c2dde
commit 3d42e33dd0
3 changed files with 170 additions and 3 deletions

View File

@@ -28,6 +28,7 @@ Docs: https://docs.openclaw.ai
- Cron/scheduler: preserve the active error-backoff floor when maintenance repair recomputes a missing cron next-run, so recurring errored jobs do not resume early after a transient next-run resolution failure. (#66019, #66083, #66113) Thanks @mbelinky.
- Outbound/delivery-queue: persist the originating outbound `session` context on queued delivery entries and replay it during recovery, so write-ahead-queued sends keep their original outbound media policy context after restart instead of evaluating against a missing session. (#66025) Thanks @eleqtrizit.
- Auto-reply/queue: split collect-mode followup drains into contiguous groups by per-message authorization context (sender id, owner status, exec/bash-elevated overrides), so queued items from different senders or exec configs no longer execute under the last queued run's owner-only and exec-approval context. (#66024) Thanks @eleqtrizit.
- Dreaming/memory-core: require a live queued Dreaming cron event before the heartbeat hook runs the sweep, so managed Dreaming no longer replays on later heartbeats after the scheduled run was already consumed. (#66139) Thanks @mbelinky.
## 2026.4.12

View File

@@ -1,7 +1,8 @@
import fs from "node:fs/promises";
import path from "node:path";
import { enqueueSystemEvent, resetSystemEventsForTest } from "openclaw/plugin-sdk/infra-runtime";
import type { OpenClawConfig } from "openclaw/plugin-sdk/memory-core";
import { describe, expect, it, vi } from "vitest";
import { afterEach, describe, expect, it, vi } from "vitest";
import {
clearInternalHooks,
createInternalHookEvent,
@@ -21,6 +22,10 @@ import { createMemoryCoreTestHarness } from "./test-helpers.js";
const constants = __testing.constants;
const { createTempWorkspace } = createMemoryCoreTestHarness();
afterEach(() => {
resetSystemEventsForTest();
});
type CronParam = NonNullable<Parameters<typeof reconcileShortTermDreamingCronJob>[0]["cron"]>;
type CronJobLike = Awaited<ReturnType<CronParam["list"]>>[number];
type CronAddInput = Parameters<CronParam["add"]>[0];
@@ -138,7 +143,7 @@ function getBeforeAgentReplyHandler(
onMock: ReturnType<typeof vi.fn>,
): (
event: { cleanedBody: string },
ctx: { trigger?: string; workspaceDir?: string },
ctx: { trigger?: string; workspaceDir?: string; sessionKey?: string },
) => Promise<unknown> {
const call = onMock.mock.calls.find(([eventName]) => eventName === "before_agent_reply");
if (!call) {
@@ -146,7 +151,7 @@ function getBeforeAgentReplyHandler(
}
return call[1] as (
event: { cleanedBody: string },
ctx: { trigger?: string; workspaceDir?: string },
ctx: { trigger?: string; workspaceDir?: string; sessionKey?: string },
) => Promise<unknown>;
}
@@ -1111,6 +1116,130 @@ describe("gateway startup reconciliation", () => {
clearInternalHooks();
}
});
it("only triggers managed dreaming when the queued cron event is still pending", async () => {
clearInternalHooks();
const logger = createLogger();
const harness = createCronHarness();
const onMock = vi.fn();
const api: DreamingPluginApiTestDouble = {
config: {
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: false,
},
},
},
},
},
} as OpenClawConfig,
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps: { cron: harness.cron },
}),
);
const sessionKey = "agent:main:main";
enqueueSystemEvent(constants.DREAMING_SYSTEM_EVENT_TEXT, {
sessionKey,
contextKey: "cron:memory-dreaming",
});
const beforeAgentReply = getBeforeAgentReplyHandler(onMock);
const first = await beforeAgentReply(
{ cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT },
{ trigger: "heartbeat", workspaceDir: ".", sessionKey },
);
expect(first).toEqual({
handled: true,
reason: "memory-core: short-term dreaming disabled",
});
resetSystemEventsForTest();
const second = await beforeAgentReply(
{ cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT },
{ trigger: "heartbeat", workspaceDir: ".", sessionKey },
);
expect(second).toBeUndefined();
} finally {
clearInternalHooks();
}
});
it("resolves queued managed dreaming cron events from the base session for isolated heartbeats", async () => {
clearInternalHooks();
const logger = createLogger();
const harness = createCronHarness();
const onMock = vi.fn();
const api: DreamingPluginApiTestDouble = {
config: {
plugins: {
entries: {
"memory-core": {
config: {
dreaming: {
enabled: false,
},
},
},
},
},
} as OpenClawConfig,
pluginConfig: {},
logger,
runtime: {},
registerHook: (event: string, handler: Parameters<typeof registerInternalHook>[1]) => {
registerInternalHook(event, handler);
},
on: onMock,
};
try {
registerShortTermPromotionDreamingForTest(api);
await triggerInternalHook(
createInternalHookEvent("gateway", "startup", "gateway:startup", {
cfg: api.config,
deps: { cron: harness.cron },
}),
);
enqueueSystemEvent(constants.DREAMING_SYSTEM_EVENT_TEXT, {
sessionKey: "agent:main:main",
contextKey: "cron:memory-dreaming",
});
const beforeAgentReply = getBeforeAgentReplyHandler(onMock);
const result = await beforeAgentReply(
{ cleanedBody: constants.DREAMING_SYSTEM_EVENT_TEXT },
{ trigger: "heartbeat", workspaceDir: ".", sessionKey: "agent:main:main:heartbeat" },
);
expect(result).toEqual({
handled: true,
reason: "memory-core: short-term dreaming disabled",
});
} finally {
clearInternalHooks();
}
});
});
describe("short-term dreaming trigger", () => {

View File

@@ -1,3 +1,4 @@
import { peekSystemEventEntries } from "openclaw/plugin-sdk/infra-runtime";
import type { OpenClawConfig, OpenClawPluginApi } from "openclaw/plugin-sdk/memory-core";
import {
DEFAULT_MEMORY_DREAMING_FREQUENCY as DEFAULT_MEMORY_DREAMING_CRON_EXPR,
@@ -36,6 +37,7 @@ const LEGACY_REM_SLEEP_CRON_NAME = "Memory REM Dreaming";
const LEGACY_REM_SLEEP_CRON_TAG = "[managed-by=memory-core.dreaming.rem]";
const LEGACY_REM_SLEEP_EVENT_TEXT = "__openclaw_memory_core_rem_sleep__";
const RUNTIME_CRON_RECONCILE_INTERVAL_MS = 60_000;
const HEARTBEAT_ISOLATED_SESSION_SUFFIX = ":heartbeat";
type Logger = Pick<OpenClawPluginApi["logger"], "info" | "warn" | "error">;
@@ -344,6 +346,35 @@ function resolveStartupConfigFromEvent(event: unknown, fallback: OpenClawConfig)
return startupCfg as OpenClawConfig;
}
function resolveDreamingTriggerSessionKeys(sessionKey?: string): string[] {
const normalized = normalizeTrimmedString(sessionKey);
if (!normalized) {
return [];
}
const keys = [normalized];
// Isolated heartbeat runs execute in a sibling `:heartbeat` session while cron
// system events stay queued on the base main session.
if (normalized.endsWith(HEARTBEAT_ISOLATED_SESSION_SUFFIX)) {
const baseSessionKey = normalized.slice(0, -HEARTBEAT_ISOLATED_SESSION_SUFFIX.length).trim();
if (baseSessionKey) {
keys.push(baseSessionKey);
}
}
return Array.from(new Set(keys));
}
function hasPendingManagedDreamingCronEvent(sessionKey?: string): boolean {
return resolveDreamingTriggerSessionKeys(sessionKey).some((candidateSessionKey) =>
peekSystemEventEntries(candidateSessionKey).some(
(event) =>
event.contextKey?.startsWith("cron:") === true &&
normalizeTrimmedString(event.text) === DREAMING_SYSTEM_EVENT_TEXT,
),
);
}
export function resolveShortTermPromotionDreamingConfig(params: {
pluginConfig?: Record<string, unknown>;
cfg?: OpenClawConfig;
@@ -716,6 +747,12 @@ export function registerShortTermPromotionDreaming(api: OpenClawPluginApi): void
const config = await reconcileManagedDreamingCron({
reason: "runtime",
});
if (
!hasPendingManagedDreamingCronEvent(ctx.sessionKey) ||
!includesSystemEventToken(event.cleanedBody, DREAMING_SYSTEM_EVENT_TEXT)
) {
return undefined;
}
return await runShortTermDreamingPromotionIfTriggered({
cleanedBody: event.cleanedBody,
trigger: ctx.trigger,