diff options
| -rw-r--r-- | packages/cli/src/utils/readStdin.ts | 24 | ||||
| -rw-r--r-- | packages/cli/src/utils/sandbox.ts | 54 | ||||
| -rw-r--r-- | packages/core/src/tools/grep.ts | 33 | ||||
| -rw-r--r-- | packages/core/src/tools/shell.ts | 8 | ||||
| -rw-r--r-- | packages/core/src/tools/tool-registry.ts | 53 |
5 files changed, 125 insertions, 47 deletions
diff --git a/packages/cli/src/utils/readStdin.ts b/packages/cli/src/utils/readStdin.ts index d890aa2c..2e005526 100644 --- a/packages/cli/src/utils/readStdin.ts +++ b/packages/cli/src/utils/readStdin.ts @@ -9,19 +9,31 @@ export async function readStdin(): Promise<string> { let data = ''; process.stdin.setEncoding('utf8'); - process.stdin.on('readable', () => { + const onReadable = () => { let chunk; while ((chunk = process.stdin.read()) !== null) { data += chunk; } - }); + }; - process.stdin.on('end', () => { + const onEnd = () => { + cleanup(); resolve(data); - }); + }; - process.stdin.on('error', (err) => { + const onError = (err: Error) => { + cleanup(); reject(err); - }); + }; + + const cleanup = () => { + process.stdin.removeListener('readable', onReadable); + process.stdin.removeListener('end', onEnd); + process.stdin.removeListener('error', onError); + }; + + process.stdin.on('readable', onReadable); + process.stdin.on('end', onEnd); + process.stdin.on('error', onError); }); } diff --git a/packages/cli/src/utils/sandbox.ts b/packages/cli/src/utils/sandbox.ts index f33cb480..e583f0ff 100644 --- a/packages/cli/src/utils/sandbox.ts +++ b/packages/cli/src/utils/sandbox.ts @@ -536,28 +536,28 @@ async function pullImage(sandbox: string, image: string): Promise<boolean> { const pullProcess = spawn(sandbox, args, { stdio: 'pipe' }); let stderrData = ''; - if (pullProcess.stdout) { - pullProcess.stdout.on('data', (data) => { - console.info(data.toString().trim()); // Show pull progress - }); - } - if (pullProcess.stderr) { - pullProcess.stderr.on('data', (data) => { - stderrData += data.toString(); - console.error(data.toString().trim()); // Show pull errors/info from the command itself - }); - } - pullProcess.on('error', (err) => { + const onStdoutData = (data: Buffer) => { + console.info(data.toString().trim()); // Show pull progress + }; + + const onStderrData = (data: Buffer) => { + stderrData += data.toString(); + console.error(data.toString().trim()); // Show pull errors/info from the command itself + }; + + const onError = (err: Error) => { console.warn( `Failed to start '${sandbox} pull ${image}' command: ${err.message}`, ); + cleanup(); resolve(false); - }); + }; - pullProcess.on('close', (code) => { + const onClose = (code: number | null) => { if (code === 0) { console.info(`Successfully pulled image ${image}.`); + cleanup(); resolve(true); } else { console.warn( @@ -566,9 +566,33 @@ async function pullImage(sandbox: string, image: string): Promise<boolean> { if (stderrData.trim()) { // Details already printed by the stderr listener above } + cleanup(); resolve(false); } - }); + }; + + const cleanup = () => { + if (pullProcess.stdout) { + pullProcess.stdout.removeListener('data', onStdoutData); + } + if (pullProcess.stderr) { + pullProcess.stderr.removeListener('data', onStderrData); + } + pullProcess.removeListener('error', onError); + pullProcess.removeListener('close', onClose); + if (pullProcess.connected) { + pullProcess.disconnect(); + } + }; + + if (pullProcess.stdout) { + pullProcess.stdout.on('data', onStdoutData); + } + if (pullProcess.stderr) { + pullProcess.stderr.on('data', onStderrData); + } + pullProcess.on('error', onError); + pullProcess.on('close', onClose); }); } diff --git a/packages/core/src/tools/grep.ts b/packages/core/src/tools/grep.ts index acdf0bc8..38a1d4f5 100644 --- a/packages/core/src/tools/grep.ts +++ b/packages/core/src/tools/grep.ts @@ -461,8 +461,8 @@ export class GrepTool extends BaseTool<GrepToolParams, ToolResult> { const stdoutChunks: Buffer[] = []; const stderrChunks: Buffer[] = []; - child.stdout.on('data', (chunk) => stdoutChunks.push(chunk)); - child.stderr.on('data', (chunk) => { + const onData = (chunk: Buffer) => stdoutChunks.push(chunk); + const onStderr = (chunk: Buffer) => { const stderrStr = chunk.toString(); // Suppress common harmless stderr messages if ( @@ -471,15 +471,17 @@ export class GrepTool extends BaseTool<GrepToolParams, ToolResult> { ) { stderrChunks.push(chunk); } - }); - child.on('error', (err) => - reject(new Error(`Failed to start system grep: ${err.message}`)), - ); - child.on('close', (code) => { + }; + const onError = (err: Error) => { + cleanup(); + reject(new Error(`Failed to start system grep: ${err.message}`)); + }; + const onClose = (code: number | null) => { const stdoutData = Buffer.concat(stdoutChunks).toString('utf8'); const stderrData = Buffer.concat(stderrChunks) .toString('utf8') .trim(); + cleanup(); if (code === 0) resolve(stdoutData); else if (code === 1) resolve(''); // No matches @@ -492,7 +494,22 @@ export class GrepTool extends BaseTool<GrepToolParams, ToolResult> { ); else resolve(''); // Exit code > 1 but no stderr, likely just suppressed errors } - }); + }; + + const cleanup = () => { + child.stdout.removeListener('data', onData); + child.stderr.removeListener('data', onStderr); + child.removeListener('error', onError); + child.removeListener('close', onClose); + if (child.connected) { + child.disconnect(); + } + }; + + child.stdout.on('data', onData); + child.stderr.on('data', onStderr); + child.on('error', onError); + child.on('close', onClose); }); return this.parseGrepOutput(output, absolutePath); } catch (grepError: unknown) { diff --git a/packages/core/src/tools/shell.ts b/packages/core/src/tools/shell.ts index 2117366a..8fa32490 100644 --- a/packages/core/src/tools/shell.ts +++ b/packages/core/src/tools/shell.ts @@ -257,9 +257,11 @@ export class ShellTool extends BaseTool<ShellToolParams, ToolResult> { abortSignal.addEventListener('abort', abortHandler); // wait for the shell to exit - await new Promise((resolve) => shell.on('exit', resolve)); - - abortSignal.removeEventListener('abort', abortHandler); + try { + await new Promise((resolve) => shell.on('exit', resolve)); + } finally { + abortSignal.removeEventListener('abort', abortHandler); + } // parse pids (pgrep output) from temporary file and remove it const backgroundPIDs: number[] = []; diff --git a/packages/core/src/tools/tool-registry.ts b/packages/core/src/tools/tool-registry.ts index 2b27a703..f3162ac0 100644 --- a/packages/core/src/tools/tool-registry.ts +++ b/packages/core/src/tools/tool-registry.ts @@ -53,28 +53,51 @@ Signal: Signal number or \`(none)\` if no signal was received. const child = spawn(callCommand, [this.name]); child.stdin.write(JSON.stringify(params)); child.stdin.end(); + let stdout = ''; let stderr = ''; - child.stdout.on('data', (data) => { - stdout += data?.toString(); - }); - child.stderr.on('data', (data) => { - stderr += data?.toString(); - }); let error: Error | null = null; - child.on('error', (err: Error) => { - error = err; - }); let code: number | null = null; let signal: NodeJS.Signals | null = null; - child.on( - 'close', - (_code: number | null, _signal: NodeJS.Signals | null) => { + + await new Promise<void>((resolve) => { + const onStdout = (data: Buffer) => { + stdout += data?.toString(); + }; + + const onStderr = (data: Buffer) => { + stderr += data?.toString(); + }; + + const onError = (err: Error) => { + error = err; + }; + + const onClose = ( + _code: number | null, + _signal: NodeJS.Signals | null, + ) => { code = _code; signal = _signal; - }, - ); - await new Promise((resolve) => child.on('close', resolve)); + cleanup(); + resolve(); + }; + + const cleanup = () => { + child.stdout.removeListener('data', onStdout); + child.stderr.removeListener('data', onStderr); + child.removeListener('error', onError); + child.removeListener('close', onClose); + if (child.connected) { + child.disconnect(); + } + }; + + child.stdout.on('data', onStdout); + child.stderr.on('data', onStderr); + child.on('error', onError); + child.on('close', onClose); + }); // if there is any error, non-zero exit code, signal, or stderr, return error details instead of stdout if (error || code !== 0 || signal || stderr) { |
