refactor: dedupe lobster runtime helpers

This commit is contained in:
Peter Steinberger
2026-04-06 20:32:48 +01:00
parent 95df6d9332
commit f2bbf2b8e7

View File

@@ -154,6 +154,17 @@ type ApprovalRequestItem = {
resumeToken?: string;
};
type PipelineRuntimeContext = {
registry: unknown;
stdin: NodeJS.ReadableStream;
stdout: NodeJS.WritableStream;
stderr: NodeJS.WritableStream;
env: Record<string, string | undefined>;
cwd: string;
llmAdapters?: Record<string, unknown>;
signal?: AbortSignal;
};
function normalizeForCwdSandbox(p: string): string {
const normalized = path.normalize(p);
return process.platform === "win32" ? normalized.toLowerCase() : normalized;
@@ -242,6 +253,50 @@ function asApprovalRequestItem(item: unknown): ApprovalRequestItem | null {
return candidate as ApprovalRequestItem;
}
function normalizeWorkflowOutput(
okEnvelope: (
status: "ok" | "needs_approval" | "cancelled",
output: unknown[],
requiresApproval: EmbeddedToolEnvelope["requiresApproval"],
) => EmbeddedToolEnvelope,
output: {
status: "ok" | "needs_approval" | "cancelled";
output: unknown[];
requiresApproval?: EmbeddedToolEnvelope["requiresApproval"];
},
): EmbeddedToolEnvelope {
if (output.status === "needs_approval") {
return okEnvelope("needs_approval", [], output.requiresApproval ?? null);
}
if (output.status === "cancelled") {
return okEnvelope("cancelled", [], null);
}
return okEnvelope("ok", output.output, null);
}
async function runPipelineWithRuntime(
deps: ToolRuntimeDeps,
params: {
pipeline: Array<{ name: string; args: Record<string, unknown>; raw: string }>;
input: AsyncIterable<unknown> | unknown[];
runtime: PipelineRuntimeContext;
},
) {
return await deps.runPipeline({
pipeline: params.pipeline,
registry: params.runtime.registry,
input: params.input,
stdin: params.runtime.stdin,
stdout: params.runtime.stdout,
stderr: params.runtime.stderr,
env: params.runtime.env,
mode: "tool",
cwd: params.runtime.cwd,
llmAdapters: params.runtime.llmAdapters,
signal: params.runtime.signal,
});
}
async function resolveWorkflowFile(candidate: string, cwd: string) {
const { stat } = await import("node:fs/promises");
const resolved = path.isAbsolute(candidate) ? candidate : path.resolve(cwd, candidate);
@@ -403,14 +458,7 @@ function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolR
args,
ctx: runtime,
});
if (output.status === "needs_approval") {
return okEnvelope("needs_approval", [], output.requiresApproval ?? null);
}
if (output.status === "cancelled") {
return okEnvelope("cancelled", [], null);
}
return okEnvelope("ok", output.output, null);
return normalizeWorkflowOutput(okEnvelope, output);
} catch (error) {
return errorEnvelope(
"runtime_error",
@@ -427,18 +475,10 @@ function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolR
}
try {
const output = await deps.runPipeline({
const output = await runPipelineWithRuntime(deps, {
pipeline: parsed,
registry: runtime.registry,
input: [],
stdin: runtime.stdin,
stdout: runtime.stdout,
stderr: runtime.stderr,
env: runtime.env,
mode: "tool",
cwd: runtime.cwd,
llmAdapters: runtime.llmAdapters,
signal: runtime.signal,
runtime,
});
const approval =
@@ -507,14 +547,7 @@ function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolR
resume: payload as Record<string, unknown>,
approved: true,
});
if (output.status === "needs_approval") {
return okEnvelope("needs_approval", [], output.requiresApproval ?? null);
}
if (output.status === "cancelled") {
return okEnvelope("cancelled", [], null);
}
return okEnvelope("ok", output.output, null);
return normalizeWorkflowOutput(okEnvelope, output);
} catch (error) {
return errorEnvelope(
"runtime_error",
@@ -527,18 +560,10 @@ function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolR
const resumeState = await loadPipelineResumeState(runtime.env, payload.stateKey ?? "");
const remaining = resumeState.pipeline.slice(resumeState.resumeAtIndex);
const output = await deps.runPipeline({
const output = await runPipelineWithRuntime(deps, {
pipeline: remaining,
registry: runtime.registry,
input: streamFromItems(resumeState.items),
stdin: runtime.stdin,
stdout: runtime.stdout,
stderr: runtime.stderr,
env: runtime.env,
mode: "tool",
cwd: runtime.cwd,
llmAdapters: runtime.llmAdapters,
signal: runtime.signal,
runtime,
});
const approval =