-
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathauth.py
More file actions
62 lines (50 loc) · 2.05 KB
/
auth.py
File metadata and controls
62 lines (50 loc) · 2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""Authentication utilities for the Memory Tracker API."""
from fastapi import Depends, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
from . import models, crud
from .database import get_database
from .logging_config import get_logger, user_id_var
from .exceptions import authentication_failed
# Security scheme
security = HTTPBearer(auto_error=False)
async def get_current_token(
authorization: Annotated[str, Header()] = None,
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_database),
) -> models.AuthToken:
"""
Extract and validate auth token from Authorization header.
Supports both 'Bearer <token>' and 'Token <token>' formats.
"""
logger = get_logger("api.auth")
token = None
# Try to extract token from Authorization header
if authorization:
if authorization.startswith("Bearer "):
token = authorization[7:]
elif authorization.startswith("Token "):
token = authorization[6:]
else:
token = authorization
elif credentials:
token = credentials.credentials
if not token:
logger.warning("Authentication failed: Missing token")
raise authentication_failed("Missing authentication token")
# Look up token in database
auth_token = await crud.get_auth_token_by_token(db, token)
if not auth_token:
# Only log token length and first/last 4 characters for security
masked_token = f"{token[:4]}...{token[-4:]}" if len(token) > 8 else "***"
logger.warning(
f"Authentication failed: Invalid token {masked_token} (length: {len(token)})"
)
raise authentication_failed("Invalid or expired token")
# Set user context for logging
user_id_var.set(auth_token.name)
# Update last used timestamp
await crud.update_token_last_used(db, token)
logger.info(f"Authentication successful for token: {auth_token.name}")
return auth_token