fix: sendPolicy deny should suppress delivery, not inbound processing (#53328) (#65461)

* fix: sendPolicy deny suppresses delivery, not inbound processing (#53328)

Previously, sendPolicy "deny" returned early before the agent dispatch,
preventing the agent from ever seeing the message. This broke the use
case of an agent listening on WhatsApp groups with sendPolicy: deny to
read messages without replying — the agent couldn't read them at all.

Move the deny gate from before the agent dispatch to after it. The agent
now processes inbound messages normally (context, memory, tool calls),
but all outbound delivery paths are suppressed: final replies, tool
results, block replies, working status, plan updates, typing indicators,
and TTS payloads.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: propagate sendPolicy to ACP tail dispatch instead of hardcoded allow

The ACP tail dispatch path (ctx.AcpDispatchTailAfterReset) was passing
sendPolicy: "allow" unconditionally, which would bypass delivery
suppression in a /reset <tail> turn when the session has sendPolicy deny.

Pass through the resolved sendPolicy so the tail dispatch respects it.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix: guard before_dispatch hook and ACP tail dispatch under sendPolicy deny

before_dispatch handled replies were leaking through sendFinalPayload
before the suppressDelivery guard was checked. ACP tail dispatch (from
/new <tail>) was being rejected by acp-runtime.ts deny checks instead
of proceeding with delivery suppression handled downstream.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* auto-reply: propagate deny suppression to reply_dispatch

* fix(acp): suppress onReplyStart when user delivery is denied

When sendPolicy resolves to "deny", ACP tail dispatch still invoked
onReplyStart via startReplyLifecycle before the suppressUserDelivery
check. Channels wire onReplyStart to typing indicators, so deny-scoped
sessions could still emit outbound typing events on /reset <tail>
flows and command bypass paths.

Gate startReplyLifecycleOnce on suppressUserDelivery so the lifecycle
is marked started but the callback is skipped. Payload delivery was
already suppressed; this closes the typing-indicator leak flagged by
Codex review (PR #65461 P1/P2).

* fix(acp): route non-tail deny turns through ACP when suppression is wired

tryDispatchAcpReplyHook was returning early for non-tail, non-command ACP
turns under sendPolicy: "deny", causing ACP-bound sessions to fall back
to the embedded reply path instead of flowing through acpManager.runTurn.
That diverged ACP session state, tool calls, and memory whenever
delivery suppression was active.

Now the early-return only fires when sendPolicy is "deny" AND the event
lacks suppressUserDelivery — i.e., when downstream delivery suppression
is not wired up. When suppressUserDelivery is set, dispatch-acp-delivery
already drops outbound sends (see onReplyStart / deliver guards), so ACP
can safely run the turn with state consistency preserved.

Existing behavior preserved:
- Command bypass still overrides deny
- Tail dispatch still overrides deny
- Plain-text deny turns without suppression still short-circuit

Addresses Codex bot P1 feedback on #65461.

* fix: gate empty-body typing indicator behind suppressTyping (#53328)

* fix: guard plugin-binding + fast-abort outbound paths under sendPolicy deny

The original PR computed suppressDelivery inside the try block, which was
after two outbound paths:

1. The plugin-owned binding block (sendBindingNotice calls for
   unavailable/declined/error outcomes, plus the plugin's own "handled"
   outcome) ran before the suppressDelivery flag existed, so plugin
   notices still leaked under deny.
2. The fast-abort path dispatched "Agent was aborted." via
   routeReplyToOriginating / sendFinalReply before the flag existed.

Move resolveSendPolicy() above the plugin-binding block so suppressDelivery
covers every outbound path downstream, matching the PR description's claim
that "all outbound paths are guarded by the flag."

Plugin-bound inbound handling under deny: plugin handlers can emit
outbound replies we cannot rewind, so skip the claim hook entirely under
deny and fall through to normal (suppressed) agent processing.
touchConversationBindingRecord still runs so binding activity stays
tracked.

Fast-abort under deny: still run the abort and record the completed
state, just don't emit the abort reply.

Tests:
- suppresses the fast-abort reply under sendPolicy deny
- delivers the fast-abort reply normally when sendPolicy is allow
  (regression guard)
- skips plugin-bound claim hook under deny and falls through to
  suppressed agent dispatch

Addresses Codex review findings on PR #65461.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Lobster <lobster@shahine.com>
Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Omar Shahine
2026-04-13 16:42:25 -07:00
committed by GitHub
parent 07b839f9b1
commit 0362f21784
9 changed files with 675 additions and 174 deletions

View File

@@ -78,9 +78,12 @@ describe("handleCommands send policy", () => {
vi.clearAllMocks();
});
it("prefers the target session entry from sessionStore for send policy checks", async () => {
it("allows processing to continue even when send policy is deny (#53328)", async () => {
// sendPolicy deny now only suppresses outbound delivery, not inbound processing.
// The deny gate moved to dispatch-from-config.ts where it suppresses delivery
// after the agent has processed the message.
const result = await handleCommands(makeParams());
expect(result).toEqual({ shouldContinue: false });
expect(result).toEqual({ shouldContinue: true });
});
});

View File

@@ -1,5 +1,3 @@
import { logVerbose } from "../../globals.js";
import { resolveSendPolicy } from "../../sessions/send-policy.js";
import { shouldHandleTextCommands } from "../commands-registry.js";
import { emitResetCommandHooks } from "./commands-reset-hooks.js";
import { maybeHandleResetCommand } from "./commands-reset.js";
@@ -41,18 +39,8 @@ export async function handleCommands(params: HandleCommandsParams): Promise<Comm
}
}
const targetSessionEntry = params.sessionStore?.[params.sessionKey] ?? params.sessionEntry;
const sendPolicy = resolveSendPolicy({
cfg: params.cfg,
entry: targetSessionEntry,
sessionKey: params.sessionKey,
channel: targetSessionEntry?.channel ?? params.command.channel,
chatType: targetSessionEntry?.chatType,
});
if (sendPolicy === "deny") {
logVerbose(`Send blocked by policy for session ${params.sessionKey ?? "unknown"}`);
return { shouldContinue: false };
}
// sendPolicy "deny" is now handled downstream in dispatch-from-config.ts
// by suppressing outbound delivery while still allowing the agent to process
// the inbound message (context, memory, tool calls). See #53328.
return { shouldContinue: true };
}

View File

@@ -312,6 +312,33 @@ describe("createAcpDispatchDeliveryCoordinator", () => {
expect(onReplyStart).not.toHaveBeenCalled();
});
it("does not fire onReplyStart when user delivery is suppressed", async () => {
const onReplyStart = vi.fn(async () => {});
const dispatcher = createDispatcher();
const coordinator = createAcpDispatchDeliveryCoordinator({
cfg: createAcpTestConfig(),
ctx: buildTestCtx({
Provider: "discord",
Surface: "discord",
SessionKey: "agent:codex-acp:session-1",
}),
dispatcher,
inboundAudio: false,
suppressUserDelivery: true,
shouldRouteToOriginating: false,
onReplyStart,
});
// Directly invoking the lifecycle (e.g. from dispatch-acp.ts before the
// first deliver call) must not fire the typing indicator when delivery is
// suppressed by sendPolicy: "deny".
await coordinator.startReplyLifecycle();
const delivered = await coordinator.deliver("final", { text: "hello" });
expect(delivered).toBe(false);
expect(onReplyStart).not.toHaveBeenCalled();
});
it("keeps parent-owned background ACP child delivery silent while preserving accumulated output", async () => {
const dispatcher = createDispatcher();
const coordinator = createAcpDispatchDeliveryCoordinator({

View File

@@ -215,6 +215,13 @@ export function createAcpDispatchDeliveryCoordinator(params: {
return;
}
state.startedReplyLifecycle = true;
// When delivery is suppressed (e.g. sendPolicy: "deny"), do not fire the
// onReplyStart callback — channels wire it to typing indicators / lifecycle
// notifications that should not leak outbound events while the session is
// under a deny policy. See #53328.
if (params.suppressUserDelivery) {
return;
}
void Promise.resolve(params.onReplyStart?.()).catch((error) => {
logVerbose(
`dispatch-acp: reply lifecycle start failed: ${error instanceof Error ? error.message : String(error)}`,

View File

@@ -3004,6 +3004,28 @@ describe("before_dispatch hook", () => {
expect(result.queuedFinal).toBe(true);
});
it("suppresses before_dispatch handled reply when sendPolicy is deny", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: true, text: "Blocked" });
const dispatcher = createDispatcher();
const result = await dispatchReplyFromConfig({
ctx: createHookCtx({ SessionKey: "test:session" }),
cfg: emptyConfig,
dispatcher,
});
// Hook handled the message (no model dispatch)
expect(hookMocks.runner.runBeforeDispatch).toHaveBeenCalled();
// But delivery must be suppressed
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(mocks.routeReply).not.toHaveBeenCalled();
expect(result.queuedFinal).toBe(false);
});
it("continues default dispatch when hook returns not handled", async () => {
hookMocks.runner.runBeforeDispatch.mockResolvedValue({ handled: false });
const dispatcher = createDispatcher();
@@ -3017,3 +3039,333 @@ describe("before_dispatch hook", () => {
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({ text: "model reply" });
});
});
describe("sendPolicy deny — suppress delivery, not processing (#53328)", () => {
beforeEach(() => {
hookMocks.runner.hasHooks.mockImplementation(
(hookName?: string) => hookName === "reply_dispatch",
);
hookMocks.runner.runReplyDispatch.mockResolvedValue(undefined);
hookMocks.runner.runBeforeDispatch.mockResolvedValue(undefined);
});
it("still calls the replyResolver when sendPolicy is deny", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
const ctx = buildTestCtx({ SessionKey: "test:session" });
await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
// The agent MUST process the message (replyResolver called)
expect(replyResolver).toHaveBeenCalledTimes(1);
});
it("passes suppressUserDelivery to tail reply_dispatch when sendPolicy is deny", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
hookMocks.runner.runReplyDispatch.mockImplementation(async (event: unknown) => {
const candidate = event as { isTailDispatch?: boolean };
if (candidate.isTailDispatch) {
return {
handled: true,
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
};
}
return undefined;
});
const dispatcher = createDispatcher();
const ctx = buildTestCtx({
SessionKey: "test:session",
AcpDispatchTailAfterReset: true,
});
await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver: async () => ({ text: "agent reply" }),
});
expect(hookMocks.runner.runReplyDispatch).toHaveBeenCalledWith(
expect.objectContaining({
isTailDispatch: true,
sendPolicy: "deny",
suppressUserDelivery: true,
}),
expect.any(Object),
);
});
it("suppresses final reply delivery when sendPolicy is deny", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
const ctx = buildTestCtx({ SessionKey: "test:session" });
const result = await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
// Delivery MUST be suppressed
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(result.queuedFinal).toBe(false);
});
it("suppresses tool result delivery when sendPolicy is deny", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
const dispatcher = createDispatcher();
let capturedOnToolResult: ((payload: ReplyPayload) => Promise<void>) | undefined;
const replyResolver = vi.fn(
async (_ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig) => {
capturedOnToolResult = opts?.onToolResult as
| ((payload: ReplyPayload) => Promise<void>)
| undefined;
return { text: "reply" } satisfies ReplyPayload;
},
);
const ctx = buildTestCtx({ SessionKey: "test:session" });
await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
// Trigger a tool result — delivery should be suppressed
expect(capturedOnToolResult).toBeDefined();
await capturedOnToolResult!({ text: "tool output" });
expect(dispatcher.sendToolResult).not.toHaveBeenCalled();
});
it("suppresses block reply delivery when sendPolicy is deny", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
const dispatcher = createDispatcher();
let capturedOnBlockReply:
| ((payload: ReplyPayload, context?: unknown) => Promise<void>)
| undefined;
const replyResolver = vi.fn(
async (_ctx: MsgContext, opts?: GetReplyOptions, _cfg?: OpenClawConfig) => {
capturedOnBlockReply = opts?.onBlockReply as
| ((payload: ReplyPayload, context?: unknown) => Promise<void>)
| undefined;
return [] as ReplyPayload[];
},
);
const ctx = buildTestCtx({ SessionKey: "test:session" });
await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
// Trigger a block reply — delivery should be suppressed
expect(capturedOnBlockReply).toBeDefined();
await capturedOnBlockReply!({ text: "streaming chunk" });
expect(dispatcher.sendBlockReply).not.toHaveBeenCalled();
});
it("delivers replies normally when sendPolicy is allow", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "allow",
};
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
const ctx = buildTestCtx({ SessionKey: "test:session" });
await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1);
});
it("delivers replies normally when sendPolicy is unset (defaults to allow)", async () => {
setNoAbort();
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
};
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
const ctx = buildTestCtx({ SessionKey: "test:session" });
await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
expect(replyResolver).toHaveBeenCalledTimes(1);
expect(dispatcher.sendFinalReply).toHaveBeenCalledTimes(1);
});
it("suppresses the fast-abort reply under sendPolicy deny", async () => {
// Fast-abort runs before sendPolicy in the old code, so the abort reply
// leaked. Under the guard, the abort is still recorded but no reply is
// dispatched. See #53328.
mocks.tryFastAbortFromMessage.mockResolvedValue({
handled: true,
aborted: true,
});
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "should not run" }) satisfies ReplyPayload);
const ctx = buildTestCtx({
Provider: "telegram",
Body: "/stop",
SessionKey: "test:session",
});
const result = await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
expect(replyResolver).not.toHaveBeenCalled();
expect(result.queuedFinal).toBe(false);
});
it("delivers the fast-abort reply normally when sendPolicy is allow (regression guard)", async () => {
mocks.tryFastAbortFromMessage.mockResolvedValue({
handled: true,
aborted: true,
});
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "allow",
};
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "hi" }) satisfies ReplyPayload);
const ctx = buildTestCtx({
Provider: "telegram",
Body: "/stop",
SessionKey: "test:session",
});
await dispatchReplyFromConfig({
ctx,
cfg: emptyConfig,
dispatcher,
replyResolver,
});
expect(dispatcher.sendFinalReply).toHaveBeenCalledWith({
text: "⚙️ Agent was aborted.",
});
});
it("skips plugin-bound claim hook under deny and falls through to suppressed agent dispatch", async () => {
// Plugin-bound inbound handlers can emit outbound replies we cannot
// rewind. Under deny, skip the plugin claim entirely and let the agent
// process the message with delivery suppressed. See #53328.
setNoAbort();
hookMocks.runner.hasHooks.mockImplementation(
((hookName?: string) =>
hookName === "inbound_claim" || hookName === "message_received") as () => boolean,
);
hookMocks.registry.plugins = [{ id: "openclaw-codex-app-server", status: "loaded" }];
hookMocks.runner.runInboundClaimForPluginOutcome.mockResolvedValue({
status: "handled",
result: { handled: true },
});
sessionBindingMocks.resolveByConversation.mockReturnValue({
bindingId: "binding-deny",
targetSessionKey: "plugin-binding:codex:abc123",
targetKind: "session",
conversation: {
channel: "discord",
accountId: "default",
conversationId: "channel:deny-test",
},
status: "active",
boundAt: 1710000000000,
metadata: {
pluginBindingOwner: "plugin",
pluginId: "openclaw-codex-app-server",
pluginRoot: "/tmp/plugin",
},
} satisfies SessionBindingRecord);
sessionStoreMocks.currentEntry = {
sessionId: "s1",
updatedAt: 0,
sendPolicy: "deny",
};
const dispatcher = createDispatcher();
const replyResolver = vi.fn(async () => ({ text: "agent reply" }) satisfies ReplyPayload);
const ctx = buildTestCtx({
Provider: "discord",
Surface: "discord",
OriginatingChannel: "discord",
OriginatingTo: "discord:channel:deny-test",
To: "discord:channel:deny-test",
AccountId: "default",
SessionKey: "agent:main:discord:channel:deny-test",
Body: "observed message",
});
await dispatchReplyFromConfig({ ctx, cfg: emptyConfig, dispatcher, replyResolver });
// Binding is still tracked (touch runs before the gate)...
expect(sessionBindingMocks.touch).toHaveBeenCalledWith("binding-deny");
// ...but the plugin claim hook MUST NOT be invoked under deny — the
// plugin can't be trusted to honor suppressDelivery on its outbound path.
expect(hookMocks.runner.runInboundClaimForPluginOutcome).not.toHaveBeenCalled();
// Agent still processes the message (the whole point of the PR)...
expect(replyResolver).toHaveBeenCalledTimes(1);
// ...but no final reply is delivered.
expect(dispatcher.sendFinalReply).not.toHaveBeenCalled();
});
});

