diff --git a/extensions/lobster/src/lobster-runner.ts b/extensions/lobster/src/lobster-runner.ts index 6da1913f023..435287fcf05 100644 --- a/extensions/lobster/src/lobster-runner.ts +++ b/extensions/lobster/src/lobster-runner.ts @@ -154,6 +154,17 @@ type ApprovalRequestItem = { resumeToken?: string; }; +type PipelineRuntimeContext = { + registry: unknown; + stdin: NodeJS.ReadableStream; + stdout: NodeJS.WritableStream; + stderr: NodeJS.WritableStream; + env: Record; + cwd: string; + llmAdapters?: Record; + signal?: AbortSignal; +}; + function normalizeForCwdSandbox(p: string): string { const normalized = path.normalize(p); return process.platform === "win32" ? normalized.toLowerCase() : normalized; @@ -242,6 +253,50 @@ function asApprovalRequestItem(item: unknown): ApprovalRequestItem | null { return candidate as ApprovalRequestItem; } +function normalizeWorkflowOutput( + okEnvelope: ( + status: "ok" | "needs_approval" | "cancelled", + output: unknown[], + requiresApproval: EmbeddedToolEnvelope["requiresApproval"], + ) => EmbeddedToolEnvelope, + output: { + status: "ok" | "needs_approval" | "cancelled"; + output: unknown[]; + requiresApproval?: EmbeddedToolEnvelope["requiresApproval"]; + }, +): EmbeddedToolEnvelope { + if (output.status === "needs_approval") { + return okEnvelope("needs_approval", [], output.requiresApproval ?? null); + } + if (output.status === "cancelled") { + return okEnvelope("cancelled", [], null); + } + return okEnvelope("ok", output.output, null); +} + +async function runPipelineWithRuntime( + deps: ToolRuntimeDeps, + params: { + pipeline: Array<{ name: string; args: Record; raw: string }>; + input: AsyncIterable | unknown[]; + runtime: PipelineRuntimeContext; + }, +) { + return await deps.runPipeline({ + pipeline: params.pipeline, + registry: params.runtime.registry, + input: params.input, + stdin: params.runtime.stdin, + stdout: params.runtime.stdout, + stderr: params.runtime.stderr, + env: params.runtime.env, + mode: "tool", + cwd: params.runtime.cwd, + llmAdapters: params.runtime.llmAdapters, + signal: params.runtime.signal, + }); +} + async function resolveWorkflowFile(candidate: string, cwd: string) { const { stat } = await import("node:fs/promises"); const resolved = path.isAbsolute(candidate) ? candidate : path.resolve(cwd, candidate); @@ -403,14 +458,7 @@ function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolR args, ctx: runtime, }); - - if (output.status === "needs_approval") { - return okEnvelope("needs_approval", [], output.requiresApproval ?? null); - } - if (output.status === "cancelled") { - return okEnvelope("cancelled", [], null); - } - return okEnvelope("ok", output.output, null); + return normalizeWorkflowOutput(okEnvelope, output); } catch (error) { return errorEnvelope( "runtime_error", @@ -427,18 +475,10 @@ function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolR } try { - const output = await deps.runPipeline({ + const output = await runPipelineWithRuntime(deps, { pipeline: parsed, - registry: runtime.registry, input: [], - stdin: runtime.stdin, - stdout: runtime.stdout, - stderr: runtime.stderr, - env: runtime.env, - mode: "tool", - cwd: runtime.cwd, - llmAdapters: runtime.llmAdapters, - signal: runtime.signal, + runtime, }); const approval = @@ -507,14 +547,7 @@ function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolR resume: payload as Record, approved: true, }); - - if (output.status === "needs_approval") { - return okEnvelope("needs_approval", [], output.requiresApproval ?? null); - } - if (output.status === "cancelled") { - return okEnvelope("cancelled", [], null); - } - return okEnvelope("ok", output.output, null); + return normalizeWorkflowOutput(okEnvelope, output); } catch (error) { return errorEnvelope( "runtime_error", @@ -527,18 +560,10 @@ function createFallbackEmbeddedToolRuntime(deps: ToolRuntimeDeps): EmbeddedToolR const resumeState = await loadPipelineResumeState(runtime.env, payload.stateKey ?? ""); const remaining = resumeState.pipeline.slice(resumeState.resumeAtIndex); - const output = await deps.runPipeline({ + const output = await runPipelineWithRuntime(deps, { pipeline: remaining, - registry: runtime.registry, input: streamFromItems(resumeState.items), - stdin: runtime.stdin, - stdout: runtime.stdout, - stderr: runtime.stderr, - env: runtime.env, - mode: "tool", - cwd: runtime.cwd, - llmAdapters: runtime.llmAdapters, - signal: runtime.signal, + runtime, }); const approval =