query loop
设计理念
核心实现
函数签名
async function* query(
messages: Message[],
tools: Tool[],
options: QueryOptions
): AsyncGenerator<QueryEvent, void, unknown>async function* query(
messages: Message[],
tools: Tool[],
options: QueryOptions
): AsyncGenerator<QueryEvent, void, unknown>type QueryEvent =
| { type: 'content'; content: string } // AI 文本输出
| { type: 'tool_use'; tool: string; input: any } // 工具调用
| { type: 'tool_result'; result: ToolResult } // 工具结果
| { type: 'error'; error: Error } // 错误
| { type: 'done' } // 完成async function* query(messages, tools, options) {
// 1. 构建 System Prompt
const systemPrompt = await buildSystemPrompt(tools, options);
// 2. 准备消息
const apiMessages = prepareMessages(messages);
// 3. 检查 token 预算
const tokenCount = estimateTokens(systemPrompt, apiMessages);
if (tokenCount > 180000) {
await autoCompact(apiMessages);
}
// 4. 开始流式请求
const stream = anthropic.messages.stream({
model: options.model || 'claude-3-5-sonnet-20241022',
max_tokens: 8192,
system: systemPrompt,
messages: apiMessages,
tools: tools.map(t => t.toAnthropicTool()),
});
// ...
}// 逐 token 接收 AI 响应
for await (const event of stream) {
switch (event.type) {
case 'content_block_delta':
// 文本内容
if (event.delta.type === 'text_delta') {
yield { type: 'content', content: event.delta.text };
}
break;
case 'content_block_start':
// 工具调用开始
if (event.content_block.type === 'tool_use') {
currentToolUse = {
id: event.content_block.id,
name: event.content_block.name,
input: '',
};
}
break;
case 'content_block_stop':
// 工具调用完成
if (currentToolUse) {
toolCalls.push(currentToolUse);
currentToolUse = null;
}
break;
}
}// 并发执行工具调用
if (toolCalls.length > 0) {
yield { type: 'tool_execution_start', count: toolCalls.length };
const executor = new StreamingToolExecutor(tools, {
maxConcurrency: 5,
onProgress: (progress) => {
// 实时进度反馈
},
});
for await (const result of executor.execute(toolCalls)) {
yield { type: 'tool_result', result };
}
}// 如果 AI 需要继续对话
if (response.stop_reason === 'tool_use') {
// 将工具结果添加到消息历史
messages.push({
role: 'assistant',
content: response.content,
});
messages.push({
role: 'user',
content: toolResults.map(r => ({
type: 'tool_result',
tool_use_id: r.id,
content: r.output,
})),
});
// 递归调用 query (尾递归优化)
yield* query(messages, tools, options);
}Analyzing the codebase...
├─ Reading src/main.tsx
├─ Searching for imports
└─ Found 15 dependencies
The main entry point uses React and Ink to...// 同时读取多个文件
const results = await Promise.all([
readFile('src/main.tsx'),
readFile('src/query.ts'),
readFile('src/Tool.ts'),
]);process.on('SIGINT', () => {
// 清理资源
executor.abort();
// 保存状态
saveState(currentState);
// 退出
process.exit(0);
});// 动态调整压缩策略
const budget = 200000; // Claude API 限制
const current = estimateTokens(messages);
if (current > budget * 0.9) {
// 激进压缩
await autoCompact(messages, { aggressive: true });
} else if (current > budget * 0.75) {
// 标准压缩
await contextCollapse(messages);
} else if (current > budget * 0.6) {
// 轻度压缩
await microcompact(messages);
}// System Prompt 缓存
const cacheKey = `system_prompt_${toolsHash}_${configHash}`;
const cached = cache.get(cacheKey);
if (cached) {
return cached;
}
const prompt = await buildSystemPrompt(tools, config);
cache.set(cacheKey, prompt, { ttl: 3600 });
return prompt;// 投机性执行可能的工具调用
if (options.enableSpeculation) {
const predictions = predictNextTools(messages);
// 后台预执行
Promise.all(predictions.map(p =>
executeTool(p.tool, p.input).catch(() => null)
));
}try {
// Query Loop 主逻辑
for await (const event of stream) {
try {
// 工具执行
const result = await executeTool(toolCall);
yield { type: 'tool_result', result };
} catch (toolError) {
// 工具级错误 - 返回给 AI
yield {
type: 'tool_result',
result: { error: formatError(toolError) }
};
}
}
} catch (apiError) {
// API 级错误 - 重试或通知用户
if (isRetryable(apiError)) {
await sleep(1000);
yield* query(messages, tools, options);
} else {
yield { type: 'error', error: apiError };
}
}// 用户: "读取 package.json"
const messages = [
{ role: 'user', content: '读取 package.json' }
];
for await (const event of query(messages, tools, options)) {
if (event.type === 'tool_use') {
console.log(`Calling ${event.tool}...`);
} else if (event.type === 'content') {
process.stdout.write(event.content);
}
}
// 输出:
// Calling readFile...
// The package.json file contains...// 用户: "分析 main.tsx 并找出所有导入"
//
// 第一轮:
// AI → 调用 readFile('src/main.tsx')
//
// 第二轮:
// AI → 分析内容,调用 grepSearch('import.*from')
//
// 第三轮:
// AI → 总结发现的 15 个导入// Query Loop 调用工具
const tool = tools.find(t => t.name === toolCall.name);
const result = await tool.execute(toolCall.input, context);// 更新状态
store.setState(state => ({
...state,
currentQuery: {
status: 'running',
tokenCount: estimateTokens(messages),
},
}));// UI 订阅 Query 事件
for await (const event of query(...)) {
dispatch({ type: 'QUERY_EVENT', event });
}