Skip to content

Commit 7bf40c4

Browse files
Extract multipart parsing into _parse_multipart_files helper (#2958)
Refactor five image validators to eliminate duplicate multipart parsing code by extracting it into a reusable helper with @beartype decoration. Also simplify validate_image_is_image by directly accessing the stream from the files dict. Co-authored-by: Claude Haiku 4.5 <noreply@anthropic.com>
1 parent 40bd318 commit 7bf40c4

1 file changed

Lines changed: 41 additions & 39 deletions

File tree

src/mock_vws/_query_validators/image_validators.py

Lines changed: 41 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from beartype import beartype
99
from PIL import Image
10+
from werkzeug.datastructures import FileStorage, MultiDict
1011
from werkzeug.formparser import MultiPartParser
1112

1213
from mock_vws._query_validators.exceptions import (
@@ -19,19 +20,19 @@
1920

2021

2122
@beartype
22-
def validate_image_field_given(
23+
def _parse_multipart_files(
2324
*,
2425
request_headers: Mapping[str, str],
2526
request_body: bytes,
26-
) -> None:
27-
"""Validate that the image field is given.
27+
) -> MultiDict[str, FileStorage]:
28+
"""Parse the multipart body and return the files section.
2829
2930
Args:
3031
request_headers: The headers sent with the request.
3132
request_body: The body of the request.
3233
33-
Raises:
34-
ImageNotGivenError: The image field is not given.
34+
Returns:
35+
The files parsed from the multipart body.
3536
"""
3637
email_message = EmailMessage()
3738
email_message["Content-Type"] = request_headers["Content-Type"]
@@ -42,6 +43,28 @@ def validate_image_field_given(
4243
boundary=boundary.encode(encoding="utf-8"),
4344
content_length=len(request_body),
4445
)
46+
return files
47+
48+
49+
@beartype
50+
def validate_image_field_given(
51+
*,
52+
request_headers: Mapping[str, str],
53+
request_body: bytes,
54+
) -> None:
55+
"""Validate that the image field is given.
56+
57+
Args:
58+
request_headers: The headers sent with the request.
59+
request_body: The body of the request.
60+
61+
Raises:
62+
ImageNotGivenError: The image field is not given.
63+
"""
64+
files = _parse_multipart_files(
65+
request_headers=request_headers,
66+
request_body=request_body,
67+
)
4568
if files.get(key="image") is not None:
4669
return
4770

@@ -64,14 +87,9 @@ def validate_image_file_size(
6487
Raises:
6588
RequestEntityTooLargeError: The image file size is too large.
6689
"""
67-
email_message = EmailMessage()
68-
email_message["Content-Type"] = request_headers["Content-Type"]
69-
boundary = email_message.get_boundary(failobj="")
70-
parser = MultiPartParser()
71-
_, files = parser.parse(
72-
stream=io.BytesIO(initial_bytes=request_body),
73-
boundary=boundary.encode(encoding="utf-8"),
74-
content_length=len(request_body),
90+
files = _parse_multipart_files(
91+
request_headers=request_headers,
92+
request_body=request_body,
7593
)
7694
image_part = files["image"]
7795
image_value = image_part.stream.read()
@@ -105,14 +123,9 @@ def validate_image_dimensions(
105123
BadImageError: The image is given and is not within the maximum width
106124
and height limits.
107125
"""
108-
email_message = EmailMessage()
109-
email_message["Content-Type"] = request_headers["Content-Type"]
110-
boundary = email_message.get_boundary(failobj="")
111-
parser = MultiPartParser()
112-
_, files = parser.parse(
113-
stream=io.BytesIO(initial_bytes=request_body),
114-
boundary=boundary.encode(encoding="utf-8"),
115-
content_length=len(request_body),
126+
files = _parse_multipart_files(
127+
request_headers=request_headers,
128+
request_body=request_body,
116129
)
117130
image_part = files["image"]
118131
image_value = image_part.stream.read()
@@ -142,14 +155,9 @@ def validate_image_format(
142155
Raises:
143156
BadImageError: The image is given and is not either a PNG or a JPEG.
144157
"""
145-
email_message = EmailMessage()
146-
email_message["Content-Type"] = request_headers["Content-Type"]
147-
boundary = email_message.get_boundary(failobj="")
148-
parser = MultiPartParser()
149-
_, files = parser.parse(
150-
stream=io.BytesIO(initial_bytes=request_body),
151-
boundary=boundary.encode(encoding="utf-8"),
152-
content_length=len(request_body),
158+
files = _parse_multipart_files(
159+
request_headers=request_headers,
160+
request_body=request_body,
153161
)
154162
image_part = files["image"]
155163
pil_image = Image.open(fp=image_part.stream)
@@ -175,17 +183,11 @@ def validate_image_is_image(
175183
Raises:
176184
BadImageError: Image data is given and it is not an image file.
177185
"""
178-
email_message = EmailMessage()
179-
email_message["Content-Type"] = request_headers["Content-Type"]
180-
boundary = email_message.get_boundary(failobj="")
181-
parser = MultiPartParser()
182-
_, files = parser.parse(
183-
stream=io.BytesIO(initial_bytes=request_body),
184-
boundary=boundary.encode(encoding="utf-8"),
185-
content_length=len(request_body),
186+
files = _parse_multipart_files(
187+
request_headers=request_headers,
188+
request_body=request_body,
186189
)
187-
image_part = files["image"]
188-
image_file = image_part.stream
190+
image_file = files["image"].stream
189191

190192
try:
191193
Image.open(fp=image_file)

0 commit comments

Comments
 (0)