perf(graph): batch DB operations for entity insertion#624
Conversation
Replace one-at-a-time graph database queries with UNWIND-based batch operations, dramatically reducing network round-trips during analysis.

Changes:
- Add `add_files_batch()` to graph.py: single UNWIND query for all files
- Add `add_entities_batch()` to graph.py: one query per entity label
- Add `connect_entities_batch()` to graph.py: one query per relation type
- Refactor `first_pass` in source_analyzer.py: collect entities/rels in memory, then batch-flush to DB
- Refactor `second_pass` in source_analyzer.py: collect all resolved symbol relationships, then batch-insert
- Add tests for all batch methods in test_graph_ops.py

For a typical 100-file project (~500 entities, ~1000 relationships), this reduces ~1,600 individual DB round-trips to ~10 batched queries.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
📝 Walkthrough

The PR changes graph construction to a two-phase process: analyzers collect pending files, entities, and relationships during AST traversal, then batch-insert files/entities and create relationships via new Graph batch APIs instead of immediate per-item writes.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Analyzer as SourceAnalyzer
    participant AST as AST Walker
    participant Resolver as SymbolResolver
    participant Graph as Graph (client)
    participant DB as Neo4j
    Analyzer->>AST: first_pass walk(file)
    AST->>Resolver: resolve symbols
    Resolver-->>Analyzer: discovered files/entities/DEFINES (append to pending lists)
    Note over Analyzer,Graph: After all files processed
    Analyzer->>Graph: add_files_batch(pending_files)
    Graph->>DB: UNWIND/MERGE File nodes (chunked)
    DB-->>Graph: created File IDs
    Graph-->>Analyzer: assign File.id
    Analyzer->>Graph: add_entities_batch(pending_entities)
    Graph->>DB: UNWIND/MERGE Entity nodes grouped by label (chunked)
    DB-->>Graph: created Entity IDs
    Graph-->>Analyzer: assign Entity.id
    Analyzer->>AST: second_pass walk(file)
    AST->>Resolver: resolve relationships/calls
    Resolver-->>Analyzer: append relationships to pending_rels (incl. CALLS props)
    Note over Analyzer,Graph: After second pass
    Analyzer->>Graph: connect_entities_batch(pending_rels)
    Graph->>DB: UNWIND/MATCH by ID and MERGE relationships (chunked)
    DB-->>Graph: relationships created/updated
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (warning)
Pull request overview
This PR refactors the analysis write-path to reduce FalkorDB round-trips by batching file/entity/relationship creation via UNWIND, moving from per-node/per-edge queries to per-label/per-relationship-type queries.
Changes:
- Added `Graph.add_files_batch()`, `Graph.add_entities_batch()`, and `Graph.connect_entities_batch()` for batched inserts/merges.
- Updated `SourceAnalyzer` first/second pass to collect entities/relationships in memory and flush them in batches.
- Expanded `tests/test_graph_ops.py` with coverage for the new batch operations and empty-input edge cases.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| api/graph.py | Introduces batch UNWIND write operations for files, entities, and relationships. |
| api/analyzers/source_analyzer.py | Refactors analysis pipeline to accumulate and batch-write graph mutations. |
| tests/test_graph_ops.py | Adds tests validating batch insertion and batch relationship creation behavior. |
```python
q = f"""UNWIND $entities AS e
        MERGE (c:{label}:Searchable {{name: e['name'], path: e['path'],
                                      src_start: e['src_start'],
                                      src_end: e['src_end']}})
        SET c.doc = e['doc']
        SET c += e['props']
        RETURN c"""
```
`add_entities_batch` interpolates `label` directly into the Cypher string. Since labels can't be parameterized, this should defensively validate `label` (e.g., allowlist known entity labels or enforce a strict `[A-Za-z0-9_]+` regex) to avoid Cypher injection if this method is ever called with untrusted input.
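A minimal sketch of the defensive check suggested above. The `validate_label` helper and its regex are hypothetical illustrations, not code from the PR:

```python
import re

# Hypothetical guard: Cypher labels cannot be bound as query parameters,
# so validate the string before interpolating it into the query text.
_LABEL_RE = re.compile(r"[A-Za-z0-9_]+")

def validate_label(label: str) -> str:
    """Return the label unchanged if it is safe to interpolate, else raise."""
    if not _LABEL_RE.fullmatch(label):
        raise ValueError(f"Unsafe Cypher label: {label!r}")
    return label

print(validate_label("Function"))  # passes through unchanged
# validate_label("X) DETACH DELETE (n") would raise ValueError
```

An allowlist of known entity labels would be stricter still; the regex alone already rejects any string containing Cypher syntax characters.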
```cypher
MATCH (src), (dest)
WHERE ID(src) = r['src_id'] AND ID(dest) = r['dest_id']
```
connect_entities_batch uses MATCH (src), (dest) WHERE ID(src)=... AND ID(dest)=..., which expresses a Cartesian product before filtering. Rewriting as two separate MATCH clauses (or MATCH (src) WHERE ID(src)=... then MATCH (dest) WHERE ID(dest)=...) avoids planner-dependent cross-join behavior and better matches the intent of ID lookups.
```diff
-MATCH (src), (dest)
-WHERE ID(src) = r['src_id'] AND ID(dest) = r['dest_id']
+MATCH (src)
+WHERE ID(src) = r['src_id']
+MATCH (dest)
+WHERE ID(dest) = r['dest_id']
```
```python
q = f"""UNWIND $rels AS r
        MATCH (src), (dest)
        WHERE ID(src) = r['src_id'] AND ID(dest) = r['dest_id']
        MERGE (src)-[e:{relation}]->(dest)
```
connect_entities_batch always uses MERGE (src)-[e:REL]->(dest) and then SET e += properties. For relationship types where multiple edges between the same pair are meaningful (e.g., multiple CALL sites between the same functions), this will collapse them into a single edge and later rows will overwrite earlier properties. Consider using CREATE for those relation types, or MERGE on a uniqueness key that includes the distinguishing properties (like line/pos).
```diff
-MERGE (src)-[e:{relation}]->(dest)
+CREATE (src)-[e:{relation}]->(dest)
```
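The reviewer's other alternative — merging on a uniqueness key — can be sketched as below. The `line` property name is an assumption for illustration; the PR's actual CALLS properties may differ:

```python
# Assumed: each CALLS row carries a distinguishing 'line' property.
# Including it in the MERGE pattern keeps one edge per call site
# instead of collapsing all calls into a single relationship.
relation = "CALLS"
q = f"""UNWIND $rels AS r
        MATCH (src) WHERE ID(src) = r['src_id']
        MATCH (dest) WHERE ID(dest) = r['dest_id']
        MERGE (src)-[e:{relation} {{line: r['properties']['line']}}]->(dest)
        SET e += r['properties']"""
```

With the key in the pattern, MERGE stays idempotent across re-analysis runs, which plain CREATE would not be.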
```diff
 # Skip ignored files
-if any([i in str(file_path) for i in ignore]):
+if any([ig in str(file_path) for ig in ignore]):
```
This any([ ... for ... ]) builds an intermediate list on every file; any(ig in str(file_path) for ig in ignore) avoids the allocation and is equivalent.
```diff
-if any([ig in str(file_path) for ig in ignore]):
+if any(ig in str(file_path) for ig in ignore):
```
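The difference is observable: a list comprehension evaluates every element before `any` runs, while a generator expression short-circuits on the first match. A small standalone demonstration (names are illustrative):

```python
def matches(needle, haystack, log):
    """Record each membership test so evaluation order is visible."""
    log.append(needle)
    return needle in haystack

path = "src/vendor/lib.py"
ignore = ["vendor", "build", "dist"]

list_log, gen_log = [], []
any([matches(ig, path, list_log) for ig in ignore])  # builds the full list first
any(matches(ig, path, gen_log) for ig in ignore)     # stops at the first hit

print(list_log)  # ['vendor', 'build', 'dist'] — every element evaluated
print(gen_log)   # ['vendor'] — short-circuits on the first match
```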
```python
        self.graph.add_files_batch([])

    def test_add_entities_batch(self):
        from api.entities.entity import Entity
```
Entity is imported here but never used in the test. Dropping the unused import keeps the test focused and avoids lint noise.
```diff
-from api.entities.entity import Entity
```
Add BATCH_SIZE=500 constant to cap UNWIND query payloads. Each batch method now processes items in chunks to avoid overwhelming FalkorDB/Redis memory and protocol buffers on large repositories.

Add test_batch_chunking to verify correct behavior across chunk boundaries.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
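The chunking this commit describes can be sketched as a small helper. `BATCH_SIZE` matches the commit message, but the `chunks` generator is an illustration, not the PR's actual code:

```python
BATCH_SIZE = 500  # cap from the commit message

def chunks(items: list, size: int = BATCH_SIZE):
    """Yield successive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# Each batch method would then run one UNWIND query per chunk:
pending = list(range(1203))
sizes = [len(c) for c in chunks(pending)]
print(sizes)  # [500, 500, 203]
```

Capping the payload this way bounds both the parameter-serialization cost per request and the server-side memory needed to expand the UNWIND list.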
Actionable comments posted: 1
🧹 Nitpick comments (4)
api/graph.py (2)
571-576: Optional: Remove unused `RETURN e` to reduce payload.

The query returns `e` but the result is never used. Removing it would slightly reduce response payload size, though this is a minor optimization.

♻️ Suggested change

```diff
 q = f"""UNWIND $rels AS r
         MATCH (src), (dest)
         WHERE ID(src) = r['src_id'] AND ID(dest) = r['dest_id']
         MERGE (src)-[e:{relation}]->(dest)
-        SET e += r['properties']
-        RETURN e"""
+        SET e += r['properties']"""
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@api/graph.py` around lines 571-576: the Cypher query string built in variable `q` includes a trailing `RETURN e` but those results are never used; remove the final RETURN clause so the database does not return unused payload, leaving the UNWIND/MATCH/MERGE/SET lines intact.
288-310: Consider using a named tuple or dataclass for the entity tuple structure.

The tuple indexing (`item[0]`, `item[1]`, etc.) is fragile and hard to read. A named tuple would make the code more maintainable and self-documenting.

♻️ Suggested improvement using NamedTuple

```python
from typing import NamedTuple, Any

class EntityData(NamedTuple):
    entity_obj: Any
    label: str
    name: str
    doc: str
    path: str
    src_start: int
    src_end: int
    props: dict
```

Then access as `item.label`, `item.name`, etc.

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@api/graph.py` around lines 288-310: replace the positional tuple with a small typed container (NamedTuple or dataclass) such as `EntityData` with fields like entity_obj, label, name, doc, path, src_start, src_end, props; update all usages in this function (the grouping into by_label, creation of the data dict for `_query`, and the `item[0].id = res.result_set[j][0].id` assignment) to use named attributes, and adjust any callers that populate entities_data so they construct `EntityData` instances instead of plain tuples.

tests/test_graph_ops.py (2)
85-101: Consider verifying database persistence in addition to ID assignment.

The test verifies that `id` is assigned to mock entities, but doesn't confirm the entities were actually persisted to the database. This could mask issues where IDs are assigned but the DB write fails.

💡 Optional enhancement

```python
# After the batch call, verify at least one entity is in the DB
result = self.graph.get_function_by_name('func_0')
self.assertIsNotNone(result)
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tests/test_graph_ops.py` around lines 85-101: test_add_entities_batch only asserts that mock_entity.id was set after calling Graph.add_entities_batch; update the test to also query the DB (e.g., `Graph.get_function_by_name`) for at least one inserted entity (e.g., 'func_0') after `self.graph.add_entities_batch(entities_data)` and assert the result is not None, keeping the existing ID assertions.
142-157: Good test for chunking behavior with proper cleanup.

The `try`/`finally` ensures `BATCH_SIZE` is restored even if assertions fail. The test effectively validates the chunking logic across batch boundaries.

Note: If tests ever run in parallel, modifying the module-level `BATCH_SIZE` could cause race conditions. Consider using `unittest.mock.patch` for safer isolation:

♻️ Alternative using mock.patch

```python
def test_batch_chunking(self):
    """Verify batches are correctly chunked when exceeding BATCH_SIZE."""
    from unittest.mock import patch

    with patch('api.graph.BATCH_SIZE', 3):
        files = [File(Path(f'/chunked/f{i}.py'), None) for i in range(7)]
        self.graph.add_files_batch(files)

        for f in files:
            self.assertIsNotNone(f.id)

        for i in range(7):
            result = self.graph.get_file(f'/chunked/f{i}.py', f'f{i}.py', '.py')
            self.assertIsNotNone(result)
```

🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@tests/test_graph_ops.py` around lines 142-157: replace the manual try/finally mutation of `api.graph.BATCH_SIZE` in test_batch_chunking with a scoped `unittest.mock.patch('api.graph.BATCH_SIZE', 3)` context (or the `@patch` decorator) so the rest of the suite sees the original value; keep the same usage of File, `self.graph.add_files_batch`, and `self.graph.get_file`.
📒 Files selected for processing (2)

- api/graph.py
- tests/test_graph_ops.py
```python
def connect_entities_batch(self, relationships: list[tuple[str, int, int, dict]]) -> None:
    """
    Batch create relationships between entities using UNWIND.
    Groups by relation type, then processes in chunks of BATCH_SIZE.

    Args:
        relationships: list of (relation, src_id, dest_id, properties)
    """

    if not relationships:
        return

    by_relation = defaultdict(list)
    for rel in relationships:
        by_relation[rel[0]].append(rel)

    for relation, group in by_relation.items():
        q = f"""UNWIND $rels AS r
                MATCH (src), (dest)
                WHERE ID(src) = r['src_id'] AND ID(dest) = r['dest_id']
                MERGE (src)-[e:{relation}]->(dest)
                SET e += r['properties']
                RETURN e"""

        for start in range(0, len(group), BATCH_SIZE):
            chunk = group[start:start + BATCH_SIZE]
            data = [{'src_id': r[1], 'dest_id': r[2], 'properties': r[3]}
                    for r in chunk]
            self._query(q, {'rels': data})
```
Silent failure when `src_id` or `dest_id` is `None`.

If `add_files_batch` or `add_entities_batch` fails to assign an ID (or is never called), passing `None` IDs will cause the `MATCH ... WHERE ID(src) = r['src_id']` to silently find no nodes, resulting in missing edges without any error or warning.

Consider adding validation or at least a debug log when IDs are `None`:
🛡️ Proposed validation

```diff
 def connect_entities_batch(self, relationships: list[tuple[str, int, int, dict]]) -> None:
     ...
     if not relationships:
         return

+    # Validate that all IDs are set
+    for rel in relationships:
+        if rel[1] is None or rel[2] is None:
+            logging.warning(f"Skipping relationship {rel[0]} with None ID: src={rel[1]}, dest={rel[2]}")
+
     by_relation = defaultdict(list)
     for rel in relationships:
+        if rel[1] is None or rel[2] is None:
+            continue
         by_relation[rel[0]].append(rel)
```
Verify each finding against the current code and only fix it if needed.
In `@api/graph.py` around lines 554 - 582, connect_entities_batch currently
silently drops relationship entries when r['src_id'] or r['dest_id'] is None;
validate each tuple in connect_entities_batch (function) before building
by_relation: filter out or raise for entries where src_id or dest_id is None,
and emit a debug/warning log via the existing logger (or process logger)
including the relation name and offending tuple(s) so missing IDs are visible;
ensure this validation happens before grouping and before calling self._query
and keep chunking with BATCH_SIZE unchanged.
Summary

Replace one-at-a-time graph database queries with UNWIND-based batch operations during code analysis, dramatically reducing network round-trips.

Before: For a 100-file project with ~500 entities and ~1000 relationships, the analysis pipeline made ~1,600 individual DB round-trips (one per file, entity, and relationship).

After: The same project requires ~10 batched queries (one per entity label + one per relationship type).
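The round-trip arithmetic above can be checked directly. The label and relationship-type counts below are illustrative assumptions, not measured values from this codebase:

```python
files, entities, relationships = 100, 500, 1000

# Before: one query per file, per entity, and per relationship.
before = files + entities + relationships
print(before)  # 1600

# After: one file batch, plus one query per entity label and one per
# relationship type (both counts assumed for illustration).
entity_labels = 5        # e.g. Class, Function, Method, Struct, Interface
relationship_types = 4   # e.g. DEFINES, CALLS, IMPORTS, EXTENDS
after = 1 + entity_labels + relationship_types
print(after)  # 10
```

The key property is that the "after" count scales with the number of distinct labels and relation types, not with project size (aside from BATCH_SIZE chunking on very large repositories).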
Changes

- `api/graph.py` — Add three batch methods:
  - `add_files_batch()` — single `UNWIND` query for all files
  - `add_entities_batch()` — one `UNWIND` query per entity label (Class, Function, etc.)
  - `connect_entities_batch()` — one `UNWIND` query per relationship type (DEFINES, CALLS, etc.)
- `api/analyzers/source_analyzer.py` — Restructure analysis pipeline:
  - `first_pass`: parse files and build in-memory hierarchy, then batch-flush files → entities → relationships
  - `second_pass`: collect all resolved symbol relationships, then batch-insert at the end
  - `create_hierarchy`/`create_entity_hierarchy`: refactored to collect entities and relationships in memory instead of making immediate DB calls
- `tests/test_graph_ops.py` — Add tests for all three batch methods (including edge cases like empty lists and relationships with properties)

Testing

- `test_graph_ops.py` tests pass (4/4)
- New tests: `test_add_files_batch`, `test_add_files_batch_empty`, `test_add_entities_batch`, `test_connect_entities_batch`, `test_connect_entities_batch_empty` (`get_file` assertions)

Memory / Performance Impact

- `self.files` is retained for the second pass
N/A — Performance improvement identified during code review
Summary by CodeRabbit

- Performance Improvements
- Bug Fixes
- Tests