import os
from pathlib import Path

import boto3
from dotenv import load_dotenv

load_dotenv()
# S3 env
DOCUMENT_BUCKET_ENDPOINT = os.getenv("DOCUMENT_BUCKET_ENDPOINT")
DOCUMENT_BUCKET_ACCESS_KEY_ID = os.getenv("DOCUMENT_BUCKET_ACCESS_KEY_ID")
DOCUMENT_BUCKET_SECRET_ACCESS_KEY = os.getenv("DOCUMENT_BUCKET_SECRET_ACCESS_KEY")
DOCUMENT_BUCKET_REGION = os.getenv("DOCUMENT_BUCKET_REGION")
DOCUMENT_BUCKET = os.getenv("DOCUMENT_BUCKET")
# Local env
DOCUMENT_LOCAL_DIR = os.getenv("DOCUMENT_LOCAL_DIR")
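
# Use S3 only when the bucket name and both credentials are configured;
# otherwise every helper below falls back to the local directory in DOCUMENT_LOCAL_DIR.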
use_s3 = bool(DOCUMENT_BUCKET and DOCUMENT_BUCKET_ACCESS_KEY_ID and DOCUMENT_BUCKET_SECRET_ACCESS_KEY)
s3_client = None
upload_dir: Path | None = None

# Returns the lazy-initialized S3 client singleton when S3 is configured; otherwise None.
def get_s3_client():
    global s3_client
    if s3_client is None and use_s3:
        s3_client = boto3.client(
            "s3",
            endpoint_url=DOCUMENT_BUCKET_ENDPOINT or None,
            aws_access_key_id=DOCUMENT_BUCKET_ACCESS_KEY_ID,
            aws_secret_access_key=DOCUMENT_BUCKET_SECRET_ACCESS_KEY,
            region_name=DOCUMENT_BUCKET_REGION or "us-east-1",
        )
    return s3_client

# Returns the local upload directory path; creates it if missing. Used when not using S3.
def get_upload_dir() -> Path:
    global upload_dir
    if upload_dir is None:
        if not DOCUMENT_LOCAL_DIR:
            raise RuntimeError("DOCUMENT_LOCAL_DIR is not set; it is required for local storage")
        upload_dir = Path(DOCUMENT_LOCAL_DIR)
        upload_dir.mkdir(parents=True, exist_ok=True)
    return upload_dir

# Returns True if storage is configured to use S3; False for local disk.
def is_s3() -> bool:
    return use_s3

# Saves content to S3 (by key) or to local disk under the upload directory.
def save_file(key: str, content: bytes) -> None:
    if use_s3:
        client = get_s3_client()
        client.put_object(Bucket=DOCUMENT_BUCKET, Key=key, Body=content)
    else:
        path = get_upload_dir() / key
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_bytes(content)

# Returns the local file path for the given key when using local storage; None if S3 or key not found (path traversal safe).
def get_file_path(key: str) -> Path | None:
    if use_s3:
        return None
    path = (get_upload_dir() / key).resolve()
    root = get_upload_dir().resolve()
    if not path.is_file():
        return None
    try:
        path.relative_to(root)
    except ValueError:
        return None
    return path

# Returns a presigned GET URL for the key in S3, or None when using local storage or the object is missing. expires_in is in seconds.
def get_presigned_url(key: str, expires_in: int = 3600) -> str | None:
    if not use_s3:
        return None
    client = get_s3_client()
    try:
        client.head_object(Bucket=DOCUMENT_BUCKET, Key=key)
    except Exception:
        return None
    url = client.generate_presigned_url(
        "get_object",
        Params={"Bucket": DOCUMENT_BUCKET, "Key": key},
        ExpiresIn=expires_in,
    )
    return url

# Returns a list of storage keys. When user_id is set, only keys under that prefix/folder.
def list_files(user_id: str | None = None) -> list[str]:
    if use_s3:
        client = get_s3_client()
        out: list[str] = []
        prefix = f"{user_id}/" if user_id else ""
        paginator = client.get_paginator("list_objects_v2")
        for page in paginator.paginate(Bucket=DOCUMENT_BUCKET, Prefix=prefix):
            for obj in page.get("Contents") or []:
                key = obj.get("Key")
                if key:
                    out.append(key)
        return sorted(out, key=str.lower)
    root = get_upload_dir()
    if user_id:
        dir_path = root / user_id
        if not dir_path.is_dir():
            return []
        out = [f"{user_id}/{p.name}" for p in dir_path.iterdir() if p.is_file()]
        return sorted(out, key=str.lower)
    out = [p.name for p in root.iterdir() if p.is_file()]
    return sorted(out, key=str.lower)
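

# --- Usage sketch (illustrative, not part of the original module) ---
# A minimal example of the intended call pattern, assuming the relevant
# environment variables above are set. The "demo-user/<filename>" key layout
# and the "demo-user" id are assumptions chosen for illustration; they mirror
# the prefix/folder convention that list_files() and get_file_path() expect.
if __name__ == "__main__":
    save_file("demo-user/hello.txt", b"hello world")
    print(list_files("demo-user"))
    if is_s3():
        # Presigned URLs are only available on the S3 backend.
        print(get_presigned_url("demo-user/hello.txt", expires_in=600))
    else:
        # Local backend: resolve the on-disk path instead.
        print(get_file_path("demo-user/hello.txt"))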