summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--packages/cli/src/utils/readStdin.ts24
-rw-r--r--packages/cli/src/utils/sandbox.ts54
-rw-r--r--packages/core/src/tools/grep.ts33
-rw-r--r--packages/core/src/tools/shell.ts8
-rw-r--r--packages/core/src/tools/tool-registry.ts53
5 files changed, 125 insertions, 47 deletions
diff --git a/packages/cli/src/utils/readStdin.ts b/packages/cli/src/utils/readStdin.ts
index d890aa2c..2e005526 100644
--- a/packages/cli/src/utils/readStdin.ts
+++ b/packages/cli/src/utils/readStdin.ts
@@ -9,19 +9,31 @@ export async function readStdin(): Promise<string> {
let data = '';
process.stdin.setEncoding('utf8');
- process.stdin.on('readable', () => {
+ const onReadable = () => {
let chunk;
while ((chunk = process.stdin.read()) !== null) {
data += chunk;
}
- });
+ };
- process.stdin.on('end', () => {
+ const onEnd = () => {
+ cleanup();
resolve(data);
- });
+ };
- process.stdin.on('error', (err) => {
+ const onError = (err: Error) => {
+ cleanup();
reject(err);
- });
+ };
+
+ const cleanup = () => {
+ process.stdin.removeListener('readable', onReadable);
+ process.stdin.removeListener('end', onEnd);
+ process.stdin.removeListener('error', onError);
+ };
+
+ process.stdin.on('readable', onReadable);
+ process.stdin.on('end', onEnd);
+ process.stdin.on('error', onError);
});
}
diff --git a/packages/cli/src/utils/sandbox.ts b/packages/cli/src/utils/sandbox.ts
index f33cb480..e583f0ff 100644
--- a/packages/cli/src/utils/sandbox.ts
+++ b/packages/cli/src/utils/sandbox.ts
@@ -536,28 +536,28 @@ async function pullImage(sandbox: string, image: string): Promise<boolean> {
const pullProcess = spawn(sandbox, args, { stdio: 'pipe' });
let stderrData = '';
- if (pullProcess.stdout) {
- pullProcess.stdout.on('data', (data) => {
- console.info(data.toString().trim()); // Show pull progress
- });
- }
- if (pullProcess.stderr) {
- pullProcess.stderr.on('data', (data) => {
- stderrData += data.toString();
- console.error(data.toString().trim()); // Show pull errors/info from the command itself
- });
- }
- pullProcess.on('error', (err) => {
+ const onStdoutData = (data: Buffer) => {
+ console.info(data.toString().trim()); // Show pull progress
+ };
+
+ const onStderrData = (data: Buffer) => {
+ stderrData += data.toString();
+ console.error(data.toString().trim()); // Show pull errors/info from the command itself
+ };
+
+ const onError = (err: Error) => {
console.warn(
`Failed to start '${sandbox} pull ${image}' command: ${err.message}`,
);
+ cleanup();
resolve(false);
- });
+ };
- pullProcess.on('close', (code) => {
+ const onClose = (code: number | null) => {
if (code === 0) {
console.info(`Successfully pulled image ${image}.`);
+ cleanup();
resolve(true);
} else {
console.warn(
@@ -566,9 +566,33 @@ async function pullImage(sandbox: string, image: string): Promise<boolean> {
if (stderrData.trim()) {
// Details already printed by the stderr listener above
}
+ cleanup();
resolve(false);
}
- });
+ };
+
+ const cleanup = () => {
+ if (pullProcess.stdout) {
+ pullProcess.stdout.removeListener('data', onStdoutData);
+ }
+ if (pullProcess.stderr) {
+ pullProcess.stderr.removeListener('data', onStderrData);
+ }
+ pullProcess.removeListener('error', onError);
+ pullProcess.removeListener('close', onClose);
+ if (pullProcess.connected) {
+ pullProcess.disconnect();
+ }
+ };
+
+ if (pullProcess.stdout) {
+ pullProcess.stdout.on('data', onStdoutData);
+ }
+ if (pullProcess.stderr) {
+ pullProcess.stderr.on('data', onStderrData);
+ }
+ pullProcess.on('error', onError);
+ pullProcess.on('close', onClose);
});
}
diff --git a/packages/core/src/tools/grep.ts b/packages/core/src/tools/grep.ts
index acdf0bc8..38a1d4f5 100644
--- a/packages/core/src/tools/grep.ts
+++ b/packages/core/src/tools/grep.ts
@@ -461,8 +461,8 @@ export class GrepTool extends BaseTool<GrepToolParams, ToolResult> {
const stdoutChunks: Buffer[] = [];
const stderrChunks: Buffer[] = [];
- child.stdout.on('data', (chunk) => stdoutChunks.push(chunk));
- child.stderr.on('data', (chunk) => {
+ const onData = (chunk: Buffer) => stdoutChunks.push(chunk);
+ const onStderr = (chunk: Buffer) => {
const stderrStr = chunk.toString();
// Suppress common harmless stderr messages
if (
@@ -471,15 +471,17 @@ export class GrepTool extends BaseTool<GrepToolParams, ToolResult> {
) {
stderrChunks.push(chunk);
}
- });
- child.on('error', (err) =>
- reject(new Error(`Failed to start system grep: ${err.message}`)),
- );
- child.on('close', (code) => {
+ };
+ const onError = (err: Error) => {
+ cleanup();
+ reject(new Error(`Failed to start system grep: ${err.message}`));
+ };
+ const onClose = (code: number | null) => {
const stdoutData = Buffer.concat(stdoutChunks).toString('utf8');
const stderrData = Buffer.concat(stderrChunks)
.toString('utf8')
.trim();
+ cleanup();
if (code === 0) resolve(stdoutData);
else if (code === 1)
resolve(''); // No matches
@@ -492,7 +494,22 @@ export class GrepTool extends BaseTool<GrepToolParams, ToolResult> {
);
else resolve(''); // Exit code > 1 but no stderr, likely just suppressed errors
}
- });
+ };
+
+ const cleanup = () => {
+ child.stdout.removeListener('data', onData);
+ child.stderr.removeListener('data', onStderr);
+ child.removeListener('error', onError);
+ child.removeListener('close', onClose);
+ if (child.connected) {
+ child.disconnect();
+ }
+ };
+
+ child.stdout.on('data', onData);
+ child.stderr.on('data', onStderr);
+ child.on('error', onError);
+ child.on('close', onClose);
});
return this.parseGrepOutput(output, absolutePath);
} catch (grepError: unknown) {
diff --git a/packages/core/src/tools/shell.ts b/packages/core/src/tools/shell.ts
index 2117366a..8fa32490 100644
--- a/packages/core/src/tools/shell.ts
+++ b/packages/core/src/tools/shell.ts
@@ -257,9 +257,11 @@ export class ShellTool extends BaseTool<ShellToolParams, ToolResult> {
abortSignal.addEventListener('abort', abortHandler);
// wait for the shell to exit
- await new Promise((resolve) => shell.on('exit', resolve));
-
- abortSignal.removeEventListener('abort', abortHandler);
+ try {
+ await new Promise((resolve) => shell.on('exit', resolve));
+ } finally {
+ abortSignal.removeEventListener('abort', abortHandler);
+ }
// parse pids (pgrep output) from temporary file and remove it
const backgroundPIDs: number[] = [];
diff --git a/packages/core/src/tools/tool-registry.ts b/packages/core/src/tools/tool-registry.ts
index 2b27a703..f3162ac0 100644
--- a/packages/core/src/tools/tool-registry.ts
+++ b/packages/core/src/tools/tool-registry.ts
@@ -53,28 +53,51 @@ Signal: Signal number or \`(none)\` if no signal was received.
const child = spawn(callCommand, [this.name]);
child.stdin.write(JSON.stringify(params));
child.stdin.end();
+
let stdout = '';
let stderr = '';
- child.stdout.on('data', (data) => {
- stdout += data?.toString();
- });
- child.stderr.on('data', (data) => {
- stderr += data?.toString();
- });
let error: Error | null = null;
- child.on('error', (err: Error) => {
- error = err;
- });
let code: number | null = null;
let signal: NodeJS.Signals | null = null;
- child.on(
- 'close',
- (_code: number | null, _signal: NodeJS.Signals | null) => {
+
+ await new Promise<void>((resolve) => {
+ const onStdout = (data: Buffer) => {
+ stdout += data?.toString();
+ };
+
+ const onStderr = (data: Buffer) => {
+ stderr += data?.toString();
+ };
+
+ const onError = (err: Error) => {
+ error = err;
+ };
+
+ const onClose = (
+ _code: number | null,
+ _signal: NodeJS.Signals | null,
+ ) => {
code = _code;
signal = _signal;
- },
- );
- await new Promise((resolve) => child.on('close', resolve));
+ cleanup();
+ resolve();
+ };
+
+ const cleanup = () => {
+ child.stdout.removeListener('data', onStdout);
+ child.stderr.removeListener('data', onStderr);
+ child.removeListener('error', onError);
+ child.removeListener('close', onClose);
+ if (child.connected) {
+ child.disconnect();
+ }
+ };
+
+ child.stdout.on('data', onStdout);
+ child.stderr.on('data', onStderr);
+ child.on('error', onError);
+ child.on('close', onClose);
+ });
// if there is any error, non-zero exit code, signal, or stderr, return error details instead of stdout
if (error || code !== 0 || signal || stderr) {