Skip to content

Commit ff1bd76

Browse files
committed
refactor: split review execution dispatcher helpers
Made-with: Cursor
1 parent eeef73a commit ff1bd76

File tree

5 files changed

+164
-121
lines changed

5 files changed

+164
-121
lines changed

TODO.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@
107107
- [x] `src/review/pipeline/comments.rs`: separate comment assembly, filtering, and metadata stamping.
108108
- [x] `src/review/pipeline/postprocess/dedup.rs`: split duplicate detection, scoring, and merge/rewrite behavior.
109109
- [x] `src/review/pipeline/postprocess/feedback.rs`: separate store lookups from suppression/annotation decisions.
110-
- [ ] `src/review/pipeline/execution/dispatcher.rs`: carve request scheduling, concurrency control, and result collection.
110+
- [x] `src/review/pipeline/execution/dispatcher.rs`: carve request scheduling, concurrency control, and result collection.
111111
- [ ] `src/review/pipeline.rs`: keep trimming top-level orchestration as helpers mature.
112112

113113
### Review helper backlog
Lines changed: 8 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1+
#[path = "dispatcher/context.rs"]
2+
mod context;
3+
#[path = "dispatcher/job.rs"]
4+
mod job;
5+
#[path = "dispatcher/run.rs"]
6+
mod run;
7+
18
use anyhow::Result;
2-
use futures::StreamExt;
39
use std::path::PathBuf;
4-
use std::sync::Arc;
5-
use std::time::Instant;
610

711
use crate::adapters;
812
use crate::core;
913

10-
use super::super::contracts::{FileReviewJob, ReviewExecutionContext};
1114
use super::super::types::AgentActivity;
1215

