1111from fastapi .responses import StreamingResponse
1212from structlog .stdlib import BoundLogger
1313
14- from ... import crypto
14+ from ... import concurrency , crypto
15+ from ...concurrency import MAX_BUFFER_SIZE
1516from ...errors import S3Error
1617from ...s3client import S3Client , S3Credentials
1718from ...state import (
@@ -147,37 +148,51 @@ async def _decrypt_single_object(
147148 ) -> Response :
148149 logger .info ("GET_ENCRYPTED_SINGLE" , bucket = bucket , key = key )
149150 resp = await client .get_object (bucket , key )
150- wrapped_dek = base64 .b64decode (wrapped_dek_b64 )
151- # Read and close body to release aioboto3/aiohttp resources
152- async with resp ["Body" ] as body :
153- ciphertext = await body .read ()
154- plaintext = crypto .decrypt_object (ciphertext , wrapped_dek , self .settings .kek )
155- del ciphertext # Free memory
151+ content_length = resp .get ("ContentLength" , 0 )
156152
157- content_type = head_resp .get ("ContentType" , "application/octet-stream" )
158- cache_control = head_resp .get ("CacheControl" )
159- expires = head_resp .get ("Expires" )
153+ # Encrypted decrypts buffer ciphertext + plaintext simultaneously.
154+ # Acquire additional memory beyond the initial MAX_BUFFER_SIZE reservation.
155+ additional = max (0 , content_length * 2 - MAX_BUFFER_SIZE )
156+ extra_reserved = 0
157+ try :
158+ if additional > 0 :
159+ extra_reserved = await concurrency .try_acquire_memory (additional )
160+
161+ wrapped_dek = base64 .b64decode (wrapped_dek_b64 )
162+ async with resp ["Body" ] as body :
163+ ciphertext = await body .read ()
164+ plaintext = crypto .decrypt_object (ciphertext , wrapped_dek , self .settings .kek )
165+ del ciphertext
166+
167+ content_type = head_resp .get ("ContentType" , "application/octet-stream" )
168+ cache_control = head_resp .get ("CacheControl" )
169+ expires = head_resp .get ("Expires" )
170+
171+ if range_header :
172+ start , end = self ._parse_range (range_header , len (plaintext ))
173+ headers = self ._build_headers (
174+ content_type = content_type ,
175+ content_length = end - start + 1 ,
176+ last_modified = last_modified ,
177+ cache_control = cache_control ,
178+ expires = expires ,
179+ )
180+ headers ["Content-Range" ] = f"bytes { start } -{ end } /{ len (plaintext )} "
181+ return Response (
182+ content = plaintext [start : end + 1 ], status_code = 206 , headers = headers
183+ )
160184
161- if range_header :
162- start , end = self ._parse_range (range_header , len (plaintext ))
163185 headers = self ._build_headers (
164186 content_type = content_type ,
165- content_length = end - start + 1 ,
187+ content_length = len ( plaintext ) ,
166188 last_modified = last_modified ,
167189 cache_control = cache_control ,
168190 expires = expires ,
169191 )
170- headers ["Content-Range" ] = f"bytes { start } -{ end } /{ len (plaintext )} "
171- return Response (content = plaintext [start : end + 1 ], status_code = 206 , headers = headers )
172-
173- headers = self ._build_headers (
174- content_type = content_type ,
175- content_length = len (plaintext ),
176- last_modified = last_modified ,
177- cache_control = cache_control ,
178- expires = expires ,
179- )
180- return Response (content = plaintext , headers = headers )
192+ return Response (content = plaintext , headers = headers )
193+ finally :
194+ if extra_reserved > 0 :
195+ await concurrency .release_memory (extra_reserved )
181196
182197 async def _get_multipart (
183198 self ,
@@ -378,13 +393,17 @@ async def _fetch_internal_part(
378393 ct_end : int ,
379394 dek : bytes ,
380395 ) -> bytes :
396+ expected_size = ct_end - ct_start + 1
397+ additional = max (0 , expected_size * 2 - MAX_BUFFER_SIZE )
398+ extra_reserved = 0
381399 try :
400+ if additional > 0 :
401+ extra_reserved = await concurrency .try_acquire_memory (additional )
402+
382403 resp = await client .get_object (bucket , key , f"bytes={ ct_start } -{ ct_end } " )
383- # Read and close body to release aioboto3/aiohttp resources
384404 async with resp ["Body" ] as body :
385405 ciphertext = await body .read ()
386406
387- expected_size = ct_end - ct_start + 1
388407 if len (ciphertext ) < crypto .ENCRYPTION_OVERHEAD or len (ciphertext ) != expected_size :
389408 logger .error (
390409 "GET_CIPHERTEXT_SIZE_MISMATCH" ,
@@ -419,6 +438,9 @@ async def _fetch_internal_part(
419438 f"range { ct_start } -{ ct_end } invalid"
420439 ) from e
421440 raise
441+ finally :
442+ if extra_reserved > 0 :
443+ await concurrency .release_memory (extra_reserved )
422444
423445 async def _fetch_and_decrypt_part (
424446 self ,
@@ -445,12 +467,21 @@ async def _fetch_and_decrypt_part(
445467
446468 self ._validate_ciphertext_range (bucket , key , part_num , 0 , ct_end , actual_size )
447469
448- resp = await client .get_object (bucket , key , f"bytes={ ct_start } -{ ct_end } " )
449- # Read and close body to release aioboto3/aiohttp resources
450- async with resp ["Body" ] as body :
451- ciphertext = await body .read ()
452- decrypted = crypto .decrypt (ciphertext , dek )
453- return decrypted [off_start : off_end + 1 ]
470+ part_size = part_meta .ciphertext_size
471+ additional = max (0 , part_size * 2 - MAX_BUFFER_SIZE )
472+ extra_reserved = 0
473+ try :
474+ if additional > 0 :
475+ extra_reserved = await concurrency .try_acquire_memory (additional )
476+
477+ resp = await client .get_object (bucket , key , f"bytes={ ct_start } -{ ct_end } " )
478+ async with resp ["Body" ] as body :
479+ ciphertext = await body .read ()
480+ decrypted = crypto .decrypt (ciphertext , dek )
481+ return decrypted [off_start : off_end + 1 ]
482+ finally :
483+ if extra_reserved > 0 :
484+ await concurrency .release_memory (extra_reserved )
454485
455486 def _build_response_headers (self , resp : dict , last_modified : str | None ) -> dict [str , str ]:
456487 return self ._build_headers (
0 commit comments