refactor: extract embedded runner failover helpers

This commit is contained in:
Peter Steinberger
2026-04-03 19:26:24 +09:00
parent 71f8c0344a
commit b406b7d2e4
12 changed files with 825 additions and 261 deletions

View File

@@ -6,6 +6,7 @@ import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from "vite
import type { OpenClawConfig } from "../config/config.js";
import { redactIdentifier } from "../logging/redact-identifier.js";
import type { AuthProfileFailureReason } from "./auth-profiles.js";
import { buildAttemptReplayMetadata } from "./pi-embedded-runner/run/incomplete-turn.js";
import type { EmbeddedRunAttemptResult } from "./pi-embedded-runner/run/types.js";
const runEmbeddedAttemptMock = vi.fn<(params: unknown) => Promise<EmbeddedRunAttemptResult>>();
@@ -174,24 +175,36 @@ const buildAssistant = (overrides: Partial<AssistantMessage>): AssistantMessage
...overrides,
});
const makeAttempt = (overrides: Partial<EmbeddedRunAttemptResult>): EmbeddedRunAttemptResult => ({
aborted: false,
timedOut: false,
timedOutDuringCompaction: false,
promptError: null,
sessionIdUsed: "session:test",
systemPromptReport: undefined,
messagesSnapshot: [],
assistantTexts: [],
toolMetas: [],
lastAssistant: undefined,
didSendViaMessagingTool: false,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],
messagingToolSentTargets: [],
cloudCodeAssistFormatError: false,
...overrides,
});
/**
 * Builds an `EmbeddedRunAttemptResult` fixture for the runner tests.
 *
 * Every field starts from a neutral default and `overrides` is merged last, so any
 * caller-supplied field wins. Unless `overrides.replayMetadata` is provided,
 * `replayMetadata` is derived with `buildAttemptReplayMetadata` from the (possibly
 * overridden) `toolMetas`, `didSendViaMessagingTool`, and `successfulCronAdds`.
 */