1316
pub(super) struct DispatchedJobResult {
@@ -24,119 +27,4 @@ pub(super) struct DispatchedJobResult {
2427
pub agent_data: Option<AgentActivity>,
2528
}
2629

27-
pub(super) async fn dispatch_jobs(
28-
jobs: Vec<FileReviewJob>,
29-
context: &ReviewExecutionContext<'_>,
30-
) -> Vec<DispatchedJobResult> {
31-
const MAX_CONCURRENT_FILES: usize = 5;
32-
let concurrency = if context.services.is_local {
33-
1
34-
} else {
35-
MAX_CONCURRENT_FILES
36-
};
37-
38-
tracing::info!(
39-
"Sending {} LLM requests (concurrency={})",
40-
jobs.len(),
41-
concurrency,
42-
);
43-
44-
let agent_tool_ctx = build_agent_tool_context(context);
45-
let agent_loop_config = core::agent_loop::AgentLoopConfig {
46-
max_iterations: context.services.config.agent_max_iterations,
47-
max_total_tokens: context.services.config.agent_max_total_tokens,
48-
};
49-
let agent_tools_filter = context.services.config.agent_tools_enabled.clone();
50-
51-
futures::stream::iter(jobs)
52-
.map(|job| {
53-
let adapter = context.services.adapter.clone();
54-
let agent_ctx = agent_tool_ctx.clone();
55-
let loop_config = agent_loop_config.clone();
56-
let tools_filter = agent_tools_filter.clone();
57-
async move {
58-
if context.services.is_local {
59-
eprintln!("Sending {} to local model...", job.file_path.display());
60-
}
61-
let request_start = Instant::now();
62-
63-
let (response, agent_data) = if let Some(ctx) = agent_ctx {
64-
let tools = core::agent_tools::build_review_tools(ctx, tools_filter.as_deref());
65-
let tool_defs: Vec<_> = tools.iter().map(|tool| tool.definition()).collect();
66-
let chat_request =
67-
adapters::llm::ChatRequest::from_llm_request(job.request, &tool_defs);
68-
match core::agent_loop::run_agent_loop(
69-
adapter.as_ref(),
70-
chat_request,
71-
&tools,
72-
&loop_config,
73-
None,
74-
)
75-
.await
76-
{
77-
Ok(result) => {
78-
let activity = AgentActivity {
79-
total_iterations: result.iterations,
80-
tool_calls: result.tool_calls,
81-
};
82-
(
83-
Ok(adapters::llm::LLMResponse {
84-
content: result.content,
85-
model: result.model,
86-
usage: Some(result.total_usage),
87-
}),
88-
Some(activity),
89-
)
90-
}
91-
Err(error) => (Err(error), None),
92-
}
93-
} else {
94-
(adapter.complete(job.request).await, None)
95-
};
96-
97-
let latency_ms = request_start.elapsed().as_millis() as u64;
98-
if context.services.is_local {
99-
eprintln!(
100-
"{}: response received ({:.1}s)",
101-
job.file_path.display(),
102-
latency_ms as f64 / 1000.0
103-
);
104-
}
105-
DispatchedJobResult {
106-
job_order: job.job_order,
107-
diff_index: job.diff_index,
108-
active_rules: job.active_rules,
109-
path_config: job.path_config,
110-
file_path: job.file_path,
111-
deterministic_comments: job.deterministic_comments,
112-
pass_kind: job.pass_kind,
113-
mark_file_complete: job.mark_file_complete,
114-
response,
115-
latency_ms,
116-
agent_data,
117-
}
118-
}
119-
})
120-
.buffer_unordered(concurrency)
121-
.collect()
122-
.await
123-
}
124-
125-
fn build_agent_tool_context(
126-
context: &ReviewExecutionContext<'_>,
127-
) -> Option<Arc<core::agent_tools::ReviewToolContext>> {
128-
if !(context.services.config.agent_review && context.services.adapter.supports_tools()) {
129-
return None;
130-
}
131-
132-
let context_fetcher_arc = Arc::new(core::ContextFetcher::new(
133-
context.services.repo_path.clone(),
134-
));
135-
Some(Arc::new(core::agent_tools::ReviewToolContext {
136-
repo_path: context.services.repo_path.clone(),
137-
context_fetcher: context_fetcher_arc,
138-
symbol_index: None,
139-
symbol_graph: None,
140-
git_history: None,
141-
}))
142-
}
30+
pub(super) use run::dispatch_jobs;
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use std::sync::Arc;
2+
3+
use crate::core;
4+
5+
use super::super::super::contracts::ReviewExecutionContext;
6+
7+
const MAX_CONCURRENT_FILES: usize = 5;
8+
9+
pub(super) fn dispatch_concurrency(context: &ReviewExecutionContext<'_>) -> usize {
10+
if context.services.is_local {
11+
1
12+
} else {
13+
MAX_CONCURRENT_FILES
14+
}
15+
}
16+
17+
pub(super) fn build_agent_loop_config(
18+
context: &ReviewExecutionContext<'_>,
19+
) -> core::agent_loop::AgentLoopConfig {
20+
core::agent_loop::AgentLoopConfig {
21+
max_iterations: context.services.config.agent_max_iterations,
22+
max_total_tokens: context.services.config.agent_max_total_tokens,
23+
}
24+
}
25+
26+
pub(super) fn build_agent_tool_context(
27+
context: &ReviewExecutionContext<'_>,
28+
) -> Option<Arc<core::agent_tools::ReviewToolContext>> {
29+
if !(context.services.config.agent_review && context.services.adapter.supports_tools()) {
30+
return None;
31+
}
32+
33+
let context_fetcher_arc = Arc::new(core::ContextFetcher::new(
34+
context.services.repo_path.clone(),
35+
));
36+
Some(Arc::new(core::agent_tools::ReviewToolContext {
37+
repo_path: context.services.repo_path.clone(),
38+
context_fetcher: context_fetcher_arc,
39+
symbol_index: None,
40+
symbol_graph: None,
41+
git_history: None,
42+
}))
43+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
use std::sync::Arc;
2+
use std::time::Instant;
3+
4+
use crate::adapters;
5+
use crate::core;
6+
7+
use super::super::super::contracts::{FileReviewJob, ReviewExecutionContext};
8+
use super::{AgentActivity, DispatchedJobResult};
9+
10+
pub(super) async fn dispatch_job(
11+
job: FileReviewJob,
12+
context: &ReviewExecutionContext<'_>,
13+
agent_ctx: Option<Arc<core::agent_tools::ReviewToolContext>>,
14+
loop_config: core::agent_loop::AgentLoopConfig,
15+
tools_filter: Option<Vec<String>>,
16+
) -> DispatchedJobResult {
17+
let adapter = context.services.adapter.clone();
18+
19+
if context.services.is_local {
20+
eprintln!("Sending {} to local model...", job.file_path.display());
21+
}
22+
let request_start = Instant::now();
23+
24+
let (response, agent_data) = if let Some(ctx) = agent_ctx {
25+
let tools = core::agent_tools::build_review_tools(ctx, tools_filter.as_deref());
26+
let tool_defs: Vec<_> = tools.iter().map(|tool| tool.definition()).collect();
27+
let chat_request = adapters::llm::ChatRequest::from_llm_request(job.request, &tool_defs);
28+
match core::agent_loop::run_agent_loop(
29+
adapter.as_ref(),
30+
chat_request,
31+
&tools,
32+
&loop_config,
33+
None,
34+
)
35+
.await
36+
{
37+
Ok(result) => {
38+
let activity = AgentActivity {
39+
total_iterations: result.iterations,
40+
tool_calls: result.tool_calls,
41+
};
42+
(
43+
Ok(adapters::llm::LLMResponse {
44+
content: result.content,
45+
model: result.model,
46+
usage: Some(result.total_usage),
47+
}),
48+
Some(activity),
49+
)
50+
}
51+
Err(error) => (Err(error), None),
52+
}
53+
} else {
54+
(adapter.complete(job.request).await, None)
55+
};
56+
57+
let latency_ms = request_start.elapsed().as_millis() as u64;
58+
if context.services.is_local {
59+
eprintln!(
60+
"{}: response received ({:.1}s)",
61+
job.file_path.display(),
62+
latency_ms as f64 / 1000.0
63+
);
64+
}
65+
66+
DispatchedJobResult {
67+
job_order: job.job_order,
68+
diff_index: job.diff_index,
69+
active_rules: job.active_rules,
70+
path_config: job.path_config,
71+
file_path: job.file_path,
72+
deterministic_comments: job.deterministic_comments,
73+
pass_kind: job.pass_kind,
74+
mark_file_complete: job.mark_file_complete,
75+
response,
76+
latency_ms,
77+
agent_data,
78+
}
79+
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
use futures::StreamExt;
2+
3+
use super::super::super::contracts::{FileReviewJob, ReviewExecutionContext};
4+
use super::context::{build_agent_loop_config, build_agent_tool_context, dispatch_concurrency};
5+
use super::job::dispatch_job;
6+
use super::DispatchedJobResult;
7+
8+
pub(in super::super) async fn dispatch_jobs(
9+
jobs: Vec<FileReviewJob>,
10+
context: &ReviewExecutionContext<'_>,
11+
) -> Vec<DispatchedJobResult> {
12+
let concurrency = dispatch_concurrency(context);
13+
tracing::info!(
14+
"Sending {} LLM requests (concurrency={})",
15+
jobs.len(),
16+
concurrency,
17+
);
18+
19+
let agent_tool_ctx = build_agent_tool_context(context);
20+
let agent_loop_config = build_agent_loop_config(context);
21+
let agent_tools_filter = context.services.config.agent_tools_enabled.clone();
22+
23+
futures::stream::iter(jobs)
24+
.map(|job| {
25+
let agent_ctx = agent_tool_ctx.clone();
26+
let loop_config = agent_loop_config.clone();
27+
let tools_filter = agent_tools_filter.clone();
28+
async move { dispatch_job(job, context, agent_ctx, loop_config, tools_filter).await }
29+
})
30+
.buffer_unordered(concurrency)
31+
.collect()
32+
.await
33+
}

0 commit comments

Comments
 (0)