streaming tools
设计理念
问题
readFile(file1) → 等待 → readFile(file2) → 等待 → readFile(file3)
总时间: 300ms + 300ms + 300ms = 900ms

解决方案
readFile(file1) ┐
readFile(file2) ├→ 并发执行
readFile(file3) ┘
总时间: max(300ms, 300ms, 300ms) = 300msclass StreamingToolExecutor {
/**
 * @param tools   Registry of available tools, looked up by `ToolCall.name`.
 * @param options `maxConcurrency` caps how many calls run at once;
 *                `onProgress`, if given, is notified when each call
 *                completes or fails.
 * @param context Passed as the second argument to every `tool.execute` call
 *                (read by `executeTool` as `this.context`, which previously
 *                had no declaration). Optional so existing two-argument
 *                construction keeps working.
 *                NOTE(review): if `Tool.execute` requires a context, make this
 *                parameter required.
 */
constructor(
  private readonly tools: Map<string, Tool>,
  private readonly options: {
    maxConcurrency: number;
    onProgress?: (progress: Progress) => void;
  },
  private readonly context?: Parameters<Tool['execute']>[1]
) {}
/**
 * Runs `toolCalls` with at most `maxConcurrency` calls in flight and yields
 * each result as soon as it is available (completion order, not input order).
 *
 * Every call yields exactly one result. A settled call stays in the in-flight
 * map until this loop has taken its result, so calls that finish while the
 * consumer is still handling an earlier result are not dropped.
 *
 * If the consumer stops iterating early (`break`/`return`), no further queued
 * calls are started; calls already in flight keep running and their results
 * are discarded.
 */
async *execute(
  toolCalls: ToolCall[]
): AsyncGenerator<ToolResult> {
  // A limit below 1 (or NaN) would launch nothing and then await a race over
  // an empty set, which never settles.
  const limit = this.options.maxConcurrency >= 1 ? this.options.maxConcurrency : 1;
  const pending = [...toolCalls];
  // In-flight calls keyed by launch order. An entry is removed only by this
  // loop after its result has been taken, never by a settle callback.
  const inFlight = new Map<number, Promise<[number, ToolResult]>>();
  let launched = 0;

  while (pending.length > 0 || inFlight.size > 0) {
    // Top up to the concurrency limit.
    while (pending.length > 0 && inFlight.size < limit) {
      // Non-empty per the loop condition above.
      const call = pending.shift() as ToolCall;
      const id = launched++;
      inFlight.set(
        id,
        this.executeTool(call).then((result): [number, ToolResult] => [id, result])
      );
    }
    // Take whichever call settles first and retire exactly that entry.
    const [doneId, result] = await Promise.race(inFlight.values());
    inFlight.delete(doneId);
    yield result;
  }
}
/**
 * Runs a single tool call and turns any failure into a `ToolResult`.
 *
 * An unknown tool name or an exception from `tool.execute` is reported via
 * `onProgress` with status 'failed' and returned as
 * `{ success: false, error, tool }`, so one bad call does not reject the
 * promise that `execute` is racing on.
 */
private async executeTool(call: ToolCall): Promise<ToolResult> {
  try {
    const tool = this.tools.get(call.name);
    if (tool === undefined) {
      // Fail with a clear message instead of a TypeError on `undefined.execute`.
      throw new Error(`Unknown tool: ${call.name}`);
    }
    const result = await tool.execute(call.input, this.context);
    // Notify progress.
    this.options.onProgress?.({
      tool: call.name,
      status: 'completed',
    });
    return result;
  } catch (error: unknown) {
    this.options.onProgress?.({
      tool: call.name,
      status: 'failed',
      error,
    });
    return {
      success: false,
      // Non-Error throws (strings, objects) have no `.message`.
      error: error instanceof Error ? error.message : String(error),
      // Consumers read `result.tool` (e.g. to decide whether a failure is critical).
      tool: call.name,
    };
  }
}
}const MAX_CONCURRENCY = 5;
// Cap how many tools may run at the same time; MAX_CONCURRENCY is the
// constant defined immediately before this snippet.
const semaphore = new Semaphore(MAX_CONCURRENCY);

/**
 * Runs one tool call while holding a semaphore slot.
 *
 * The slot is acquired before the `try`, so `release()` only runs for a slot
 * that was actually obtained, and `finally` gives it back whether the tool
 * resolves or throws.
 *
 * NOTE(review): `tool` and `context` are free variables here (not derived from
 * `call`); presumably the tool should be looked up by `call.name` — confirm.
 */
async function executeTool(call: ToolCall): Promise<ToolResult> {
  await semaphore.acquire();
  try {
    return await tool.execute(call.input, context);
  } finally {
    semaphore.release();
  }
}

/**
 * Splits tool calls into ordered groups that are safe to run concurrently.
 *
 * Consecutive read calls share a group; every write call (per `isWriteTool`)
 * gets a group of its own, so a write never runs alongside the reads before
 * or after it. Input order is preserved.
 */
