mirror of
https://github.com/openclaw/openclaw.git
synced 2026-05-01 06:05:05 +02:00
* fix: sendPolicy deny suppresses delivery, not inbound processing (#53328) Previously, sendPolicy "deny" returned early before the agent dispatch, preventing the agent from ever seeing the message. This broke the use case of an agent listening on WhatsApp groups with sendPolicy: deny to read messages without replying — the agent couldn't read them at all. Move the deny gate from before the agent dispatch to after it. The agent now processes inbound messages normally (context, memory, tool calls), but all outbound delivery paths are suppressed: final replies, tool results, block replies, working status, plan updates, typing indicators, and TTS payloads. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: propagate sendPolicy to ACP tail dispatch instead of hardcoded allow The ACP tail dispatch path (ctx.AcpDispatchTailAfterReset) was passing sendPolicy: "allow" unconditionally, which would bypass delivery suppression in a /reset <tail> turn when the session has sendPolicy deny. Pass through the resolved sendPolicy so the tail dispatch respects it. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: guard before_dispatch hook and ACP tail dispatch under sendPolicy deny before_dispatch handled replies were leaking through sendFinalPayload before the suppressDelivery guard was checked. ACP tail dispatch (from /new <tail>) was being rejected by acp-runtime.ts deny checks instead of proceeding with delivery suppression handled downstream. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * auto-reply: propagate deny suppression to reply_dispatch * fix(acp): suppress onReplyStart when user delivery is denied When sendPolicy resolves to "deny", ACP tail dispatch still invoked onReplyStart via startReplyLifecycle before the suppressUserDelivery check. Channels wire onReplyStart to typing indicators, so deny-scoped sessions could still emit outbound typing events on /reset <tail> flows and command bypass paths. Gate startReplyLifecycleOnce on suppressUserDelivery so the lifecycle is marked started but the callback is skipped. Payload delivery was already suppressed; this closes the typing-indicator leak flagged by Codex review (PR #65461 P1/P2). * fix(acp): route non-tail deny turns through ACP when suppression is wired tryDispatchAcpReplyHook was returning early for non-tail, non-command ACP turns under sendPolicy: "deny", causing ACP-bound sessions to fall back to the embedded reply path instead of flowing through acpManager.runTurn. That diverged ACP session state, tool calls, and memory whenever delivery suppression was active. Now the early-return only fires when sendPolicy is "deny" AND the event lacks suppressUserDelivery — i.e., when downstream delivery suppression is not wired up. When suppressUserDelivery is set, dispatch-acp-delivery already drops outbound sends (see onReplyStart / deliver guards), so ACP can safely run the turn with state consistency preserved. Existing behavior preserved: - Command bypass still overrides deny - Tail dispatch still overrides deny - Plain-text deny turns without suppression still short-circuit Addresses Codex bot P1 feedback on #65461. * fix: gate empty-body typing indicator behind suppressTyping (#53328) * fix: guard plugin-binding + fast-abort outbound paths under sendPolicy deny The original PR computed suppressDelivery inside the try block, which was after two outbound paths: 1. The plugin-owned binding block (sendBindingNotice calls for unavailable/declined/error outcomes, plus the plugin's own "handled" outcome) ran before the suppressDelivery flag existed, so plugin notices still leaked under deny. 2. The fast-abort path dispatched "Agent was aborted." via routeReplyToOriginating / sendFinalReply before the flag existed. Move resolveSendPolicy() above the plugin-binding block so suppressDelivery covers every outbound path downstream, matching the PR description's claim that "all outbound paths are guarded by the flag." Plugin-bound inbound handling under deny: plugin handlers can emit outbound replies we cannot rewind, so skip the claim hook entirely under deny and fall through to normal (suppressed) agent processing. touchConversationBindingRecord still runs so binding activity stays tracked. Fast-abort under deny: still run the abort and record the completed state, just don't emit the abort reply. Tests: - suppresses the fast-abort reply under sendPolicy deny - delivers the fast-abort reply normally when sendPolicy is allow (regression guard) - skips plugin-bound claim hook under deny and falls through to suppressed agent dispatch Addresses Codex review findings on PR #65461. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Lobster <lobster@shahine.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -78,9 +78,12 @@ describe("handleCommands send policy", () => {
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
|
||||
it("prefers the target session entry from sessionStore for send policy checks", async () => {
|
||||
it("allows processing to continue even when send policy is deny (#53328)", async () => {
|
||||
// sendPolicy deny now only suppresses outbound delivery, not inbound processing.
|
||||
// The deny gate moved to dispatch-from-config.ts where it suppresses delivery
|
||||
// after the agent has processed the message.
|
||||
const result = await handleCommands(makeParams());
|
||||
|
||||
expect(result).toEqual({ shouldContinue: false });
|
||||
expect(result).toEqual({ shouldContinue: true });
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
import { logVerbose } from "../../globals.js";
|
||||
import { resolveSendPolicy } from "../../sessions/send-policy.js";
|
||||
import { shouldHandleTextCommands } from "../commands-registry.js";
|
||||
import { emitResetCommandHooks } from "./commands-reset-hooks.js";
|
||||
import { maybeHandleResetCommand } from "./commands-reset.js";
|
||||
@@ -41,18 +39,8 @@ export async function handleCommands(params: HandleCommandsParams): Promise<Comm
|
||||
}
|
||||
}
|
||||
|
||||
const targetSessionEntry = params.sessionStore?.[params.sessionKey] ?? params.sessionEntry;
|
||||
const sendPolicy = resolveSendPolicy({
|
||||
cfg: params.cfg,
|
||||
entry: targetSessionEntry,
|
||||
sessionKey: params.sessionKey,
|
||||
channel: targetSessionEntry?.channel ?? params.command.channel,
|
||||
chatType: targetSessionEntry?.chatType,
|
||||
});
|
||||
if (sendPolicy === "deny") {
|
||||
logVerbose(`Send blocked by policy for session ${params.sessionKey ?? "unknown"}`);
|
||||
return { shouldContinue: false };
|
||||
}
|
||||
|
||||
// sendPolicy "deny" is now handled downstream in dispatch-from-config.ts
|
||||
// by suppressing outbound delivery while still allowing the agent to process
|
||||
// the inbound message (context, memory, tool calls). See #53328.
|
||||
return { shouldContinue: true };
|
||||
}
|
||||
|
||||
@@ -312,6 +312,33 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
|
||||
expect(onReplyStart).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("does not fire onReplyStart when user delivery is suppressed", async () => {
|
||||
const onReplyStart = vi.fn(async () => {});
|
||||
const dispatcher = createDispatcher();
|
||||
const coordinator = createAcpDispatchDeliveryCoordinator({
|
||||
cfg: createAcpTestConfig(),
|
||||
ctx: buildTestCtx({
|
||||
Provider: "discord",
|
||||
Surface: "discord",
|
||||
SessionKey: "agent:codex-acp:session-1",
|
||||
}),
|
||||
dispatcher,
|
||||
inboundAudio: false,
|
||||
suppressUserDelivery: true,
|
||||
shouldRouteToOriginating: false,
|
||||
onReplyStart,
|
||||
});
|
||||
|
||||
// Directly invoking the lifecycle (e.g. from dispatch-acp.ts before the
|
||||
// first deliver call) must not fire the typing indicator when delivery is
|
||||
// suppressed by sendPolicy: "deny".
|
||||
await coordinator.startReplyLifecycle();
|
||||
const delivered = await coordinator.deliver("final", { text: "hello" });
|
||||
|
||||
expect(delivered).toBe(false);
|
||||
expect(onReplyStart).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("keeps parent-owned background ACP child delivery silent while preserving accumulated output", async () => {
|
||||
const dispatcher = createDispatcher();
|
||||
const coordinator = createAcpDispatchDeliveryCoordinator({
|
||||
|
||||
@@ -215,6 +215,13 @@ export function createAcpDispatchDeliveryCoordinator(params: {
|
||||
return;
|
||||
}
|
||||
state.startedReplyLifecycle = true;
|
||||
// When delivery is suppressed (e.g. sendPolicy: "deny"), do not fire the
|
||||
// onReplyStart callback — channels wire it to typing indicators / lifecycle
|
||||
// notifications that should not leak outbound events while the session is
|
||||
// under a deny policy. See #53328.
|
||||
if (params.suppressUserDelivery) {
|
||||
return;
|
||||
}
|
||||
void Promise.resolve(params.onReplyStart?.()).catch((error) => {
|
||||
logVerbose(
|
||||
`dispatch-acp: reply lifecycle start failed: ${error instanceof Error ? error.message : String(error)}`,
|
||||
|
||||
@@ -3004,6 +3004,28 @@ describe("before_dispatch hook", () => {
|
||||
expect(result.queuedFinal).toBe(true);
|
||||
});
|
||||
|
||||
it("suppresses before_dispatch handled reply when sendPolicy is deny", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: true, text: "Blocked" });
|
||||
const dispatcher = createDispatcher();
|
||||
const result = await dispatchReplyFromConfig({
|
||||
ctx: createHookCtx({ SessionKey: "test:session" }),
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
});
|
||||
// Hook handled the message (no model dispatch)
|
||||
expect(hookMocks.runner.runBeforeDispatch).toHaveBeenCalled();
|
||||
// But delivery must be suppressed
|
||||
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
|
||||
expect(mocks.routeReply).not.toHaveBeenCalled();
|
||||
expect(result.queuedFinal).toBe(false);
|
||||
});
|
||||
|
||||
it("continues default dispatch when hook returns not handled", async () => {
|
||||
hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: false });
|
||||
const dispatcher = createDispatcher();
|
||||
@@ -3017,3 +3039,333 @@ describe("before_dispatch hook", () => {
|
||||
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "model reply" });
|
||||
});
|
||||
});
|
||||
|
||||
describe("sendPolicy deny — suppress delivery, not processing (#53328)", () => {
|
||||
beforeEach(() => {
|
||||
hookMocks.runner.hasHooks.mockImplementation(
|
||||
(hookName?: string) => hookName === "reply_dispatch",
|
||||
);
|
||||
hookMocks.runner.runReplyDispatch.mockResolvedValue(undefined);
|
||||
hookMocks.runner.runBeforeDispatch.mockResolvedValue(undefined);
|
||||
});
|
||||
|
||||
it("still calls the replyResolver when sendPolicy is deny", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
|
||||
const ctx = buildTestCtx({ SessionKey: "test:session" });
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
// The agent MUST process the message (replyResolver called)
|
||||
expect(replyResolver).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("passes suppressUserDelivery to tail reply_dispatch when sendPolicy is deny", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
hookMocks.runner.runReplyDispatch.mockImplementation(async (event: unknown) => {
|
||||
const candidate = event as { isTailDispatch?: boolean };
|
||||
if (candidate.isTailDispatch) {
|
||||
return {
|
||||
handled: true,
|
||||
queuedFinal: false,
|
||||
counts: { tool: 0, block: 0, final: 0 },
|
||||
};
|
||||
}
|
||||
return undefined;
|
||||
});
|
||||
const dispatcher = createDispatcher();
|
||||
const ctx = buildTestCtx({
|
||||
SessionKey: "test:session",
|
||||
AcpDispatchTailAfterReset: true,
|
||||
});
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver: async () => ({ text: "agent reply" }),
|
||||
});
|
||||
|
||||
expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
isTailDispatch: true,
|
||||
sendPolicy: "deny",
|
||||
suppressUserDelivery: true,
|
||||
}),
|
||||
expect.any(Object),
|
||||
);
|
||||
});
|
||||
|
||||
it("suppresses final reply delivery when sendPolicy is deny", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
|
||||
const ctx = buildTestCtx({ SessionKey: "test:session" });
|
||||
|
||||
const result = await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
// Delivery MUST be suppressed
|
||||
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
|
||||
expect(result.queuedFinal).toBe(false);
|
||||
});
|
||||
|
||||
it("suppresses tool result delivery when sendPolicy is deny", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
let capturedOnToolResult: ((payload: ReplyPayload) => Promise<void>) | undefined;
|
||||
const replyResolver = vi.fn(
|
||||
async (_ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig) => {
|
||||
capturedOnToolResult = opts?.onToolResult as
|
||||
| ((payload: ReplyPayload) => Promise<void>)
|
||||
| undefined;
|
||||
return { text: "reply" } satisfies ReplyPayload;
|
||||
},
|
||||
);
|
||||
const ctx = buildTestCtx({ SessionKey: "test:session" });
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
// Trigger a tool result — delivery should be suppressed
|
||||
expect(capturedOnToolResult).toBeDefined();
|
||||
await capturedOnToolResult!({ text: "tool output" });
|
||||
expect(dispatcher.sendToolResult).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("suppresses block reply delivery when sendPolicy is deny", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
let capturedOnBlockReply:
|
||||
| ((payload: ReplyPayload, context?: unknown) => Promise<void>)
|
||||
| undefined;
|
||||
const replyResolver = vi.fn(
|
||||
async (_ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig) => {
|
||||
capturedOnBlockReply = opts?.onBlockReply as
|
||||
| ((payload: ReplyPayload, context?: unknown) => Promise<void>)
|
||||
| undefined;
|
||||
return [] as ReplyPayload[];
|
||||
},
|
||||
);
|
||||
const ctx = buildTestCtx({ SessionKey: "test:session" });
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
// Trigger a block reply — delivery should be suppressed
|
||||
expect(capturedOnBlockReply).toBeDefined();
|
||||
await capturedOnBlockReply!({ text: "streaming chunk" });
|
||||
expect(dispatcher.sendBlockReply).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it("delivers replies normally when sendPolicy is allow", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "allow",
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
|
||||
const ctx = buildTestCtx({ SessionKey: "test:session" });
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
expect(replyResolver).toHaveBeenCalledTimes(1);
|
||||
expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("delivers replies normally when sendPolicy is unset (defaults to allow)", async () => {
|
||||
setNoAbort();
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
|
||||
const ctx = buildTestCtx({ SessionKey: "test:session" });
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
expect(replyResolver).toHaveBeenCalledTimes(1);
|
||||
expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it("suppresses the fast-abort reply under sendPolicy deny", async () => {
|
||||
// Fast-abort runs before sendPolicy in the old code, so the abort reply
|
||||
// leaked. Under the guard, the abort is still recorded but no reply is
|
||||
// dispatched. See #53328.
|
||||
mocks.tryFastAbortFromMessage.mockResolvedValue({
|
||||
handled: true,
|
||||
aborted: true,
|
||||
});
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "telegram",
|
||||
Body: "/stop",
|
||||
SessionKey: "test:session",
|
||||
});
|
||||
|
||||
const result = await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
|
||||
expect(replyResolver).not.toHaveBeenCalled();
|
||||
expect(result.queuedFinal).toBe(false);
|
||||
});
|
||||
|
||||
it("delivers the fast-abort reply normally when sendPolicy is allow (regression guard)", async () => {
|
||||
mocks.tryFastAbortFromMessage.mockResolvedValue({
|
||||
handled: true,
|
||||
aborted: true,
|
||||
});
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "allow",
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(async () => ({ text: "hi" }) satisfies ReplyPayload);
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "telegram",
|
||||
Body: "/stop",
|
||||
SessionKey: "test:session",
|
||||
});
|
||||
|
||||
await dispatchReplyFromConfig({
|
||||
ctx,
|
||||
cfg: emptyConfig,
|
||||
dispatcher,
|
||||
replyResolver,
|
||||
});
|
||||
|
||||
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({
|
||||
text: "⚙️ Agent was aborted.",
|
||||
});
|
||||
});
|
||||
|
||||
it("skips plugin-bound claim hook under deny and falls through to suppressed agent dispatch", async () => {
|
||||
// Plugin-bound inbound handlers can emit outbound replies we cannot
|
||||
// rewind. Under deny, skip the plugin claim entirely and let the agent
|
||||
// process the message with delivery suppressed. See #53328.
|
||||
setNoAbort();
|
||||
hookMocks.runner.hasHooks.mockImplementation(
|
||||
((hookName?: string) =>
|
||||
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
|
||||
);
|
||||
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
|
||||
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
|
||||
status: "handled",
|
||||
result: { handled: true },
|
||||
});
|
||||
sessionBindingMocks.resolveByConversation.mockReturnValue({
|
||||
bindingId: "binding-deny",
|
||||
targetSessionKey: "plugin-binding:codex:abc123",
|
||||
targetKind: "session",
|
||||
conversation: {
|
||||
channel: "discord",
|
||||
accountId: "default",
|
||||
conversationId: "channel:deny-test",
|
||||
},
|
||||
status: "active",
|
||||
boundAt: 1710000000000,
|
||||
metadata: {
|
||||
pluginBindingOwner: "plugin",
|
||||
pluginId: "openclaw-codex-app-server",
|
||||
pluginRoot: "/tmp/plugin",
|
||||
},
|
||||
} satisfies SessionBindingRecord);
|
||||
sessionStoreMocks.currentEntry = {
|
||||
sessionId: "s1",
|
||||
updatedAt: 0,
|
||||
sendPolicy: "deny",
|
||||
};
|
||||
const dispatcher = createDispatcher();
|
||||
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
|
||||
const ctx = buildTestCtx({
|
||||
Provider: "discord",
|
||||
Surface: "discord",
|
||||
OriginatingChannel: "discord",
|
||||
OriginatingTo: "discord:channel:deny-test",
|
||||
To: "discord:channel:deny-test",
|
||||
AccountId: "default",
|
||||
SessionKey: "agent:main:discord:channel:deny-test",
|
||||
Body: "observed message",
|
||||
});
|
||||
|
||||
await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher, replyResolver });
|
||||
|
||||
// Binding is still tracked (touch runs before the gate)...
|
||||
expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-deny");
|
||||
// ...but the plugin claim hook MUST NOT be invoked under deny — the
|
||||
// plugin can't be trusted to honor suppressDelivery on its outbound path.
|
||||
expect(hookMocks.runner.runInboundClaimForPluginOutcome).not.toHaveBeenCalled();
|
||||
// Agent still processes the message (the whole point of the PR)...
|
||||
expect(replyResolver).toHaveBeenCalledTimes(1);
|
||||
// ...but no final reply is delivered.
|
||||
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
@@ -427,6 +427,25 @@ export async function dispatchReplyFromConfig(
|
||||
? toPluginConversationBinding(pluginOwnedBindingRecord)
|
||||
: null;
|
||||
|
||||
// Resolve sendPolicy early so every outbound path below (plugin-binding
|
||||
// notices, fast-abort, normal dispatch) honors suppressDelivery. Under
|
||||
// sendPolicy: "deny" the agent still processes inbound, but no outbound
|
||||
// reply/notice/indicator is allowed. See #53328.
|
||||
const sendPolicy = resolveSendPolicy({
|
||||
cfg,
|
||||
entry: sessionStoreEntry.entry,
|
||||
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
|
||||
channel:
|
||||
sessionStoreEntry.entry?.channel ??
|
||||
ctx.OriginatingChannel ??
|
||||
ctx.Surface ??
|
||||
ctx.Provider ??
|
||||
undefined,
|
||||
chatType: sessionStoreEntry.entry?.chatType,
|
||||
});
|
||||
const suppressDelivery = sendPolicy === "deny";
|
||||
const suppressHookUserDelivery = suppressAcpChildUserDelivery || suppressDelivery;
|
||||
|
||||
let pluginFallbackReason:
|
||||
| "plugin-bound-fallback-missing-plugin"
|
||||
| "plugin-bound-fallback-no-handler"
|
||||
@@ -434,68 +453,78 @@ export async function dispatchReplyFromConfig(
|
||||
|
||||
if (pluginOwnedBinding) {
|
||||
touchConversationBindingRecord(pluginOwnedBinding.bindingId);
|
||||
logVerbose(
|
||||
`plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`,
|
||||
);
|
||||
const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome
|
||||
? await hookRunner.runInboundClaimForPluginOutcome(
|
||||
pluginOwnedBinding.pluginId,
|
||||
inboundClaimEvent,
|
||||
inboundClaimContext,
|
||||
)
|
||||
: (() => {
|
||||
const pluginLoaded =
|
||||
getGlobalPluginRegistry()?.plugins.some(
|
||||
(plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded",
|
||||
) ?? false;
|
||||
return pluginLoaded
|
||||
? ({ status: "no_handler" } as const)
|
||||
: ({ status: "missing_plugin" } as const);
|
||||
})();
|
||||
if (suppressDelivery) {
|
||||
// Plugin-bound inbound handlers typically emit outbound replies we
|
||||
// cannot rewind. Under deny, skip the plugin claim entirely and fall
|
||||
// through to normal (suppressed) agent processing so no delivery leaks
|
||||
// via the plugin path. See #53328.
|
||||
logVerbose(
|
||||
`plugin-bound inbound skipped under sendPolicy: deny (plugin=${pluginOwnedBinding.pluginId} session=${sessionKey ?? "unknown"}); falling through to suppressed agent processing`,
|
||||
);
|
||||
} else {
|
||||
logVerbose(
|
||||
`plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`,
|
||||
);
|
||||
const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome
|
||||
? await hookRunner.runInboundClaimForPluginOutcome(
|
||||
pluginOwnedBinding.pluginId,
|
||||
inboundClaimEvent,
|
||||
inboundClaimContext,
|
||||
)
|
||||
: (() => {
|
||||
const pluginLoaded =
|
||||
getGlobalPluginRegistry()?.plugins.some(
|
||||
(plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded",
|
||||
) ?? false;
|
||||
return pluginLoaded
|
||||
? ({ status: "no_handler" } as const)
|
||||
: ({ status: "missing_plugin" } as const);
|
||||
})();
|
||||
|
||||
switch (targetedClaimOutcome.status) {
|
||||
case "handled": {
|
||||
markIdle("plugin_binding_dispatch");
|
||||
recordProcessed("completed", { reason: "plugin-bound-handled" });
|
||||
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
|
||||
}
|
||||
case "missing_plugin":
|
||||
case "no_handler": {
|
||||
pluginFallbackReason =
|
||||
targetedClaimOutcome.status === "missing_plugin"
|
||||
? "plugin-bound-fallback-missing-plugin"
|
||||
: "plugin-bound-fallback-no-handler";
|
||||
if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) {
|
||||
const didSendNotice = await sendBindingNotice(
|
||||
{ text: buildPluginBindingUnavailableText(pluginOwnedBinding) },
|
||||
"additive",
|
||||
);
|
||||
if (didSendNotice) {
|
||||
markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId);
|
||||
}
|
||||
switch (targetedClaimOutcome.status) {
|
||||
case "handled": {
|
||||
markIdle("plugin_binding_dispatch");
|
||||
recordProcessed("completed", { reason: "plugin-bound-handled" });
|
||||
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
|
||||
}
|
||||
case "missing_plugin":
|
||||
case "no_handler": {
|
||||
pluginFallbackReason =
|
||||
targetedClaimOutcome.status === "missing_plugin"
|
||||
? "plugin-bound-fallback-missing-plugin"
|
||||
: "plugin-bound-fallback-no-handler";
|
||||
if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) {
|
||||
const didSendNotice = await sendBindingNotice(
|
||||
{ text: buildPluginBindingUnavailableText(pluginOwnedBinding) },
|
||||
"additive",
|
||||
);
|
||||
if (didSendNotice) {
|
||||
markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "declined": {
|
||||
await sendBindingNotice(
|
||||
{ text: buildPluginBindingDeclinedText(pluginOwnedBinding) },
|
||||
"terminal",
|
||||
);
|
||||
markIdle("plugin_binding_declined");
|
||||
recordProcessed("completed", { reason: "plugin-bound-declined" });
|
||||
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
|
||||
}
|
||||
case "error": {
|
||||
logVerbose(
|
||||
`plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`,
|
||||
);
|
||||
await sendBindingNotice(
|
||||
{ text: buildPluginBindingErrorText(pluginOwnedBinding) },
|
||||
"terminal",
|
||||
);
|
||||
markIdle("plugin_binding_error");
|
||||
recordProcessed("completed", { reason: "plugin-bound-error" });
|
||||
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
|
||||
}
|
||||
break;
|
||||
}
|
||||
case "declined": {
|
||||
await sendBindingNotice(
|
||||
{ text: buildPluginBindingDeclinedText(pluginOwnedBinding) },
|
||||
"terminal",
|
||||
);
|
||||
markIdle("plugin_binding_declined");
|
||||
recordProcessed("completed", { reason: "plugin-bound-declined" });
|
||||
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
|
||||
}
|
||||
case "error": {
|
||||
logVerbose(
|
||||
`plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`,
|
||||
);
|
||||
await sendBindingNotice(
|
||||
{ text: buildPluginBindingErrorText(pluginOwnedBinding) },
|
||||
"terminal",
|
||||
);
|
||||
markIdle("plugin_binding_error");
|
||||
recordProcessed("completed", { reason: "plugin-bound-error" });
|
||||
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -536,24 +565,30 @@ export async function dispatchReplyFromConfig(
|
||||
}
|
||||
const fastAbort = await fastAbortResolver({ ctx, cfg });
|
||||
if (fastAbort.handled) {
|
||||
const payload = {
|
||||
text: formatAbortReplyTextResolver(fastAbort.stoppedSubagents),
|
||||
} satisfies ReplyPayload;
|
||||
let queuedFinal = false;
|
||||
let routedFinalCount = 0;
|
||||
const result = await routeReplyToOriginating(payload);
|
||||
if (result) {
|
||||
queuedFinal = result.ok;
|
||||
if (result.ok) {
|
||||
routedFinalCount += 1;
|
||||
}
|
||||
if (!result.ok) {
|
||||
logVerbose(
|
||||
`dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`,
|
||||
);
|
||||
if (!suppressDelivery) {
|
||||
const payload = {
|
||||
text: formatAbortReplyTextResolver(fastAbort.stoppedSubagents),
|
||||
} satisfies ReplyPayload;
|
||||
const result = await routeReplyToOriginating(payload);
|
||||
if (result) {
|
||||
queuedFinal = result.ok;
|
||||
if (result.ok) {
|
||||
routedFinalCount += 1;
|
||||
}
|
||||
if (!result.ok) {
|
||||
logVerbose(
|
||||
`dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
queuedFinal = dispatcher.sendFinalReply(payload);
|
||||
}
|
||||
} else {
|
||||
queuedFinal = dispatcher.sendFinalReply(payload);
|
||||
logVerbose(
|
||||
`dispatch-from-config: fast_abort reply suppressed by sendPolicy: deny (session=${sessionKey ?? "unknown"})`,
|
||||
);
|
||||
}
|
||||
const counts = dispatcher.getQueuedCounts();
|
||||
counts.final += routedFinalCount;
|
||||
@@ -562,19 +597,6 @@ export async function dispatchReplyFromConfig(
|
||||
return { queuedFinal, counts };
|
||||
}
|
||||
|
||||
const sendPolicy = resolveSendPolicy({
|
||||
cfg,
|
||||
entry: sessionStoreEntry.entry,
|
||||
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
|
||||
channel:
|
||||
sessionStoreEntry.entry?.channel ??
|
||||
ctx.OriginatingChannel ??
|
||||
ctx.Surface ??
|
||||
ctx.Provider ??
|
||||
undefined,
|
||||
chatType: sessionStoreEntry.entry?.chatType,
|
||||
});
|
||||
|
||||
const shouldSendToolSummaries = ctx.ChatType !== "group" || ctx.IsForum === true;
|
||||
const shouldSendToolStartStatuses = ctx.ChatType !== "group" || ctx.IsForum === true;
|
||||
const sendFinalPayload = async (
|
||||
@@ -630,7 +652,7 @@ export async function dispatchReplyFromConfig(
|
||||
const text = beforeDispatchResult.text;
|
||||
let queuedFinal = false;
|
||||
let routedFinalCount = 0;
|
||||
if (text) {
|
||||
if (text && !suppressDelivery) {
|
||||
const handledReply = await sendFinalPayload({ text });
|
||||
queuedFinal = handledReply.queuedFinal;
|
||||
routedFinalCount += handledReply.routedFinalCount;
|
||||
@@ -652,7 +674,7 @@ export async function dispatchReplyFromConfig(
|
||||
inboundAudio,
|
||||
sessionTtsAuto,
|
||||
ttsChannel,
|
||||
suppressUserDelivery: suppressAcpChildUserDelivery,
|
||||
suppressUserDelivery: suppressHookUserDelivery,
|
||||
shouldRouteToOriginating,
|
||||
originatingChannel,
|
||||
originatingTo,
|
||||
@@ -676,14 +698,12 @@ export async function dispatchReplyFromConfig(
|
||||
}
|
||||
}
|
||||
|
||||
if (sendPolicy === "deny") {
|
||||
// When sendPolicy is "deny", we still let the agent process the inbound message
|
||||
// (context, memory, tool calls) but suppress all outbound delivery.
|
||||
if (suppressDelivery) {
|
||||
logVerbose(
|
||||
`Send blocked by policy for session ${sessionStoreEntry.sessionKey ?? sessionKey ?? "unknown"}`,
|
||||
`Delivery suppressed by send policy for session ${sessionStoreEntry.sessionKey ?? sessionKey ?? "unknown"} — agent will still process the message`,
|
||||
);
|
||||
const counts = dispatcher.getQueuedCounts();
|
||||
recordProcessed("completed", { reason: "send_policy_deny" });
|
||||
markIdle("message_completed");
|
||||
return { queuedFinal: false, counts };
|
||||
}
|
||||
|
||||
const toolStartStatusesSent = new Set<string>();
|
||||
@@ -710,6 +730,9 @@ export async function dispatchReplyFromConfig(
|
||||
return parts.join("\n\n").trim() || "Planning next steps.";
|
||||
};
|
||||
const maybeSendWorkingStatus = async (label: string): Promise<void> => {
|
||||
if (suppressDelivery) {
|
||||
return;
|
||||
}
|
||||
const normalizedLabel = normalizeWorkingLabel(label);
|
||||
if (
|
||||
!shouldEmitVerboseProgress() ||
|
||||
@@ -735,7 +758,7 @@ export async function dispatchReplyFromConfig(
|
||||
explanation?: string;
|
||||
steps?: string[];
|
||||
}): Promise<void> => {
|
||||
if (!shouldEmitVerboseProgress()) {
|
||||
if (suppressDelivery || !shouldEmitVerboseProgress()) {
|
||||
return;
|
||||
}
|
||||
const replyPayload: ReplyPayload = {
|
||||
@@ -818,7 +841,8 @@ export async function dispatchReplyFromConfig(
|
||||
};
|
||||
const typing = resolveRunTypingPolicy({
|
||||
requestedPolicy: params.replyOptions?.typingPolicy,
|
||||
suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
|
||||
suppressTyping:
|
||||
suppressDelivery || params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
|
||||
originatingChannel,
|
||||
systemEvent: shouldRouteToOriginating,
|
||||
});
|
||||
@@ -833,6 +857,9 @@ export async function dispatchReplyFromConfig(
|
||||
suppressTyping: typing.suppressTyping,
|
||||
onToolResult: (payload: ReplyPayload) => {
|
||||
const run = async () => {
|
||||
if (suppressDelivery) {
|
||||
return;
|
||||
}
|
||||
const ttsPayload = await maybeApplyTtsToReplyPayload({
|
||||
payload,
|
||||
cfg,
|
||||
@@ -881,6 +908,9 @@ export async function dispatchReplyFromConfig(
|
||||
},
|
||||
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => {
|
||||
const run = async () => {
|
||||
if (suppressDelivery) {
|
||||
return;
|
||||
}
|
||||
// Suppress reasoning payloads — channels using this generic dispatch
|
||||
// path (WhatsApp, web, etc.) do not have a dedicated reasoning lane.
|
||||
// Telegram has its own dispatch path that handles reasoning splitting.
|
||||
@@ -942,11 +972,12 @@ export async function dispatchReplyFromConfig(
|
||||
inboundAudio,
|
||||
sessionTtsAuto,
|
||||
ttsChannel,
|
||||
suppressUserDelivery: suppressHookUserDelivery,
|
||||
shouldRouteToOriginating,
|
||||
originatingChannel,
|
||||
originatingTo,
|
||||
shouldSendToolSummaries,
|
||||
sendPolicy: "allow",
|
||||
sendPolicy,
|
||||
isTailDispatch: true,
|
||||
},
|
||||
{
|
||||
@@ -971,63 +1002,65 @@ export async function dispatchReplyFromConfig(
|
||||
|
||||
let queuedFinal = false;
|
||||
let routedFinalCount = 0;
|
||||
for (const reply of replies) {
|
||||
// Suppress reasoning payloads from channel delivery — channels using this
|
||||
// generic dispatch path do not have a dedicated reasoning lane.
|
||||
if (reply.isReasoning === true) {
|
||||
continue;
|
||||
}
|
||||
const finalReply = await sendFinalPayload(reply);
|
||||
queuedFinal = finalReply.queuedFinal || queuedFinal;
|
||||
routedFinalCount += finalReply.routedFinalCount;
|
||||
}
|
||||
|
||||
const ttsMode = resolveConfiguredTtsMode(cfg);
|
||||
// Generate TTS-only reply after block streaming completes (when there's no final reply).
|
||||
// This handles the case where block streaming succeeds and drops final payloads,
|
||||
// but we still want TTS audio to be generated from the accumulated block content.
|
||||
if (
|
||||
ttsMode === "final" &&
|
||||
replies.length === 0 &&
|
||||
blockCount > 0 &&
|
||||
accumulatedBlockText.trim()
|
||||
) {
|
||||
try {
|
||||
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
|
||||
payload: { text: accumulatedBlockText },
|
||||
cfg,
|
||||
channel: ttsChannel,
|
||||
kind: "final",
|
||||
inboundAudio,
|
||||
ttsAuto: sessionTtsAuto,
|
||||
});
|
||||
// Only send if TTS was actually applied (mediaUrl exists)
|
||||
if (ttsSyntheticReply.mediaUrl) {
|
||||
// Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content
|
||||
const ttsOnlyPayload: ReplyPayload = {
|
||||
mediaUrl: ttsSyntheticReply.mediaUrl,
|
||||
audioAsVoice: ttsSyntheticReply.audioAsVoice,
|
||||
};
|
||||
const result = await routeReplyToOriginating(ttsOnlyPayload);
|
||||
if (result) {
|
||||
queuedFinal = result.ok || queuedFinal;
|
||||
if (result.ok) {
|
||||
routedFinalCount += 1;
|
||||
}
|
||||
if (!result.ok) {
|
||||
logVerbose(
|
||||
`dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error"}`,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload);
|
||||
queuedFinal = didQueue || queuedFinal;
|
||||
}
|
||||
if (!suppressDelivery) {
|
||||
for (const reply of replies) {
|
||||
// Suppress reasoning payloads from channel delivery — channels using this
|
||||
// generic dispatch path do not have a dedicated reasoning lane.
|
||||
if (reply.isReasoning === true) {
|
||||
continue;
|
||||
}
|
||||
const finalReply = await sendFinalPayload(reply);
|
||||
queuedFinal = finalReply.queuedFinal || queuedFinal;
|
||||
routedFinalCount += finalReply.routedFinalCount;
|
||||
}
|
||||
|
||||
const ttsMode = resolveConfiguredTtsMode(cfg);
|
||||
// Generate TTS-only reply after block streaming completes (when there's no final reply).
|
||||
// This handles the case where block streaming succeeds and drops final payloads,
|
||||
// but we still want TTS audio to be generated from the accumulated block content.
|
||||
if (
|
||||
ttsMode === "final" &&
|
||||
replies.length === 0 &&
|
||||
blockCount > 0 &&
|
||||
accumulatedBlockText.trim()
|
||||
) {
|
||||
try {
|
||||
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
|
||||
payload: { text: accumulatedBlockText },
|
||||
cfg,
|
||||
channel: ttsChannel,
|
||||
kind: "final",
|
||||
inboundAudio,
|
||||
ttsAuto: sessionTtsAuto,
|
||||
});
|
||||
// Only send if TTS was actually applied (mediaUrl exists)
|
||||
if (ttsSyntheticReply.mediaUrl) {
|
||||
// Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content
|
||||
const ttsOnlyPayload: ReplyPayload = {
|
||||
mediaUrl: ttsSyntheticReply.mediaUrl,
|
||||
audioAsVoice: ttsSyntheticReply.audioAsVoice,
|
||||
};
|
||||
const result = await routeReplyToOriginating(ttsOnlyPayload);
|
||||
if (result) {
|
||||
queuedFinal = result.ok || queuedFinal;
|
||||
if (result.ok) {
|
||||
routedFinalCount += 1;
|
||||
}
|
||||
if (!result.ok) {
|
||||
logVerbose(
|
||||
`dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error"}`,
|
||||
);
|
||||
}
|
||||
} else {
|
||||
const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload);
|
||||
queuedFinal = didQueue || queuedFinal;
|
||||
}
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`dispatch-from-config: accumulated block TTS failed: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
}
|
||||
} catch (err) {
|
||||
logVerbose(
|
||||
`dispatch-from-config: accumulated block TTS failed: ${formatErrorMessage(err)}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -350,7 +350,12 @@ export async function runPreparedReply(
|
||||
sessionCtx.MediaPath || (sessionCtx.MediaPaths && sessionCtx.MediaPaths.length > 0),
|
||||
);
|
||||
if (!baseBodyTrimmed && !hasMediaAttachment) {
|
||||
await typing.onReplyStart();
|
||||
// Skip onReplyStart when typing is suppressed (e.g. sendPolicy deny) —
|
||||
// otherwise channels that wire onReplyStart to typing indicators leak
|
||||
// visible signals even though outbound delivery is suppressed.
|
||||
if (!suppressTyping) {
|
||||
await typing.onReplyStart();
|
||||
}
|
||||
logVerbose("Inbound body empty after normalization; skipping agent run");
|
||||
typing.cleanup();
|
||||
return {
|
||||
|
||||
@@ -117,6 +117,74 @@ describe("tryDispatchAcpReplyHook", () => {
|
||||
expect(dispatchMock).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it("dispatches non-tail ACP turn under deny when suppressUserDelivery is set", async () => {
|
||||
bypassMock.mockResolvedValue(false);
|
||||
dispatchMock.mockResolvedValue({
|
||||
queuedFinal: false,
|
||||
counts: { tool: 0, block: 0, final: 0 },
|
||||
});
|
||||
|
||||
const result = await tryDispatchAcpReplyHook(
|
||||
{
|
||||
...event,
|
||||
sendPolicy: "deny",
|
||||
suppressUserDelivery: true,
|
||||
ctx: buildTestCtx({
|
||||
SessionKey: "agent:test:session",
|
||||
BodyForCommands: "write a test",
|
||||
BodyForAgent: "write a test",
|
||||
}),
|
||||
},
|
||||
ctx,
|
||||
);
|
||||
|
||||
// Non-tail, non-command ACP turns under deny must still flow through ACP
|
||||
// runtime so session/tool state stays consistent — delivery suppression is
|
||||
// handled inside the ACP delivery path via suppressUserDelivery.
|
||||
expect(dispatchMock).toHaveBeenCalledOnce();
|
||||
expect(dispatchMock).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
suppressUserDelivery: true,
|
||||
bypassForCommand: false,
|
||||
}),
|
||||
);
|
||||
expect(result).toEqual({
|
||||
handled: true,
|
||||
queuedFinal: false,
|
||||
counts: { tool: 0, block: 0, final: 0 },
|
||||
});
|
||||
});
|
||||
|
||||
it("allows tail dispatch through when sendPolicy is deny", async () => {
|
||||
bypassMock.mockResolvedValue(false);
|
||||
dispatchMock.mockResolvedValue({
|
||||
queuedFinal: false,
|
||||
counts: { tool: 0, block: 0, final: 0 },
|
||||
});
|
||||
|
||||
const result = await tryDispatchAcpReplyHook(
|
||||
{
|
||||
...event,
|
||||
sendPolicy: "deny",
|
||||
isTailDispatch: true,
|
||||
ctx: buildTestCtx({
|
||||
SessionKey: "agent:test:session",
|
||||
BodyForCommands: "continue after reset",
|
||||
BodyForAgent: "continue after reset",
|
||||
}),
|
||||
},
|
||||
ctx,
|
||||
);
|
||||
|
||||
// Tail dispatch should proceed despite deny — delivery suppression is handled downstream
|
||||
expect(dispatchMock).toHaveBeenCalledOnce();
|
||||
expect(result).toEqual({
|
||||
handled: true,
|
||||
queuedFinal: false,
|
||||
counts: { tool: 0, block: 0, final: 0 },
|
||||
});
|
||||
});
|
||||
|
||||
it("does not let ACP claim reset commands before local command handling", async () => {
|
||||
bypassMock.mockResolvedValue(true);
|
||||
dispatchMock.mockResolvedValue(undefined);
|
||||
|
||||
@@ -60,13 +60,31 @@ export async function tryDispatchAcpReplyHook(
|
||||
event: PluginHookReplyDispatchEvent,
|
||||
ctx: PluginHookReplyDispatchContext,
|
||||
): Promise<PluginHookReplyDispatchResult | void> {
|
||||
if (event.sendPolicy === "deny" && !hasExplicitCommandCandidate(event.ctx)) {
|
||||
// Under sendPolicy: "deny", ACP-bound sessions still need their turns to flow
|
||||
// through acpManager.runTurn so session state, tool calls, and memory stay
|
||||
// consistent — only outbound delivery should be suppressed. The ACP delivery
|
||||
// path (dispatch-acp-delivery.ts) honors event.suppressUserDelivery to drop
|
||||
// user-facing sends. If suppressUserDelivery is not set under deny, we cannot
|
||||
// safely route through ACP (delivery would leak), so fall back to the
|
||||
// embedded reply path unless an explicit command candidate or tail dispatch
|
||||
// warrants going through ACP anyway.
|
||||
if (
|
||||
event.sendPolicy === "deny" &&
|
||||
!event.suppressUserDelivery &&
|
||||
!hasExplicitCommandCandidate(event.ctx) &&
|
||||
!event.isTailDispatch
|
||||
) {
|
||||
return;
|
||||
}
|
||||
const runtime = await loadDispatchAcpRuntime();
|
||||
const bypassForCommand = await runtime.shouldBypassAcpDispatchForCommand(event.ctx, ctx.cfg);
|
||||
|
||||
if (event.sendPolicy === "deny" && !bypassForCommand) {
|
||||
if (
|
||||
event.sendPolicy === "deny" &&
|
||||
!event.suppressUserDelivery &&
|
||||
!bypassForCommand &&
|
||||
!event.isTailDispatch
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user