const makeAttempt = (overrides: Partial<EmbeddedRunAttemptResult>): EmbeddedRunAttemptResult => {
  const toolMetas = overrides.toolMetas ?? [];
  const didSendViaMessagingTool = overrides.didSendViaMessagingTool ?? false;

  const baseline = {
    aborted: false,
    timedOut: false,
    timedOutDuringCompaction: false,
    promptError: null,
    sessionIdUsed: "session:test",
    systemPromptReport: undefined,
    messagesSnapshot: [],
    assistantTexts: [],
    toolMetas,
    lastAssistant: undefined,
    // Only derive replay metadata when the caller did not pin it explicitly.
    replayMetadata:
      overrides.replayMetadata ??
      buildAttemptReplayMetadata({
        toolMetas,
        didSendViaMessagingTool,
        successfulCronAdds: overrides.successfulCronAdds,
      }),
    didSendViaMessagingTool,
    messagingToolSentTexts: [],
    messagingToolSentMediaUrls: [],
    messagingToolSentTargets: [],
    cloudCodeAssistFormatError: false,
  };

  return { ...baseline, ...overrides };
};
const makeConfig = (opts?: {
fallbacks?: string[];

View File

@@ -1,3 +1,4 @@
import { buildAttemptReplayMetadata } from "./run/incomplete-turn.js";
import type { EmbeddedRunAttemptResult } from "./run/types.js";
export const DEFAULT_OVERFLOW_ERROR_MESSAGE =
@@ -28,6 +29,9 @@ export function makeCompactionSuccess(params: {
export function makeAttemptResult(
overrides: Partial<EmbeddedRunAttemptResult> = {},
): EmbeddedRunAttemptResult {
const toolMetas = overrides.toolMetas ?? [];
const didSendViaMessagingTool = overrides.didSendViaMessagingTool ?? false;
const successfulCronAdds = overrides.successfulCronAdds;
return {
aborted: false,
timedOut: false,
@@ -35,10 +39,17 @@ export function makeAttemptResult(
promptError: null,
sessionIdUsed: "test-session",
assistantTexts: ["Hello!"],
toolMetas: [],
toolMetas,
lastAssistant: undefined,
messagesSnapshot: [],
didSendViaMessagingTool: false,
replayMetadata:
overrides.replayMetadata ??
buildAttemptReplayMetadata({
toolMetas,
didSendViaMessagingTool,
successfulCronAdds,
}),
didSendViaMessagingTool,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],
messagingToolSentTargets: [],

View File

@@ -45,7 +45,6 @@ import {
extractObservedOverflowTokenCount,
type FailoverReason,
formatAssistantErrorText,
formatBillingErrorMessage,
isAuthAssistantError,
isBillingAssistantError,
isCompactionFailureError,
@@ -53,13 +52,11 @@ import {
isFailoverErrorMessage,
isLikelyContextOverflowError,
isRateLimitAssistantError,
isTimeoutErrorMessage,
parseImageDimensionError,
parseImageSizeError,
pickFallbackThinkingLevel,
} from "../pi-embedded-helpers.js";
import { ensureRuntimePluginsLoaded } from "../runtime-plugins.js";
import { isLikelyMutatingToolName } from "../tool-mutation.js";
import { derivePromptTokens, normalizeUsage, type UsageLike } from "../usage.js";
import { redactRunIdentifier, resolveRunWorkspaceDir } from "../workspace-run.js";
import { runPostCompactionSideEffects } from "./compact.js";
@@ -68,9 +65,11 @@ import { runContextEngineMaintenance } from "./context-engine-maintenance.js";
import { resolveGlobalLane, resolveSessionLane } from "./lanes.js";
import { log } from "./logger.js";
import { resolveModelAsync } from "./model.js";
import { handleAssistantFailover } from "./run/assistant-failover.js";
import { runEmbeddedAttempt } from "./run/attempt.js";
import { createEmbeddedRunAuthController } from "./run/auth-controller.js";
import { createFailoverDecisionLogger } from "./run/failover-observation.js";
import { mergeRetryFailoverReason, resolveRunFailoverDecision } from "./run/failover-policy.js";
import {
buildErrorAgentMeta,
buildUsageAgentMetaFields,
@@ -83,8 +82,10 @@ import {
type RuntimeAuthState,
scrubAnthropicRefusalMagic,
} from "./run/helpers.js";
import { resolveIncompleteTurnPayloadText } from "./run/incomplete-turn.js";
import type { RunEmbeddedPiAgentParams } from "./run/params.js";
import { buildEmbeddedRunPayloads } from "./run/payloads.js";
import { handleRetryLimitExhaustion } from "./run/retry-limit.js";
import { resolveEffectiveRuntimeModel, resolveHookModelSelection } from "./run/setup.js";
import {
sessionLikelyHasOversizedToolResults,
@@ -449,44 +450,27 @@ export async function runEmbeddedPiAgent(
`provider=${provider}/${modelId} attempts=${runLoopIterations} ` +
`maxAttempts=${MAX_RUN_LOOP_ITERATIONS}`,
);
if (
fallbackConfigured &&
lastRetryFailoverReason &&
lastRetryFailoverReason !== "timeout" &&
lastRetryFailoverReason !== "model_not_found" &&
lastRetryFailoverReason !== "format" &&
lastRetryFailoverReason !== "session_expired"
) {
throw new FailoverError(message, {
reason: lastRetryFailoverReason,
const retryLimitDecision = resolveRunFailoverDecision({
stage: "retry_limit",
fallbackConfigured,
failoverReason: lastRetryFailoverReason,
});
return handleRetryLimitExhaustion({
message,
decision: retryLimitDecision,
provider,
model: modelId,
profileId: lastProfileId,
durationMs: Date.now() - started,
agentMeta: buildErrorAgentMeta({
sessionId: params.sessionId,
provider,
model: modelId,
profileId: lastProfileId,
status: resolveFailoverStatus(lastRetryFailoverReason),
});
}
return {
payloads: [
{
text:
"Request failed after repeated internal retries. " +
"Please try again, or use /new to start a fresh session.",
isError: true,
},
],
meta: {
durationMs: Date.now() - started,
agentMeta: buildErrorAgentMeta({
sessionId: params.sessionId,
provider,
model: model.id,
usageAccumulator,
lastRunPromptUsage,
lastTurnTotal,
}),
error: { kind: "retry_limit", message },
},
};
model: model.id,
usageAccumulator,
lastRunPromptUsage,
lastTurnTotal,
}),
});
}
runLoopIterations += 1;
const runtimeAuthRetry = authRetryPending;
@@ -1083,16 +1067,36 @@ export async function runEmbeddedPiAgent(
logFallbackDecision: logPromptFailoverDecision,
});
}
let promptFailoverDecision = resolveRunFailoverDecision({
stage: "prompt",
aborted,
fallbackConfigured,
failoverFailure: promptFailoverFailure,
failoverReason: promptFailoverReason,
profileRotated: false,
});
if (
promptFailoverFailure &&
promptFailoverReason !== "timeout" &&
promptFailoverDecision.action === "rotate_profile" &&
(await advanceAuthProfile())
) {
lastRetryFailoverReason = promptFailoverReason ?? lastRetryFailoverReason;
lastRetryFailoverReason = mergeRetryFailoverReason({
previous: lastRetryFailoverReason,
failoverReason: promptFailoverReason,
});
logPromptFailoverDecision("rotate_profile");
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
continue;
}
if (promptFailoverDecision.action === "rotate_profile") {
promptFailoverDecision = resolveRunFailoverDecision({
stage: "prompt",
aborted,
fallbackConfigured,
failoverFailure: promptFailoverFailure,
failoverReason: promptFailoverReason,
profileRotated: true,
});
}
const fallbackThinking = pickFallbackThinkingLevel({
message: errorText,
attempted: attemptedThinking,
@@ -1107,22 +1111,23 @@ export async function runEmbeddedPiAgent(
// Throw FailoverError for prompt-side failover reasons when fallbacks
// are configured so outer model fallback can continue on overload,
// rate-limit, auth, or billing failures.
if (fallbackConfigured && promptFailoverFailure) {
const status = resolveFailoverStatus(promptFailoverReason ?? "unknown");
if (promptFailoverDecision.action === "fallback_model") {
const fallbackReason = promptFailoverDecision.reason ?? "unknown";
const status = resolveFailoverStatus(fallbackReason);
logPromptFailoverDecision("fallback_model", { status });
await maybeBackoffBeforeOverloadFailover(promptFailoverReason);
throw (
normalizedPromptFailover ??
new FailoverError(errorText, {
reason: promptFailoverReason ?? "unknown",
reason: fallbackReason,
provider,
model: modelId,
profileId: lastProfileId,
status: resolveFailoverStatus(promptFailoverReason ?? "unknown"),
status,
})
);
}
if (promptFailoverFailure || promptFailoverReason) {
if (promptFailoverDecision.action === "surface_error") {
logPromptFailoverDecision("surface_error");
}
throw promptError;
@@ -1194,120 +1199,54 @@ export async function runEmbeddedPiAgent(
);
}
// Rotate on timeout to try another account/model path in this turn,
// but exclude post-prompt compaction timeouts (model succeeded; no profile issue).
const shouldRotate =
(!aborted && (failoverFailure || assistantFailoverReason !== null)) ||
(timedOut && !timedOutDuringCompaction);
if (shouldRotate) {
if (lastProfileId) {
const reason = timedOut ? "timeout" : assistantProfileFailureReason;
// Skip cooldown for timeouts: a timeout is model/network-specific,
// not an auth issue. Marking the profile would poison fallback models
// on the same provider (e.g. gpt-5.3 timeout blocks gpt-5.2).
await maybeMarkAuthProfileFailure({
profileId: lastProfileId,
reason,
modelId,
});
if (timedOut && !isProbeSession) {
log.warn(`Profile ${lastProfileId} timed out. Trying next account...`);
}
if (cloudCodeAssistFormatError) {
log.warn(
`Profile ${lastProfileId} hit Cloud Code Assist format error. Tool calls will be sanitized on retry.`,
);
}
}
// For overloaded errors, check the configured rotation cap *before*
// calling advanceAuthProfile() to avoid a wasted auth-profile setup
// cycle. advanceAuthProfile() runs applyApiKeyInfo() which
// initializes the next profile — costly work that is pointless when
// we already know we will escalate to cross-provider fallback.
// See: https://github.com/openclaw/openclaw/issues/58348
if (assistantFailoverReason === "overloaded") {
overloadProfileRotations += 1;
if (overloadProfileRotations > overloadProfileRotationLimit && fallbackConfigured) {
const status = resolveFailoverStatus("overloaded");
log.warn(
`overload profile rotation cap reached for ${sanitizeForLog(provider)}/${sanitizeForLog(modelId)} after ${overloadProfileRotations} rotations; escalating to model fallback`,
);
logAssistantFailoverDecision("fallback_model", { status });
throw new FailoverError(
"The AI service is temporarily overloaded. Please try again in a moment.",
{
reason: "overloaded",
provider: activeErrorContext.provider,
model: activeErrorContext.model,
profileId: lastProfileId,
status,
},
);
}
}
// For rate-limit errors, apply the same rotation cap so that
// per-model quota exhaustion (e.g. Anthropic Sonnet-only limits)
// escalates to cross-provider model fallback instead of spinning
// forever across profiles that share the same model quota.
// See: https://github.com/openclaw/openclaw/issues/58572
if (assistantFailoverReason === "rate_limit") {
maybeEscalateRateLimitProfileFallback({
failoverProvider: activeErrorContext.provider,
failoverModel: activeErrorContext.model,
logFallbackDecision: logAssistantFailoverDecision,
});
}
const rotated = await advanceAuthProfile();
if (rotated) {
lastRetryFailoverReason =
assistantFailoverReason ?? (timedOut ? "timeout" : null) ?? lastRetryFailoverReason;
logAssistantFailoverDecision("rotate_profile");
await maybeBackoffBeforeOverloadFailover(assistantFailoverReason);
continue;
}
if (fallbackConfigured) {
await maybeBackoffBeforeOverloadFailover(assistantFailoverReason);
// Prefer formatted error message (user-friendly) over raw errorMessage
const message =
(lastAssistant
? formatAssistantErrorText(lastAssistant, {
cfg: params.config,
sessionKey: params.sessionKey ?? params.sessionId,
provider: activeErrorContext.provider,
model: activeErrorContext.model,
})
: undefined) ||
lastAssistant?.errorMessage?.trim() ||
(timedOut
? "LLM request timed out."
: rateLimitFailure
? "LLM request rate limited."
: billingFailure
? formatBillingErrorMessage(
activeErrorContext.provider,
activeErrorContext.model,
)
: authFailure
? "LLM request unauthorized."
: "LLM request failed.");
const status =
resolveFailoverStatus(assistantFailoverReason ?? "unknown") ??
(isTimeoutErrorMessage(message) ? 408 : undefined);
logAssistantFailoverDecision("fallback_model", { status });
throw new FailoverError(message, {
reason: assistantFailoverReason ?? "unknown",
provider: activeErrorContext.provider,
model: activeErrorContext.model,
profileId: lastProfileId,
status,
});
}
logAssistantFailoverDecision("surface_error");
const assistantFailoverDecision = resolveRunFailoverDecision({
stage: "assistant",
aborted,
fallbackConfigured,
failoverFailure,
failoverReason: assistantFailoverReason,
timedOut,
timedOutDuringCompaction,
profileRotated: false,
});
const assistantFailoverOutcome = await handleAssistantFailover({
initialDecision: assistantFailoverDecision,
aborted,
fallbackConfigured,
failoverFailure,
failoverReason: assistantFailoverReason,
timedOut,
timedOutDuringCompaction,
assistantProfileFailureReason,
lastProfileId,
modelId,
provider,
activeErrorContext,
lastAssistant,
config: params.config,
sessionKey: params.sessionKey ?? params.sessionId,
authFailure,
rateLimitFailure,
billingFailure,
cloudCodeAssistFormatError,
isProbeSession,
overloadProfileRotations,
overloadProfileRotationLimit,
previousRetryFailoverReason: lastRetryFailoverReason,
logAssistantFailoverDecision,
warn: (message) => log.warn(message),
maybeMarkAuthProfileFailure,
maybeEscalateRateLimitProfileFallback,
maybeBackoffBeforeOverloadFailover,
advanceAuthProfile,
});
overloadProfileRotations = assistantFailoverOutcome.overloadProfileRotations;
if (assistantFailoverOutcome.action === "retry") {
lastRetryFailoverReason = assistantFailoverOutcome.lastRetryFailoverReason;
continue;
}
if (assistantFailoverOutcome.action === "throw") {
throw assistantFailoverOutcome.error;
}
const usageMeta = buildUsageAgentMetaFields({
@@ -1373,83 +1312,50 @@ export async function runEmbeddedPiAgent(
};
}
// Detect incomplete turns where prompt() resolved prematurely due to
// pi-agent-core's auto-retry timing issue: when a mid-turn 429/overload
// triggers an internal retry, waitForRetry() resolves on the next
// assistant message *before* tool execution completes in the retried
// loop (see #8643). The captured lastAssistant has a non-terminal
// stopReason (e.g. "toolUse") with no text content, producing empty
// payloads. Surface an error instead of silently dropping the reply.
//
// Exclusions:
// - didSendDeterministicApprovalPrompt: approval-prompt turns
// intentionally produce empty payloads with stopReason=toolUse
// - lastToolError: suppressed/recoverable tool failures also produce
// empty payloads with stopReason=toolUse; those are handled by
// buildEmbeddedRunPayloads' own warning policy
if (
payloads.length === 0 &&
!aborted &&
!timedOut &&
!attempt.clientToolCall &&
!attempt.yieldDetected &&
!attempt.didSendDeterministicApprovalPrompt &&
!attempt.lastToolError
) {
const incompleteStopReason = lastAssistant?.stopReason;
// Only trigger for non-terminal stop reasons (toolUse, etc.) to
// avoid false positives when the model legitimately produces no text.
// StopReason union: "aborted" | "error" | "length" | "toolUse"
// "toolUse" is the key signal that prompt() resolved mid-turn.
if (incompleteStopReason === "toolUse" || incompleteStopReason === "error") {
log.warn(
`incomplete turn detected: runId=${params.runId} sessionId=${params.sessionId} ` +
`stopReason=${incompleteStopReason} payloads=0 — surfacing error to user`,
);
// Detect incomplete turns where prompt() resolved prematurely and the
// runner would otherwise drop an empty reply.
const incompleteTurnText = resolveIncompleteTurnPayloadText({
payloadCount: payloads.length,
aborted,
timedOut,
attempt,
});
if (incompleteTurnText) {
const incompleteStopReason = attempt.lastAssistant?.stopReason;
log.warn(
`incomplete turn detected: runId=${params.runId} sessionId=${params.sessionId} ` +
`stopReason=${incompleteStopReason} payloads=0 — surfacing error to user`,
);
// Mark the failing profile for cooldown so multi-profile setups
// rotate away from the exhausted credential on the next turn.
if (lastProfileId) {
await maybeMarkAuthProfileFailure({
profileId: lastProfileId,
reason: resolveAuthProfileFailureReason(assistantFailoverReason),
});
}
// Warn about potential side-effects when the interrupted turn may
// already have mutated state or sent outbound actions.
const hadMutatingTools = attempt.toolMetas.some((t) =>
isLikelyMutatingToolName(t.toolName),
);
const hadPotentialSideEffects =
hadMutatingTools ||
attempt.didSendViaMessagingTool ||
(attempt.successfulCronAdds ?? 0) > 0;
const errorText = hadPotentialSideEffects
? "⚠️ Agent couldn't generate a response. Note: some tool actions may have already been executed — please verify before retrying."
: "⚠️ Agent couldn't generate a response. Please try again.";
return {
payloads: [
{
text: errorText,
isError: true,
},
],
meta: {
durationMs: Date.now() - started,
agentMeta,
aborted,
systemPromptReport: attempt.systemPromptReport,
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
messagingToolSentTexts: attempt.messagingToolSentTexts,
messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls,
messagingToolSentTargets: attempt.messagingToolSentTargets,
successfulCronAdds: attempt.successfulCronAdds,
};
// Mark the failing profile for cooldown so multi-profile setups
// rotate away from the exhausted credential on the next turn.
if (lastProfileId) {
await maybeMarkAuthProfileFailure({
profileId: lastProfileId,
reason: resolveAuthProfileFailureReason(assistantFailoverReason),
});
}
return {
payloads: [
{
text: incompleteTurnText,
isError: true,
},
],
meta: {
durationMs: Date.now() - started,
agentMeta,
aborted,
systemPromptReport: attempt.systemPromptReport,
},
didSendViaMessagingTool: attempt.didSendViaMessagingTool,
didSendDeterministicApprovalPrompt: attempt.didSendDeterministicApprovalPrompt,
messagingToolSentTexts: attempt.messagingToolSentTexts,
messagingToolSentMediaUrls: attempt.messagingToolSentMediaUrls,
messagingToolSentTargets: attempt.messagingToolSentTargets,
successfulCronAdds: attempt.successfulCronAdds,
};
}
log.debug(

View File

@@ -0,0 +1,208 @@
import type { AssistantMessage } from "@mariozechner/pi-ai";
import type { OpenClawConfig } from "../../../config/config.js";
import { sanitizeForLog } from "../../../terminal/ansi.js";
import type { AuthProfileFailureReason } from "../../auth-profiles.js";
import { FailoverError, resolveFailoverStatus } from "../../failover-error.js";
import {
formatAssistantErrorText,
formatBillingErrorMessage,
isTimeoutErrorMessage,
type FailoverReason,
} from "../../pi-embedded-helpers.js";
import {
mergeRetryFailoverReason,
resolveRunFailoverDecision,
type AssistantFailoverDecision,
} from "./failover-policy.js";
/**
 * Result of `handleAssistantFailover`, discriminated by `action`.
 *
 * Every variant carries `overloadProfileRotations` so the caller can store the
 * updated counter regardless of which path was taken.
 */
type AssistantFailoverOutcome =
| {
// No failover handling applies; the caller proceeds with the attempt result.
action: "continue_normal";
overloadProfileRotations: number;
}
| {
// The auth profile was rotated; the caller should run another attempt.
action: "retry";
overloadProfileRotations: number;
// Merged retry reason the caller should remember for later iterations.
lastRetryFailoverReason: FailoverReason | null;
}
| {
// The caller should throw `error` (it is returned here, not thrown).
action: "throw";
overloadProfileRotations: number;
error: FailoverError;
};
/**
 * Carries out the assistant-stage failover decision for a finished attempt.
 *
 * Starting from `initialDecision`:
 * - `rotate_profile`: reports the failure for `lastProfileId` (when set), applies the
 *   overload rotation cap, invokes the rate-limit escalation hook, then calls
 *   `advanceAuthProfile()`. A successful rotation yields `retry`; otherwise the decision
 *   is re-resolved with `profileRotated: true` and handled below.
 * - `fallback_model`: yields `throw` with a `FailoverError` built from the best
 *   available error message.
 * - `surface_error`: logs the decision and yields `continue_normal`.
 * - `continue_normal`: yields `continue_normal` without side effects.
 *
 * The `FailoverError` is returned inside the outcome rather than thrown.
 *
 * @param params - Attempt state plus the callbacks used for logging, profile
 *   bookkeeping, backoff, and rotation.
 * @returns The outcome, always carrying the (possibly incremented)
 *   `overloadProfileRotations` counter.
 */
export async function handleAssistantFailover(params: {
  initialDecision: AssistantFailoverDecision;
  aborted: boolean;
  fallbackConfigured: boolean;
  failoverFailure: boolean;
  failoverReason: FailoverReason | null;
  timedOut: boolean;
  timedOutDuringCompaction: boolean;
  assistantProfileFailureReason: AuthProfileFailureReason | null;
  lastProfileId?: string;
  modelId: string;
  provider: string;
  activeErrorContext: { provider: string; model: string };
  lastAssistant: AssistantMessage | undefined;
  config: OpenClawConfig | undefined;
  sessionKey?: string;
  authFailure: boolean;
  rateLimitFailure: boolean;
  billingFailure: boolean;
  cloudCodeAssistFormatError: boolean;
  isProbeSession: boolean;
  overloadProfileRotations: number;
  overloadProfileRotationLimit: number;
  previousRetryFailoverReason: FailoverReason | null;
  logAssistantFailoverDecision: (
    decision: "rotate_profile" | "fallback_model" | "surface_error",
    extra?: { status?: number },
  ) => void;
  warn: (message: string) => void;
  maybeMarkAuthProfileFailure: (failure: {
    profileId?: string;
    reason?: AuthProfileFailureReason | null;
    modelId?: string;
  }) => Promise<void>;
  maybeEscalateRateLimitProfileFallback: (params: {
    failoverProvider: string;
    failoverModel: string;
    logFallbackDecision: (decision: "fallback_model", extra?: { status?: number }) => void;
  }) => void;
  maybeBackoffBeforeOverloadFailover: (reason: FailoverReason | null) => Promise<void>;
  advanceAuthProfile: () => Promise<boolean>;
}): Promise<AssistantFailoverOutcome> {
  const { activeErrorContext, failoverReason, lastProfileId, timedOut } = params;
  let rotationCount = params.overloadProfileRotations;

  // Wraps an error in the "throw" outcome, using the rotation count at call time.
  const failWith = (error: FailoverError): AssistantFailoverOutcome => ({
    action: "throw",
    overloadProfileRotations: rotationCount,
    error,
  });

  let decision = params.initialDecision;

  if (decision.action === "rotate_profile") {
    if (lastProfileId) {
      // A timeout is always reported as "timeout", whatever profile failure reason
      // was classified for the assistant message.
      await params.maybeMarkAuthProfileFailure({
        profileId: lastProfileId,
        reason: timedOut ? "timeout" : params.assistantProfileFailureReason,
        modelId: params.modelId,
      });
      if (timedOut && !params.isProbeSession) {
        params.warn(`Profile ${lastProfileId} timed out. Trying next account...`);
      }
      if (params.cloudCodeAssistFormatError) {
        params.warn(
          `Profile ${lastProfileId} hit Cloud Code Assist format error. Tool calls will be sanitized on retry.`,
        );
      }
    }

    if (failoverReason === "overloaded") {
      // Check the rotation cap *before* advanceAuthProfile() so no profile setup is
      // wasted when the run is going to escalate to model fallback anyway.
      rotationCount += 1;
      const capExceeded = rotationCount > params.overloadProfileRotationLimit;
      if (capExceeded && params.fallbackConfigured) {
        const status = resolveFailoverStatus("overloaded");
        params.warn(
          `overload profile rotation cap reached for ${sanitizeForLog(params.provider)}/${sanitizeForLog(params.modelId)} after ${rotationCount} rotations; escalating to model fallback`,
        );
        params.logAssistantFailoverDecision("fallback_model", { status });
        return failWith(
          new FailoverError(
            "The AI service is temporarily overloaded. Please try again in a moment.",
            {
              reason: "overloaded",
              provider: activeErrorContext.provider,
              model: activeErrorContext.model,
              profileId: lastProfileId,
              status,
            },
          ),
        );
      }
    } else if (failoverReason === "rate_limit") {
      // Give the rate-limit escalation hook a chance to act before rotating again.
      params.maybeEscalateRateLimitProfileFallback({
        failoverProvider: activeErrorContext.provider,
        failoverModel: activeErrorContext.model,
        logFallbackDecision: params.logAssistantFailoverDecision,
      });
    }

    if (await params.advanceAuthProfile()) {
      params.logAssistantFailoverDecision("rotate_profile");
      await params.maybeBackoffBeforeOverloadFailover(failoverReason);
      return {
        action: "retry",
        overloadProfileRotations: rotationCount,
        lastRetryFailoverReason: mergeRetryFailoverReason({
          previous: params.previousRetryFailoverReason,
          failoverReason,
          timedOut,
        }),
      };
    }

    // No further profile to rotate to: ask the policy what to do next.
    decision = resolveRunFailoverDecision({
      stage: "assistant",
      aborted: params.aborted,
      fallbackConfigured: params.fallbackConfigured,
      failoverFailure: params.failoverFailure,
      failoverReason,
      timedOut,
      timedOutDuringCompaction: params.timedOutDuringCompaction,
      profileRotated: true,
    });
  }

  if (decision.action === "fallback_model") {
    await params.maybeBackoffBeforeOverloadFailover(failoverReason);

    const { lastAssistant } = params;
    // Generic wording, only consulted when the assistant message yields no usable text.
    const genericMessage = () => {
      if (timedOut) {
        return "LLM request timed out.";
      }
      if (params.rateLimitFailure) {
        return "LLM request rate limited.";
      }
      if (params.billingFailure) {
        return formatBillingErrorMessage(activeErrorContext.provider, activeErrorContext.model);
      }
      return params.authFailure ? "LLM request unauthorized." : "LLM request failed.";
    };

    // Prefer the formatted (user-friendly) text over the raw errorMessage.
    const formatted = lastAssistant
      ? formatAssistantErrorText(lastAssistant, {
          cfg: params.config,
          sessionKey: params.sessionKey,
          provider: activeErrorContext.provider,
          model: activeErrorContext.model,
        })
      : undefined;
    const message = formatted || lastAssistant?.errorMessage?.trim() || genericMessage();

    const status =
      resolveFailoverStatus(decision.reason) ?? (isTimeoutErrorMessage(message) ? 408 : undefined);
    params.logAssistantFailoverDecision("fallback_model", { status });
    return failWith(
      new FailoverError(message, {
        reason: decision.reason,
        provider: activeErrorContext.provider,
        model: activeErrorContext.model,
        profileId: lastProfileId,
        status,
      }),
    );
  }

  if (decision.action === "surface_error") {
    params.logAssistantFailoverDecision("surface_error");
  }

  return {
    action: "continue_normal",
    overloadProfileRotations: rotationCount,
  };
}

View File

@@ -180,6 +180,7 @@ import {
} from "./compaction-timeout.js";
import { pruneProcessedHistoryImages } from "./history-image-prune.js";
import { detectAndLoadPromptImages } from "./images.js";
import { buildAttemptReplayMetadata } from "./incomplete-turn.js";
import { resolveLlmIdleTimeoutMs, streamWithIdleTimeout } from "./llm-idle-timeout.js";
import type { EmbeddedRunAttemptParams, EmbeddedRunAttemptResult } from "./types.js";
@@ -1934,6 +1935,11 @@ export async function runEmbeddedAttempt(
}
return {
replayMetadata: buildAttemptReplayMetadata({
toolMetas: toolMetasNormalized,
didSendViaMessagingTool: didSendViaMessagingTool(),
successfulCronAdds: getSuccessfulCronAdds(),
}),
aborted,
timedOut,
timedOutDuringCompaction,

View File

@@ -0,0 +1,135 @@
import { describe, expect, it } from "vitest";
import { mergeRetryFailoverReason, resolveRunFailoverDecision } from "./failover-policy.js";
// Covers the three stages of resolveRunFailoverDecision ("retry_limit", "prompt",
// "assistant") with one representative input per expected action.
describe("resolveRunFailoverDecision", () => {
// retry_limit: a rate_limit reason with fallback configured escalates to fallback_model.
it("escalates retry-limit exhaustion for replay-safe failover reasons", () => {
expect(
resolveRunFailoverDecision({
stage: "retry_limit",
fallbackConfigured: true,
failoverReason: "rate_limit",
}),
).toEqual({
action: "fallback_model",
reason: "rate_limit",
});
});
// retry_limit: "timeout" is one of the reasons that never escalates, even with fallback configured.
it("keeps retry-limit as a local error for non-escalating reasons", () => {
expect(
resolveRunFailoverDecision({
stage: "retry_limit",
fallbackConfigured: true,
failoverReason: "timeout",
}),
).toEqual({
action: "return_error_payload",
});
});
// prompt: before any rotation attempt (profileRotated: false) the decision is rotate_profile.
it("prefers prompt-side profile rotation before fallback", () => {
expect(
resolveRunFailoverDecision({
stage: "prompt",
aborted: false,
fallbackConfigured: true,
failoverFailure: true,
failoverReason: "rate_limit",
profileRotated: false,
}),
).toEqual({
action: "rotate_profile",
reason: "rate_limit",
});
});
// prompt: same input with profileRotated: true moves on to fallback_model.
it("falls back after prompt rotation is exhausted", () => {
expect(
resolveRunFailoverDecision({
stage: "prompt",
aborted: false,
fallbackConfigured: true,
failoverFailure: true,
failoverReason: "rate_limit",
profileRotated: true,
}),
).toEqual({
action: "fallback_model",
reason: "rate_limit",
});
});
// assistant: a non-null failoverReason alone (failoverFailure: false) is enough to rotate.
it("treats classified assistant-side 429s as rotation candidates even without error stopReason", () => {
expect(
resolveRunFailoverDecision({
stage: "assistant",
aborted: false,
fallbackConfigured: true,
failoverFailure: false,
failoverReason: "rate_limit",
timedOut: false,
timedOutDuringCompaction: false,
profileRotated: false,
}),
).toEqual({
action: "rotate_profile",
reason: "rate_limit",
});
});
// assistant: same input with profileRotated: true moves on to fallback_model.
it("falls back after assistant rotation is exhausted", () => {
expect(
resolveRunFailoverDecision({
stage: "assistant",
aborted: false,
fallbackConfigured: true,
failoverFailure: false,
failoverReason: "rate_limit",
timedOut: false,
timedOutDuringCompaction: false,
profileRotated: true,
}),
).toEqual({
action: "fallback_model",
reason: "rate_limit",
});
});
// assistant: no failure, no reason, no timeout -> continue_normal (which carries no reason field).
it("does nothing for assistant turns without failover signals", () => {
expect(
resolveRunFailoverDecision({
stage: "assistant",
aborted: false,
fallbackConfigured: true,
failoverFailure: false,
failoverReason: null,
timedOut: false,
timedOutDuringCompaction: false,
profileRotated: false,
}),
).toEqual({
action: "continue_normal",
});
});
});
// mergeRetryFailoverReason picks, in order: the current classified reason, then
// "timeout" when the attempt timed out, then the previously remembered reason.
describe("mergeRetryFailoverReason", () => {
  it("preserves the previous classified reason when the current one is null", () => {
    // No current reason and no timeout flag: the earlier reason is carried over.
    const merged = mergeRetryFailoverReason({ previous: "rate_limit", failoverReason: null });

    expect(merged).toBe("rate_limit");
  });

  it("records timeout when no classified reason is present", () => {
    // Nothing classified now or before, but the attempt timed out.
    const merged = mergeRetryFailoverReason({
      previous: null,
      failoverReason: null,
      timedOut: true,
    });

    expect(merged).toBe("timeout");
  });
});

View File

@@ -0,0 +1,163 @@
import type { FailoverReason } from "../../pi-embedded-helpers.js";
/** Every action name that can appear in a `RunFailoverDecision`. */
export type RunFailoverDecisionAction =
| "continue_normal"
| "rotate_profile"
| "fallback_model"
| "surface_error"
| "return_error_payload";
/**
 * Decision produced by `resolveRunFailoverDecision`, discriminated by `action`.
 *
 * Only `fallback_model` guarantees a non-null `reason`; `rotate_profile` and
 * `surface_error` may carry `null`, and the remaining actions carry no reason.
 */
export type RunFailoverDecision =
| {
action: "continue_normal";
}
| {
action: "rotate_profile" | "surface_error";
reason: FailoverReason | null;
}
| {
action: "fallback_model";
reason: FailoverReason;
}
| {
action: "return_error_payload";
};
/** Decisions the `retry_limit` stage can return. */
export type RetryLimitFailoverDecision = Extract<
RunFailoverDecision,
{ action: "fallback_model" | "return_error_payload" }
>;
/** Decisions the `prompt` stage can return. */
export type PromptFailoverDecision = Extract<
RunFailoverDecision,
{ action: "rotate_profile" | "fallback_model" | "surface_error" }
>;
/** Decisions the `assistant` stage can return. */
export type AssistantFailoverDecision = Extract<
RunFailoverDecision,
{ action: "continue_normal" | "rotate_profile" | "fallback_model" | "surface_error" }
>;
// Input for the "retry_limit" stage.
type RetryLimitDecisionParams = {
stage: "retry_limit";
fallbackConfigured: boolean;
failoverReason: FailoverReason | null;
};
// Input for the "prompt" stage. `profileRotated` is true once a rotation attempt
// has already been made for this failure.
type PromptDecisionParams = {
stage: "prompt";
aborted: boolean;
fallbackConfigured: boolean;
failoverFailure: boolean;
failoverReason: FailoverReason | null;
profileRotated: boolean;
};
// Input for the "assistant" stage; adds the timeout flags to the prompt-stage shape.
type AssistantDecisionParams = {
stage: "assistant";
aborted: boolean;
fallbackConfigured: boolean;
failoverFailure: boolean;
failoverReason: FailoverReason | null;
timedOut: boolean;
timedOutDuringCompaction: boolean;
profileRotated: boolean;
};
/** Union of all stage inputs accepted by `resolveRunFailoverDecision`, discriminated by `stage`. */
export type RunFailoverDecisionParams =
| RetryLimitDecisionParams
| PromptDecisionParams
| AssistantDecisionParams;
/**
 * Whether exhausting the retry loop with `reason` should escalate to model fallback.
 *
 * A missing reason never escalates, and neither do `timeout`, `model_not_found`,
 * `format`, or `session_expired`.
 */
function shouldEscalateRetryLimit(reason: FailoverReason | null): boolean {
  return Boolean(
    reason &&
      reason !== "timeout" &&
      reason !== "model_not_found" &&
      reason !== "format" &&
      reason !== "session_expired",
  );
}

/** Prompt-stage rotation applies to every failover failure except timeouts. */
function shouldRotatePrompt(params: PromptDecisionParams): boolean {
  return params.failoverFailure && params.failoverReason !== "timeout";
}

/**
 * Assistant-stage rotation applies when a non-aborted attempt shows a failover
 * failure or a classified reason, or when the attempt timed out outside of
 * compaction.
 */
function shouldRotateAssistant(params: AssistantDecisionParams): boolean {
  const hasFailoverSignal =
    !params.aborted && (params.failoverFailure || params.failoverReason !== null);
  const timedOutOutsideCompaction = params.timedOut && !params.timedOutDuringCompaction;
  return hasFailoverSignal || timedOutOutsideCompaction;
}

/**
 * Picks the failover reason to remember after a retry.
 *
 * Priority: the current classified reason, then `"timeout"` when `timedOut` is
 * set, then the previously remembered reason.
 */
export function mergeRetryFailoverReason(params: {
  previous: FailoverReason | null;
  failoverReason: FailoverReason | null;
  timedOut?: boolean;
}): FailoverReason | null {
  return params.failoverReason ?? (params.timedOut ? "timeout" : null) ?? params.previous;
}

/**
 * Resolves what the run loop should do next for a given failure stage.
 *
 * - `retry_limit`: `fallback_model` when fallback is configured and the reason is
 *   escalation-worthy, otherwise `return_error_payload`.
 * - `prompt`: `rotate_profile` until a rotation has been attempted
 *   (`profileRotated`), then `fallback_model` when fallback is configured and the
 *   failure is a failover failure, otherwise `surface_error`.
 * - `assistant`: `continue_normal` when there is no failover signal; otherwise
 *   `rotate_profile` until a rotation has been attempted, then `fallback_model`
 *   when fallback is configured, otherwise `surface_error`.
 *
 * For the assistant-stage fallback, a classified `failoverReason` takes
 * precedence over the timeout flag (the same priority `mergeRetryFailoverReason`
 * uses), so a classified reason such as `rate_limit` is not relabelled as
 * `"timeout"`. `"timeout"` is reported only when nothing was classified, and
 * `"unknown"` when neither is available.
 */
export function resolveRunFailoverDecision(
  params: RetryLimitDecisionParams,
): RetryLimitFailoverDecision;
export function resolveRunFailoverDecision(params: PromptDecisionParams): PromptFailoverDecision;
export function resolveRunFailoverDecision(
  params: AssistantDecisionParams,
): AssistantFailoverDecision;
export function resolveRunFailoverDecision(params: RunFailoverDecisionParams): RunFailoverDecision {
  if (params.stage === "retry_limit") {
    if (params.fallbackConfigured && shouldEscalateRetryLimit(params.failoverReason)) {
      return {
        action: "fallback_model",
        // The guard above already rules out null; the default only satisfies the type.
        reason: params.failoverReason ?? "unknown",
      };
    }
    return { action: "return_error_payload" };
  }

  if (params.stage === "prompt") {
    if (!params.profileRotated && shouldRotatePrompt(params)) {
      return { action: "rotate_profile", reason: params.failoverReason };
    }
    if (params.fallbackConfigured && params.failoverFailure) {
      return { action: "fallback_model", reason: params.failoverReason ?? "unknown" };
    }
    return { action: "surface_error", reason: params.failoverReason };
  }

  // Assistant stage.
  if (!shouldRotateAssistant(params)) {
    return { action: "continue_normal" };
  }
  if (!params.profileRotated) {
    return { action: "rotate_profile", reason: params.failoverReason };
  }
  if (params.fallbackConfigured) {
    return {
      action: "fallback_model",
      // Classified reason first; fall back to "timeout" only when nothing was classified.
      reason: params.failoverReason ?? (params.timedOut ? "timeout" : "unknown"),
    };
  }
  return { action: "surface_error", reason: params.failoverReason };
}

View File

@@ -0,0 +1,57 @@
import { isLikelyMutatingToolName } from "../../tool-mutation.js";
import type { EmbeddedRunAttemptResult } from "./types.js";
/**
 * The subset of `EmbeddedRunAttemptResult` fields that
 * `buildAttemptReplayMetadata` reads to derive replay metadata.
 */
type ReplayMetadataAttempt = Pick<
EmbeddedRunAttemptResult,
"toolMetas" | "didSendViaMessagingTool" | "successfulCronAdds"
>;
/**
 * The subset of `EmbeddedRunAttemptResult` fields that
 * `resolveIncompleteTurnPayloadText` inspects when deciding whether to emit
 * an "incomplete turn" warning.
 */
type IncompleteTurnAttempt = Pick<
EmbeddedRunAttemptResult,
| "clientToolCall"
| "yieldDetected"
| "didSendDeterministicApprovalPrompt"
| "lastToolError"
| "lastAssistant"
| "replayMetadata"
>;
/**
 * Derives replay metadata for an attempt from its recorded tool activity.
 *
 * An attempt is flagged as having potential side effects when any of its
 * tool metas names a tool that `isLikelyMutatingToolName` accepts, when it
 * sent via the messaging tool, or when `successfulCronAdds` is greater than zero.
 *
 * @param params - Tool metas, messaging-tool flag and cron-add count of the attempt.
 * @returns `hadPotentialSideEffects` and its negation, `replaySafe`.
 */
export function buildAttemptReplayMetadata(
  params: ReplayMetadataAttempt,
): EmbeddedRunAttemptResult["replayMetadata"] {
  const usedMutatingTool = params.toolMetas.some((meta) =>
    isLikelyMutatingToolName(meta.toolName),
  );
  // A missing count is treated as zero.
  const hasSuccessfulCronAdds = (params.successfulCronAdds ?? 0) > 0;
  const hadPotentialSideEffects =
    usedMutatingTool || params.didSendViaMessagingTool || hasSuccessfulCronAdds;
  return {
    hadPotentialSideEffects,
    replaySafe: !hadPotentialSideEffects,
  };
}
/**
 * Produces a user-facing warning for a turn that ended without any payload,
 * or `null` when no warning applies.
 *
 * A warning is produced only when all of the following hold:
 * - `payloadCount` is 0 and the run was neither aborted nor timed out;
 * - the attempt has no client tool call, no detected yield, no deterministic
 *   approval prompt and no last tool error;
 * - the last assistant message's stop reason is "toolUse" or "error".
 *
 * @param params - Payload count, abort/timeout flags and the attempt to inspect.
 * @returns The warning text (worded differently when the attempt had potential
 *   side effects), or `null`.
 */
export function resolveIncompleteTurnPayloadText(params: {
  payloadCount: number;
  aborted: boolean;
  timedOut: boolean;
  attempt: IncompleteTurnAttempt;
}): string | null {
  const { attempt } = params;

  // The run produced output or was cut short: nothing to report here.
  if (params.payloadCount !== 0 || params.aborted || params.timedOut) {
    return null;
  }
  // The attempt already has another recorded outcome.
  if (
    attempt.clientToolCall ||
    attempt.yieldDetected ||
    attempt.didSendDeterministicApprovalPrompt ||
    attempt.lastToolError
  ) {
    return null;
  }

  const stopReason = attempt.lastAssistant?.stopReason;
  const stoppedIncomplete = stopReason === "toolUse" || stopReason === "error";
  if (!stoppedIncomplete) {
    return null;
  }

  if (attempt.replayMetadata.hadPotentialSideEffects) {
    return "⚠️ Agent couldn't generate a response. Note: some tool actions may have already been executed — please verify before retrying.";
  }
  return "⚠️ Agent couldn't generate a response. Please try again.";
}

View File

@@ -0,0 +1,39 @@
import { FailoverError, resolveFailoverStatus } from "../../failover-error.js";
import type { EmbeddedPiAgentMeta, EmbeddedPiRunResult } from "../types.js";
import type { RetryLimitFailoverDecision } from "./failover-policy.js";
/**
 * Turns an exhausted-retry situation into either a thrown failover or an
 * error result, depending on the precomputed decision.
 *
 * @param params - The retry-limit message, the failover decision, and the
 *   provider/model/profile/duration/agent metadata to attach.
 * @returns An error run result carrying a single error payload and a
 *   `retry_limit` error entry in its meta, when the decision is anything
 *   other than `"fallback_model"`.
 * @throws FailoverError when the decision's action is `"fallback_model"`;
 *   the error carries the decision's reason and the status resolved from it.
 */
export function handleRetryLimitExhaustion(params: {
  message: string;
  decision: RetryLimitFailoverDecision;
  provider: string;
  model: string;
  profileId?: string;
  durationMs: number;
  agentMeta: EmbeddedPiAgentMeta;
}): EmbeddedPiRunResult {
  const { decision, message } = params;

  if (decision.action !== "fallback_model") {
    const userFacingText =
      "Request failed after repeated internal retries. " +
      "Please try again, or use /new to start a fresh session.";
    return {
      payloads: [{ text: userFacingText, isError: true }],
      meta: {
        durationMs: params.durationMs,
        agentMeta: params.agentMeta,
        error: { kind: "retry_limit", message },
      },
    };
  }

  // Escalate: the thrown error carries what the catcher needs to identify the failed run.
  throw new FailoverError(message, {
    reason: decision.reason,
    provider: params.provider,
    model: params.model,
    profileId: params.profileId,
    status: resolveFailoverStatus(decision.reason),
  });
}

View File

@@ -61,4 +61,8 @@ export type EmbeddedRunAttemptResult = {
clientToolCall?: { name: string; params: Record<string, unknown> };
/** True when sessions_yield tool was called during this attempt. */
yieldDetected?: boolean;
replayMetadata: {
hadPotentialSideEffects: boolean;
replaySafe: boolean;
};
};

View File

@@ -5,6 +5,7 @@ import {
mockedEnsureRuntimePluginsLoaded,
mockedRunEmbeddedAttempt,
} from "./run.overflow-compaction.harness.js";
import { buildAttemptReplayMetadata } from "./run/incomplete-turn.js";
import type { EmbeddedRunAttemptResult } from "./run/types.js";
let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent;
@@ -12,6 +13,9 @@ let runEmbeddedPiAgent: typeof import("./run.js").runEmbeddedPiAgent;
function makeAttemptResult(
overrides: Partial<EmbeddedRunAttemptResult> = {},
): EmbeddedRunAttemptResult {
const toolMetas = overrides.toolMetas ?? [];
const didSendViaMessagingTool = overrides.didSendViaMessagingTool ?? false;
const successfulCronAdds = overrides.successfulCronAdds;
return {
aborted: false,
timedOut: false,
@@ -20,9 +24,16 @@ function makeAttemptResult(
sessionIdUsed: "test-session",
messagesSnapshot: [],
assistantTexts: [],
toolMetas: [],
toolMetas,
lastAssistant: undefined,
didSendViaMessagingTool: false,
replayMetadata:
overrides.replayMetadata ??
buildAttemptReplayMetadata({
toolMetas,
didSendViaMessagingTool,
successfulCronAdds,
}),
didSendViaMessagingTool,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],
messagingToolSentTargets: [],

View File

@@ -3,6 +3,7 @@ import os from "node:os";
import path from "node:path";
import type { AssistantMessage } from "@mariozechner/pi-ai";
import type { OpenClawConfig } from "../../config/config.js";
import { buildAttemptReplayMetadata } from "../pi-embedded-runner/run/incomplete-turn.js";
import type { EmbeddedRunAttemptResult } from "../pi-embedded-runner/run/types.js";
export type EmbeddedPiRunnerTestWorkspace = {
@@ -96,6 +97,9 @@ export function buildEmbeddedRunnerAssistant(
export function makeEmbeddedRunnerAttempt(
overrides: Partial<EmbeddedRunAttemptResult>,
): EmbeddedRunAttemptResult {
const toolMetas = overrides.toolMetas ?? [];
const didSendViaMessagingTool = overrides.didSendViaMessagingTool ?? false;
const successfulCronAdds = overrides.successfulCronAdds;
return {
aborted: false,
timedOut: false,
@@ -105,9 +109,16 @@ export function makeEmbeddedRunnerAttempt(
systemPromptReport: undefined,
messagesSnapshot: [],
assistantTexts: [],
toolMetas: [],
toolMetas,
lastAssistant: undefined,
didSendViaMessagingTool: false,
replayMetadata:
overrides.replayMetadata ??
buildAttemptReplayMetadata({
toolMetas,
didSendViaMessagingTool,
successfulCronAdds,
}),
didSendViaMessagingTool,
messagingToolSentTexts: [],
messagingToolSentMediaUrls: [],
messagingToolSentTargets: [],