View File

@@ -427,6 +427,25 @@ export async function dispatchReplyFromConfig(
? toPluginConversationBinding(pluginOwnedBindingRecord)
: null;
// Resolve sendPolicy early so every outbound path below (plugin-binding
// notices, fast-abort, normal dispatch) honors suppressDelivery. Under
// sendPolicy: "deny" the agent still processes inbound, but no outbound
// reply/notice/indicator is allowed. See #53328.
const sendPolicy = resolveSendPolicy({
cfg,
entry: sessionStoreEntry.entry,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
channel:
sessionStoreEntry.entry?.channel ??
ctx.OriginatingChannel ??
ctx.Surface ??
ctx.Provider ??
undefined,
chatType: sessionStoreEntry.entry?.chatType,
});
const suppressDelivery = sendPolicy === "deny";
const suppressHookUserDelivery = suppressAcpChildUserDelivery || suppressDelivery;
let pluginFallbackReason:
| "plugin-bound-fallback-missing-plugin"
| "plugin-bound-fallback-no-handler"
@@ -434,68 +453,78 @@ export async function dispatchReplyFromConfig(
if (pluginOwnedBinding) {
touchConversationBindingRecord(pluginOwnedBinding.bindingId);
logVerbose(
`plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`,
);
const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome
? await hookRunner.runInboundClaimForPluginOutcome(
pluginOwnedBinding.pluginId,
inboundClaimEvent,
inboundClaimContext,
)
: (() => {
const pluginLoaded =
getGlobalPluginRegistry()?.plugins.some(
(plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded",
) ?? false;
return pluginLoaded
? ({ status: "no_handler" } as const)
: ({ status: "missing_plugin" } as const);
})();
if (suppressDelivery) {
// Plugin-bound inbound handlers typically emit outbound replies we
// cannot rewind. Under deny, skip the plugin claim entirely and fall
// through to normal (suppressed) agent processing so no delivery leaks
// via the plugin path. See #53328.
logVerbose(
`plugin-bound inbound skipped under sendPolicy: deny (plugin=${pluginOwnedBinding.pluginId} session=${sessionKey ?? "unknown"}); falling through to suppressed agent processing`,
);
} else {
logVerbose(
`plugin-bound inbound routed to ${pluginOwnedBinding.pluginId} conversation=${pluginOwnedBinding.conversationId}`,
);
const targetedClaimOutcome = hookRunner?.runInboundClaimForPluginOutcome
? await hookRunner.runInboundClaimForPluginOutcome(
pluginOwnedBinding.pluginId,
inboundClaimEvent,
inboundClaimContext,
)
: (() => {
const pluginLoaded =
getGlobalPluginRegistry()?.plugins.some(
(plugin) => plugin.id === pluginOwnedBinding.pluginId && plugin.status === "loaded",
) ?? false;
return pluginLoaded
? ({ status: "no_handler" } as const)
: ({ status: "missing_plugin" } as const);
})();
switch (targetedClaimOutcome.status) {
case "handled": {
markIdle("plugin_binding_dispatch");
recordProcessed("completed", { reason: "plugin-bound-handled" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
case "missing_plugin":
case "no_handler": {
pluginFallbackReason =
targetedClaimOutcome.status === "missing_plugin"
? "plugin-bound-fallback-missing-plugin"
: "plugin-bound-fallback-no-handler";
if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) {
const didSendNotice = await sendBindingNotice(
{ text: buildPluginBindingUnavailableText(pluginOwnedBinding) },
"additive",
);
if (didSendNotice) {
markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId);
}
switch (targetedClaimOutcome.status) {
case "handled": {
markIdle("plugin_binding_dispatch");
recordProcessed("completed", { reason: "plugin-bound-handled" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
case "missing_plugin":
case "no_handler": {
pluginFallbackReason =
targetedClaimOutcome.status === "missing_plugin"
? "plugin-bound-fallback-missing-plugin"
: "plugin-bound-fallback-no-handler";
if (!hasShownPluginBindingFallbackNotice(pluginOwnedBinding.bindingId)) {
const didSendNotice = await sendBindingNotice(
{ text: buildPluginBindingUnavailableText(pluginOwnedBinding) },
"additive",
);
if (didSendNotice) {
markPluginBindingFallbackNoticeShown(pluginOwnedBinding.bindingId);
}
}
break;
}
case "declined": {
await sendBindingNotice(
{ text: buildPluginBindingDeclinedText(pluginOwnedBinding) },
"terminal",
);
markIdle("plugin_binding_declined");
recordProcessed("completed", { reason: "plugin-bound-declined" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
case "error": {
logVerbose(
`plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`,
);
await sendBindingNotice(
{ text: buildPluginBindingErrorText(pluginOwnedBinding) },
"terminal",
);
markIdle("plugin_binding_error");
recordProcessed("completed", { reason: "plugin-bound-error" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
break;
}
case "declined": {
await sendBindingNotice(
{ text: buildPluginBindingDeclinedText(pluginOwnedBinding) },
"terminal",
);
markIdle("plugin_binding_declined");
recordProcessed("completed", { reason: "plugin-bound-declined" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
case "error": {
logVerbose(
`plugin-bound inbound claim failed for ${pluginOwnedBinding.pluginId}: ${targetedClaimOutcome.error}`,
);
await sendBindingNotice(
{ text: buildPluginBindingErrorText(pluginOwnedBinding) },
"terminal",
);
markIdle("plugin_binding_error");
recordProcessed("completed", { reason: "plugin-bound-error" });
return { queuedFinal: false, counts: dispatcher.getQueuedCounts() };
}
}
}
@@ -536,24 +565,30 @@ export async function dispatchReplyFromConfig(
}
const fastAbort = await fastAbortResolver({ ctx, cfg });
if (fastAbort.handled) {
const payload = {
text: formatAbortReplyTextResolver(fastAbort.stoppedSubagents),
} satisfies ReplyPayload;
let queuedFinal = false;
let routedFinalCount = 0;
const result = await routeReplyToOriginating(payload);
if (result) {
queuedFinal = result.ok;
if (result.ok) {
routedFinalCount += 1;
}
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`,
);
if (!suppressDelivery) {
const payload = {
text: formatAbortReplyTextResolver(fastAbort.stoppedSubagents),
} satisfies ReplyPayload;
const result = await routeReplyToOriginating(payload);
if (result) {
queuedFinal = result.ok;
if (result.ok) {
routedFinalCount += 1;
}
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (abort) failed: ${result.error ?? "unknown error"}`,
);
}
} else {
queuedFinal = dispatcher.sendFinalReply(payload);
}
} else {
queuedFinal = dispatcher.sendFinalReply(payload);
logVerbose(
`dispatch-from-config: fast_abort reply suppressed by sendPolicy: deny (session=${sessionKey ?? "unknown"})`,
);
}
const counts = dispatcher.getQueuedCounts();
counts.final += routedFinalCount;
@@ -562,19 +597,6 @@ export async function dispatchReplyFromConfig(
return { queuedFinal, counts };
}
const sendPolicy = resolveSendPolicy({
cfg,
entry: sessionStoreEntry.entry,
sessionKey: sessionStoreEntry.sessionKey ?? sessionKey,
channel:
sessionStoreEntry.entry?.channel ??
ctx.OriginatingChannel ??
ctx.Surface ??
ctx.Provider ??
undefined,
chatType: sessionStoreEntry.entry?.chatType,
});
const shouldSendToolSummaries = ctx.ChatType !== "group" || ctx.IsForum === true;
const shouldSendToolStartStatuses = ctx.ChatType !== "group" || ctx.IsForum === true;
const sendFinalPayload = async (
@@ -630,7 +652,7 @@ export async function dispatchReplyFromConfig(
const text = beforeDispatchResult.text;
let queuedFinal = false;
let routedFinalCount = 0;
if (text) {
if (text && !suppressDelivery) {
const handledReply = await sendFinalPayload({ text });
queuedFinal = handledReply.queuedFinal;
routedFinalCount += handledReply.routedFinalCount;
@@ -652,7 +674,7 @@ export async function dispatchReplyFromConfig(
inboundAudio,
sessionTtsAuto,
ttsChannel,
suppressUserDelivery: suppressAcpChildUserDelivery,
suppressUserDelivery: suppressHookUserDelivery,
shouldRouteToOriginating,
originatingChannel,
originatingTo,
@@ -676,14 +698,12 @@ export async function dispatchReplyFromConfig(
}
}
if (sendPolicy === "deny") {
// When sendPolicy is "deny", we still let the agent process the inbound message
// (context, memory, tool calls) but suppress all outbound delivery.
if (suppressDelivery) {
logVerbose(
`Send blocked by policy for session ${sessionStoreEntry.sessionKey ?? sessionKey ?? "unknown"}`,
`Delivery suppressed by send policy for session ${sessionStoreEntry.sessionKey ?? sessionKey ?? "unknown"} — agent will still process the message`,
);
const counts = dispatcher.getQueuedCounts();
recordProcessed("completed", { reason: "send_policy_deny" });
markIdle("message_completed");
return { queuedFinal: false, counts };
}
const toolStartStatusesSent = new Set<string>();
@@ -710,6 +730,9 @@ export async function dispatchReplyFromConfig(
return parts.join("\n\n").trim() || "Planning next steps.";
};
const maybeSendWorkingStatus = async (label: string): Promise<void> => {
if (suppressDelivery) {
return;
}
const normalizedLabel = normalizeWorkingLabel(label);
if (
!shouldEmitVerboseProgress() ||
@@ -735,7 +758,7 @@ export async function dispatchReplyFromConfig(
explanation?: string;
steps?: string[];
}): Promise<void> => {
if (!shouldEmitVerboseProgress()) {
if (suppressDelivery || !shouldEmitVerboseProgress()) {
return;
}
const replyPayload: ReplyPayload = {
@@ -818,7 +841,8 @@ export async function dispatchReplyFromConfig(
};
const typing = resolveRunTypingPolicy({
requestedPolicy: params.replyOptions?.typingPolicy,
suppressTyping: params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
suppressTyping:
suppressDelivery || params.replyOptions?.suppressTyping === true || shouldSuppressTyping,
originatingChannel,
systemEvent: shouldRouteToOriginating,
});
@@ -833,6 +857,9 @@ export async function dispatchReplyFromConfig(
suppressTyping: typing.suppressTyping,
onToolResult: (payload: ReplyPayload) => {
const run = async () => {
if (suppressDelivery) {
return;
}
const ttsPayload = await maybeApplyTtsToReplyPayload({
payload,
cfg,
@@ -881,6 +908,9 @@ export async function dispatchReplyFromConfig(
},
onBlockReply: (payload: ReplyPayload, context?: BlockReplyContext) => {
const run = async () => {
if (suppressDelivery) {
return;
}
// Suppress reasoning payloads — channels using this generic dispatch
// path (WhatsApp, web, etc.) do not have a dedicated reasoning lane.
// Telegram has its own dispatch path that handles reasoning splitting.
@@ -942,11 +972,12 @@ export async function dispatchReplyFromConfig(
inboundAudio,
sessionTtsAuto,
ttsChannel,
suppressUserDelivery: suppressHookUserDelivery,
shouldRouteToOriginating,
originatingChannel,
originatingTo,
shouldSendToolSummaries,
sendPolicy: "allow",
sendPolicy,
isTailDispatch: true,
},
{
@@ -971,63 +1002,65 @@ export async function dispatchReplyFromConfig(
let queuedFinal = false;
let routedFinalCount = 0;
for (const reply of replies) {
// Suppress reasoning payloads from channel delivery — channels using this
// generic dispatch path do not have a dedicated reasoning lane.
if (reply.isReasoning === true) {
continue;
}
const finalReply = await sendFinalPayload(reply);
queuedFinal = finalReply.queuedFinal || queuedFinal;
routedFinalCount += finalReply.routedFinalCount;
}
const ttsMode = resolveConfiguredTtsMode(cfg);
// Generate TTS-only reply after block streaming completes (when there's no final reply).
// This handles the case where block streaming succeeds and drops final payloads,
// but we still want TTS audio to be generated from the accumulated block content.
if (
ttsMode === "final" &&
replies.length === 0 &&
blockCount > 0 &&
accumulatedBlockText.trim()
) {
try {
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
payload: { text: accumulatedBlockText },
cfg,
channel: ttsChannel,
kind: "final",
inboundAudio,
ttsAuto: sessionTtsAuto,
});
// Only send if TTS was actually applied (mediaUrl exists)
if (ttsSyntheticReply.mediaUrl) {
// Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content
const ttsOnlyPayload: ReplyPayload = {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
};
const result = await routeReplyToOriginating(ttsOnlyPayload);
if (result) {
queuedFinal = result.ok || queuedFinal;
if (result.ok) {
routedFinalCount += 1;
}
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error"}`,
);
}
} else {
const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload);
queuedFinal = didQueue || queuedFinal;
}
if (!suppressDelivery) {
for (const reply of replies) {
// Suppress reasoning payloads from channel delivery — channels using this
// generic dispatch path do not have a dedicated reasoning lane.
if (reply.isReasoning === true) {
continue;
}
const finalReply = await sendFinalPayload(reply);
queuedFinal = finalReply.queuedFinal || queuedFinal;
routedFinalCount += finalReply.routedFinalCount;
}
const ttsMode = resolveConfiguredTtsMode(cfg);
// Generate TTS-only reply after block streaming completes (when there's no final reply).
// This handles the case where block streaming succeeds and drops final payloads,
// but we still want TTS audio to be generated from the accumulated block content.
if (
ttsMode === "final" &&
replies.length === 0 &&
blockCount > 0 &&
accumulatedBlockText.trim()
) {
try {
const ttsSyntheticReply = await maybeApplyTtsToReplyPayload({
payload: { text: accumulatedBlockText },
cfg,
channel: ttsChannel,
kind: "final",
inboundAudio,
ttsAuto: sessionTtsAuto,
});
// Only send if TTS was actually applied (mediaUrl exists)
if (ttsSyntheticReply.mediaUrl) {
// Send TTS-only payload (no text, just audio) so it doesn't duplicate the block content
const ttsOnlyPayload: ReplyPayload = {
mediaUrl: ttsSyntheticReply.mediaUrl,
audioAsVoice: ttsSyntheticReply.audioAsVoice,
};
const result = await routeReplyToOriginating(ttsOnlyPayload);
if (result) {
queuedFinal = result.ok || queuedFinal;
if (result.ok) {
routedFinalCount += 1;
}
if (!result.ok) {
logVerbose(
`dispatch-from-config: route-reply (tts-only) failed: ${result.error ?? "unknown error"}`,
);
}
} else {
const didQueue = dispatcher.sendFinalReply(ttsOnlyPayload);
queuedFinal = didQueue || queuedFinal;
}
}
} catch (err) {
logVerbose(
`dispatch-from-config: accumulated block TTS failed: ${formatErrorMessage(err)}`,
);
}
} catch (err) {
logVerbose(
`dispatch-from-config: accumulated block TTS failed: ${formatErrorMessage(err)}`,
);
}
}

View File

@@ -350,7 +350,12 @@ export async function runPreparedReply(
sessionCtx.MediaPath || (sessionCtx.MediaPaths && sessionCtx.MediaPaths.length > 0),
);
if (!baseBodyTrimmed && !hasMediaAttachment) {
await typing.onReplyStart();
// Skip onReplyStart when typing is suppressed (e.g. sendPolicy deny) —
// otherwise channels that wire onReplyStart to typing indicators leak
// visible signals even though outbound delivery is suppressed.
if (!suppressTyping) {
await typing.onReplyStart();
}
logVerbose("Inbound body empty after normalization; skipping agent run");
typing.cleanup();
return {

View File

@@ -117,6 +117,74 @@ describe("tryDispatchAcpReplyHook", () => {
expect(dispatchMock).toHaveBeenCalledOnce();
});
it("dispatches non-tail ACP turn under deny when suppressUserDelivery is set", async () => {
bypassMock.mockResolvedValue(false);
dispatchMock.mockResolvedValue({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
});
const result = await tryDispatchAcpReplyHook(
{
...event,
sendPolicy: "deny",
suppressUserDelivery: true,
ctx: buildTestCtx({
SessionKey: "agent:test:session",
BodyForCommands: "write a test",
BodyForAgent: "write a test",
}),
},
ctx,
);
// Non-tail, non-command ACP turns under deny must still flow through ACP
// runtime so session/tool state stays consistent — delivery suppression is
// handled inside the ACP delivery path via suppressUserDelivery.
expect(dispatchMock).toHaveBeenCalledOnce();
expect(dispatchMock).toHaveBeenCalledWith(
expect.objectContaining({
suppressUserDelivery: true,
bypassForCommand: false,
}),
);
expect(result).toEqual({
handled: true,
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
});
});
it("allows tail dispatch through when sendPolicy is deny", async () => {
bypassMock.mockResolvedValue(false);
dispatchMock.mockResolvedValue({
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
});
const result = await tryDispatchAcpReplyHook(
{
...event,
sendPolicy: "deny",
isTailDispatch: true,
ctx: buildTestCtx({
SessionKey: "agent:test:session",
BodyForCommands: "continue after reset",
BodyForAgent: "continue after reset",
}),
},
ctx,
);
// Tail dispatch should proceed despite deny — delivery suppression is handled downstream
expect(dispatchMock).toHaveBeenCalledOnce();
expect(result).toEqual({
handled: true,
queuedFinal: false,
counts: { tool: 0, block: 0, final: 0 },
});
});
it("does not let ACP claim reset commands before local command handling", async () => {
bypassMock.mockResolvedValue(true);
dispatchMock.mockResolvedValue(undefined);

View File

@@ -60,13 +60,31 @@ export async function tryDispatchAcpReplyHook(
event: PluginHookReplyDispatchEvent,
ctx: PluginHookReplyDispatchContext,
): Promise<PluginHookReplyDispatchResult | void> {
if (event.sendPolicy === "deny" && !hasExplicitCommandCandidate(event.ctx)) {
// Under sendPolicy: "deny", ACP-bound sessions still need their turns to flow
// through acpManager.runTurn so session state, tool calls, and memory stay
// consistent — only outbound delivery should be suppressed. The ACP delivery
// path (dispatch-acp-delivery.ts) honors event.suppressUserDelivery to drop
// user-facing sends. If suppressUserDelivery is not set under deny, we cannot
// safely route through ACP (delivery would leak), so fall back to the
// embedded reply path unless an explicit command candidate or tail dispatch
// warrants going through ACP anyway.
if (
event.sendPolicy === "deny" &&
!event.suppressUserDelivery &&
!hasExplicitCommandCandidate(event.ctx) &&
!event.isTailDispatch
) {
return;
}
const runtime = await loadDispatchAcpRuntime();
const bypassForCommand = await runtime.shouldBypassAcpDispatchForCommand(event.ctx, ctx.cfg);
if (event.sendPolicy === "deny" && !bypassForCommand) {
if (
event.sendPolicy === "deny" &&
!event.suppressUserDelivery &&
!bypassForCommand &&
!event.isTailDispatch
) {
return;
}