function groupToolCalls(calls: ToolCall[]): ToolGroup[] {
  const groups: ToolGroup[] = [];
  let reads: ToolCall[] = [];
  for (const call of calls) {
    if (!isWriteTool(call.name)) {
      // Reads can run concurrently with each other.
      reads.push(call);
      continue;
    }
    // A write closes the current run of reads and forms its own group.
    if (reads.length > 0) {
      groups.push(reads);
    }
    groups.push([call]);
    reads = [];
  }
  if (reads.length > 0) {
    groups.push(reads);
  }
  return groups;
}

/**
 * Collects results from the shared `executor`, stopping at the first failed
 * critical tool.
 *
 * Leaving the `for await` loop with `break` calls the generator's `return()`,
 * which ends `StreamingToolExecutor.execute`, so no further queued calls are
 * started. (The executor class has no `abort()` method; calling one threw
 * before the `break` and rejected this function.) Calls already in flight are
 * not cancelled.
 *
 * @returns the results received, up to and including the critical failure.
 */
async function executeWithCascade(
  calls: ToolCall[]
): Promise<ToolResult[]> {
  const results: ToolResult[] = [];
  for await (const result of executor.execute(calls)) {
    results.push(result);
    // A failed critical tool stops the remaining calls.
    if (!result.success && isCritical(result.tool)) {
      break;
    }
  }
  return results;
}

/**
 * Folds a batch of tool results into a single result.
 *
 * - no failures (including an empty batch): success, all outputs aggregated;
 * - no successes: failure with 'All tools failed';
 * - mixed: success with the successful outputs aggregated and each failure's
 *   `error` listed in `warnings`.
 */
function handlePartialFailure(
  results: ToolResult[]
): ToolResult {
  const successful = results.filter((r) => r.success);
  const failed = results.filter((r) => !r.success);
  if (failed.length === 0) {
    // All succeeded.
    return {
      success: true,
      output: aggregateResults(successful),
    };
  }
  if (successful.length === 0) {
    // All failed.
    return {
      success: false,
      error: 'All tools failed',
    };
  }
  // Partial success: keep the useful output, surface failures as warnings.
  return {
    success: true,
    output: aggregateResults(successful),
    warnings: failed.map((f) => f.error),
  };
}

// Example usage: stream results and drive a progress display.
const executor = new StreamingToolExecutor(tools, {
  maxConcurrency: 5,
  onProgress: (progress) => {
    // Update the UI.
    updateProgressBar(progress);
  },
});
for await (const result of executor.execute(toolCalls)) {
  // Failed calls are yielded too; don't report them as completed.
  console.log(
    result.success ? `✓ ${result.tool} completed` : `✗ ${result.tool} failed`
  );
}

/**
 * Redraws a one-line progress indicator in place on stdout.
 *
 * `completed`/`total` default to 0 when absent (StreamingToolExecutor's
 * progress events carry only `tool`/`status`/`error`), and a zero total
 * renders as 0% instead of `NaN%` or `Infinity%`.
 */
function updateProgressBar(progress: Progress) {
  const { completed = 0, total = 0 } = progress;
  const percentage = total > 0 ? ((completed / total) * 100).toFixed(0) : '0';
  process.stdout.write(`\rProgress: ${percentage}% [${completed}/${total}]`);
}

/** Cache key shared by speculative pre-execution and the lookup in `executeTool`. */
function speculativeCacheKey(name: string, input: unknown): string {
  return `${name}:${JSON.stringify(input)}`;
}

/**
 * Starts predicted tool calls in the background and caches their results for
 * `executeTool` to reuse.
 *
 * Does not wait for the speculative calls. Their failures are ignored on
 * purpose: a wrong prediction simply leaves no cache entry.
 */
async function speculativeExecute(
  messages: Message[]
): Promise<void> {
  // Predict which tool calls are likely to come next.
  const predictions = predictNextTools(messages);
  for (const pred of predictions) {
    // Never pre-run write tools: their side effects would happen even if the
    // call is never actually requested.
    if (isWriteTool(pred.tool)) {
      continue;
    }
    const call: ToolCall = { name: pred.tool, input: pred.input };
    // Fire and forget: speculative work must not block the caller.
    void executeTool(call)
      .then((result) => {
        // Same key as the lookup in executeTool, or the result is never used.
        speculativeCache.set(speculativeCacheKey(call.name, call.input), result);
      })
      .catch(() => {
        // Misprediction or tool failure: nothing to cache.
      });
  }
}

/**
 * Executes a tool call, reusing a speculatively computed result when one is
 * cached under the same tool name and input.
 *
 * NOTE(review): cache entries are never evicted here; a cached read may be
 * stale by the time it is reused — confirm the cache's lifetime policy.
 */
async function executeTool(
  call: ToolCall
): Promise<ToolResult> {
  const cached = speculativeCache.get(speculativeCacheKey(call.name, call.input));
  if (cached !== undefined) {
    return cached;
  }
  // No speculative result: execute normally.
  return await tool.execute(call.input, context);
}

/**
 * Runs `executeTool` and records its wall-clock duration in milliseconds
 * via `recordMetric`. If the tool throws, no metric is recorded.
 */
async function measureToolExecution(
  call: ToolCall
): Promise<{ result: ToolResult; duration: number }> {
  const start = performance.now();
  const result = await executeTool(call);
  const duration = performance.now() - start;
  // Record the metric.
  recordMetric(call.name, duration);
  return { result, duration };
}