Skip to content

Commit 19246ce

Browse files
committed
Add duplicate run prevention and improve database schema
- Replace Run table index with unique constraint on (commit_sha, binary_id, environment_id)
- Add IntegrityError handling in upload endpoint to prevent duplicate runs
- Remove redundant database indexes to improve performance
- Enhance init_db.py with better error handling and database URL override support
- Improve PostgreSQL compatibility with proper table drop ordering
- Add flamegraph cleanup functionality to manage storage efficiently
1 parent 8c186e0 commit 19246ce

File tree

3 files changed

+176
-16
lines changed

3 files changed

+176
-16
lines changed

backend/app/models.py

Lines changed: 4 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -8,6 +8,7 @@
88
JSON,
99
Index,
1010
Boolean,
11+
UniqueConstraint,
1112
)
1213
from sqlalchemy.ext.declarative import declarative_base
1314
from sqlalchemy.orm import relationship
@@ -88,8 +89,9 @@ class Run(Base):
8889
benchmark_results = relationship("BenchmarkResult", back_populates="run")
8990

9091
__table_args__ = (
91-
Index(
92-
"idx_runs_commit_binary_env", "commit_sha", "binary_id", "environment_id"
92+
UniqueConstraint(
93+
"commit_sha", "binary_id", "environment_id",
94+
name="unique_commit_binary_env"
9395
),
9496
Index("idx_runs_timestamp", "timestamp"),
9597
Index(
@@ -101,20 +103,13 @@ class Run(Base):
101103
Index(
102104
"idx_runs_env_timestamp", "environment_id", "timestamp"
103105
), # For environment-based queries
104-
Index(
105-
"idx_runs_timestamp_desc",
106-
"timestamp",
107-
postgresql_using="btree",
108-
mysql_length={"timestamp": 255},
109-
), # For ORDER BY timestamp DESC
110106
Index(
111107
"idx_runs_env_python_timestamp",
112108
"environment_id",
113109
"python_major",
114110
"python_minor",
115111
"timestamp",
116112
), # For filtered queries
117-
Index("idx_runs_binary_env_commit", "binary_id", "environment_id", "commit_sha"),
118113
)
119114

120115

backend/app/routers/upload.py

Lines changed: 93 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,8 @@
22

33
from fastapi import APIRouter, Depends, HTTPException
44
from sqlalchemy.ext.asyncio import AsyncSession
5+
from sqlalchemy import select, delete, desc, func, update, and_
6+
from sqlalchemy.exc import IntegrityError
57
from datetime import datetime
68
import logging
79

@@ -12,6 +14,81 @@
1214
router = APIRouter(prefix="/api", tags=["upload"])
1315

1416

17+
async def cleanup_old_flamegraphs_if_needed(db: AsyncSession, environment_id: str, max_flamegraphs: int = 100):
18+
"""
19+
Set flamegraph_html to NULL for runs older than the last max_flamegraphs in this environment.
20+
"""
21+
logger = logging.getLogger(__name__)
22+
23+
keep_runs_result = await db.execute(
24+
select(models.Run.run_id).where(
25+
models.Run.environment_id == environment_id
26+
).order_by(desc(models.Run.timestamp)).limit(max_flamegraphs)
27+
)
28+
keep_run_ids = [row[0] for row in keep_runs_result.fetchall()]
29+
30+
if not keep_run_ids:
31+
return 0
32+
33+
total_flamegraphs_result = await db.execute(
34+
select(func.count()).select_from(models.BenchmarkResult).join(models.Run).where(
35+
models.Run.environment_id == environment_id,
36+
models.BenchmarkResult.flamegraph_html.is_not(None)
37+
)
38+
)
39+
total_flamegraphs_before = total_flamegraphs_result.scalar()
40+
41+
count_query = select(func.count()).select_from(models.BenchmarkResult).where(
42+
and_(
43+
models.BenchmarkResult.run_id.not_in(keep_run_ids),
44+
models.BenchmarkResult.flamegraph_html.is_not(None)
45+
)
46+
)
47+
count_result = await db.execute(count_query)
48+
rows_to_clean = count_result.scalar()
49+
50+
if rows_to_clean == 0:
51+
logger.info(f"No flamegraphs need cleaning for environment '{environment_id}' (all {total_flamegraphs_before} flamegraphs are from recent runs)")
52+
return 0
53+
54+
cleanup_query = update(models.BenchmarkResult).where(
55+
and_(
56+
models.BenchmarkResult.run_id.not_in(keep_run_ids),
57+
models.BenchmarkResult.flamegraph_html.is_not(None)
58+
)
59+
).values(flamegraph_html=None)
60+
61+
try:
62+
result = await db.execute(cleanup_query)
63+
await db.commit()
64+
65+
verify_result = await db.execute(count_query)
66+
remaining = verify_result.scalar()
67+
actual_cleaned = rows_to_clean - remaining
68+
69+
final_flamegraphs_result = await db.execute(
70+
select(func.count()).select_from(models.BenchmarkResult).join(models.Run).where(
71+
models.Run.environment_id == environment_id,
72+
models.BenchmarkResult.flamegraph_html.is_not(None)
73+
)
74+
)
75+
total_flamegraphs_after = final_flamegraphs_result.scalar()
76+
77+
if actual_cleaned > 0:
78+
logger.info(f"Cleaned up {actual_cleaned} flamegraphs for environment '{environment_id}': {total_flamegraphs_before}{total_flamegraphs_after} flamegraphs remaining")
79+
else:
80+
logger.error(f"Cleanup FAILED for environment '{environment_id}'. Expected to clean {rows_to_clean}, but {remaining} still remain")
81+
82+
return actual_cleaned
83+
84+
except Exception as e:
85+
logger.error(f"Exception during cleanup for environment '{environment_id}': {e}")
86+
await db.rollback()
87+
raise
88+
89+
90+
91+
1592
@router.post("/upload-run", response_model=dict)
1693
async def upload_worker_run(
1794
upload_data: schemas.WorkerRunUpload,
@@ -106,6 +183,8 @@ async def upload_worker_run(
106183
)
107184
logger.info(f"Configure flags validation passed for binary '{binary_id}'")
108185

186+
# Note: Duplicate commits are now prevented by database unique constraint on (commit_sha, binary_id, environment_id)
187+
109188
# Create or get commit
110189
logger.debug(f"Looking up commit {commit_sha[:8]} in database")
111190
commit = await crud.get_commit_by_sha(db, sha=commit_sha)
@@ -203,6 +282,9 @@ async def upload_worker_run(
203282
f"Upload completed successfully: run_id={run_id}, created {len(created_results)} benchmark results"
204283
)
205284

285+
# Clean up old flamegraphs if we have more than 100 runs for this environment
286+
await cleanup_old_flamegraphs_if_needed(db, environment_id, max_flamegraphs=100)
287+
206288
return {
207289
"message": "Worker run uploaded successfully",
208290
"run_id": run_id,
@@ -213,6 +295,17 @@ async def upload_worker_run(
213295
"result_ids": created_results,
214296
}
215297

298+
except IntegrityError as e:
299+
# Handle unique constraint violation for duplicate commit+binary+environment
300+
if "unique_commit_binary_env" in str(e).lower():
301+
logger.error(f"Upload failed: Duplicate run for commit {commit_sha[:8]}, binary '{binary_id}', environment '{environment_id}'")
302+
raise HTTPException(
303+
status_code=409,
304+
detail=f"A run already exists for commit {commit_sha[:8]} with binary '{binary_id}' and environment '{environment_id}'. Duplicate uploads are not allowed."
305+
)
306+
else:
307+
logger.error(f"Database integrity error during upload: {e}")
308+
raise HTTPException(status_code=500, detail=f"Database integrity error: {str(e)}")
216309
except HTTPException:
217310
# Re-raise HTTP exceptions (validation errors) as-is
218311
raise

backend/scripts/init_db.py

Lines changed: 79 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -55,21 +55,36 @@ async def init_admin_data():
5555
async def init_database():
5656
"""Initialize the database by creating all tables."""
5757
print("Initializing database...")
58+
59+
settings = get_settings()
5860

5961
try:
6062
# Import models to ensure they're registered
61-
from app.models import AdminUser, AdminSession, AuthToken, Commit, Binary, Environment, Run, BenchmarkResult
63+
from app.models import AdminUser, AdminSession, AuthToken, Commit, Binary, Environment, Run, BenchmarkResult, Base
64+
from app.database import create_database_engine
65+
66+
# Create a fresh engine with current settings
67+
engine = create_database_engine()
68+
69+
print(f"Connected to: {settings.database_url.split('@')[-1] if '@' in settings.database_url else settings.database_url}")
6270

63-
await create_tables()
71+
# Create all tables
72+
async with engine.begin() as conn:
73+
await conn.run_sync(Base.metadata.create_all)
6474
print("✅ Database tables created successfully!")
6575

76+
# Close the engine we created
77+
await engine.dispose()
78+
6679
# Initialize admin data
6780
admin_success = await init_admin_data()
6881
if not admin_success:
6982
return False
7083

7184
except Exception as e:
7285
print(f"❌ Error creating database tables: {e}")
86+
import traceback
87+
traceback.print_exc()
7388
return False
7489

7590
return True
@@ -78,24 +93,62 @@ async def init_database():
7893
async def reset_database():
7994
"""Reset the database by dropping and recreating all tables."""
8095
print("Resetting database...")
96+
97+
settings = get_settings()
98+
is_postgres = "postgresql" in settings.database_url
8199

82100
try:
83101
# Import models to ensure they're registered
84-
from app.models import AdminUser, AdminSession, AuthToken, Commit, Binary, Environment, Run, BenchmarkResult
102+
from app.models import AdminUser, AdminSession, AuthToken, Commit, Binary, Environment, Run, BenchmarkResult, Base
103+
from app.database import create_database_engine
104+
from sqlalchemy.ext.asyncio import create_async_engine
85105

86-
await drop_tables()
87-
print("🗑️ Existing tables dropped")
88-
89-
await create_tables()
106+
# Create a fresh engine with current settings
107+
engine = create_database_engine()
108+
109+
print(f"Connected to: {settings.database_url.split('@')[-1] if '@' in settings.database_url else settings.database_url}")
110+
111+
# For PostgreSQL, we need to handle foreign key constraints more carefully
112+
if is_postgres:
113+
async with engine.begin() as conn:
114+
# First, drop all dependent tables (those with foreign keys)
115+
print("🗑️ Dropping dependent tables...")
116+
await conn.run_sync(BenchmarkResult.__table__.drop, checkfirst=True)
117+
await conn.run_sync(Run.__table__.drop, checkfirst=True)
118+
await conn.run_sync(AdminSession.__table__.drop, checkfirst=True)
119+
120+
# Then drop the referenced tables
121+
print("🗑️ Dropping referenced tables...")
122+
await conn.run_sync(Commit.__table__.drop, checkfirst=True)
123+
await conn.run_sync(Binary.__table__.drop, checkfirst=True)
124+
await conn.run_sync(Environment.__table__.drop, checkfirst=True)
125+
await conn.run_sync(AuthToken.__table__.drop, checkfirst=True)
126+
await conn.run_sync(AdminUser.__table__.drop, checkfirst=True)
127+
128+
print("🗑️ All tables dropped")
129+
else:
130+
# For SQLite, drop all tables using metadata
131+
async with engine.begin() as conn:
132+
await conn.run_sync(Base.metadata.drop_all)
133+
print("🗑️ Existing tables dropped")
134+
135+
# Recreate all tables
136+
async with engine.begin() as conn:
137+
await conn.run_sync(Base.metadata.create_all)
90138
print("✅ Database tables recreated successfully!")
91139

140+
# Close the engine we created
141+
await engine.dispose()
142+
92143
# Initialize admin data after reset
93144
admin_success = await init_admin_data()
94145
if not admin_success:
95146
return False
96147

97148
except Exception as e:
98149
print(f"❌ Error resetting database: {e}")
150+
import traceback
151+
traceback.print_exc()
99152
return False
100153

101154
return True
@@ -110,9 +163,28 @@ async def reset_database():
110163
action="store_true",
111164
help="Reset the database (drop and recreate tables)",
112165
)
166+
parser.add_argument(
167+
"--database-url",
168+
type=str,
169+
help="Database URL to use (overrides DATABASE_URL env var)",
170+
)
113171

114172
args = parser.parse_args()
115173

174+
# Override database URL if provided
175+
if args.database_url:
176+
os.environ["DATABASE_URL"] = args.database_url
177+
# Also try the lowercase version
178+
os.environ["database_url"] = args.database_url
179+
# Clear the settings cache so it picks up the new DATABASE_URL
180+
from app.config import get_settings
181+
get_settings.cache_clear()
182+
183+
# Verify the database URL is actually being used
184+
settings = get_settings()
185+
print(f"Using database URL: {args.database_url.split('@')[-1] if '@' in args.database_url else args.database_url}")
186+
print(f"Settings database URL: {settings.database_url.split('@')[-1] if '@' in settings.database_url else settings.database_url}")
187+
116188
if args.reset:
117189
success = asyncio.run(reset_database())
118190
else:

0 commit comments

Comments (0)