Skip to content

Commit 4f45c41

Browse files
committed
PR feedback
1 parent 1b4698f commit 4f45c41

File tree

2 files changed

+77
-8
lines changed

2 files changed

+77
-8
lines changed

durabletask/extensions/azure_blob_payloads/blob_payload_store.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import uuid
1111
from typing import Optional
1212

13+
from azure.core.exceptions import ResourceExistsError
1314
from azure.storage.blob import BlobServiceClient
1415
from azure.storage.blob.aio import BlobServiceClient as AsyncBlobServiceClient
1516

@@ -82,6 +83,30 @@ def __init__(self, options: BlobPayloadStoreOptions):
8283

8384
self._ensure_container_created = False
8485

86+
# ------------------------------------------------------------------
87+
# Lifecycle / resource management
88+
# ------------------------------------------------------------------
89+
90+
def close(self) -> None:
91+
"""Close the underlying sync blob service client."""
92+
self._blob_service_client.close()
93+
94+
async def close_async(self) -> None:
95+
"""Close the underlying async blob service client."""
96+
await self._async_blob_service_client.close()
97+
98+
def __enter__(self) -> BlobPayloadStore:
99+
return self
100+
101+
def __exit__(self, *args: object) -> None:
102+
self.close()
103+
104+
async def __aenter__(self) -> BlobPayloadStore:
105+
return self
106+
107+
async def __aexit__(self, *args: object) -> None:
108+
await self.close_async()
109+
85110
@property
86111
def options(self) -> BlobPayloadStoreOptions:
87112
return self._options
@@ -185,8 +210,7 @@ def _ensure_container_sync(self) -> None:
185210
container_client = self._blob_service_client.get_container_client(self._container_name)
186211
try:
187212
container_client.create_container()
188-
except Exception:
189-
# Container may already exist — that is fine.
213+
except ResourceExistsError:
190214
pass
191215
self._ensure_container_created = True
192216

@@ -196,7 +220,6 @@ async def _ensure_container_async(self) -> None:
196220
container_client = self._async_blob_service_client.get_container_client(self._container_name)
197221
try:
198222
await container_client.create_container()
199-
except Exception:
200-
# Container may already exist — that is fine.
223+
except ResourceExistsError:
201224
pass
202225
self._ensure_container_created = True

durabletask/payload/helpers.py

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ async def deexternalize_payloads_async(
8484
# Internal recursive walkers – sync
8585
# ------------------------------------------------------------------
8686

87+
def _is_map_field(fd) -> bool:
88+
"""Return True if the field descriptor represents a protobuf map field."""
89+
mt = fd.message_type
90+
return mt is not None and fd.is_repeated and mt.GetOptions().map_entry
91+
92+
8793
def _walk_and_externalize(
8894
msg: proto_message.Message,
8995
store: PayloadStore,
@@ -95,7 +101,21 @@ def _walk_and_externalize(
95101
if fd.message_type is None:
96102
continue
97103

98-
if fd.is_repeated:
104+
if _is_map_field(fd):
105+
# Map fields: iterate values. ScalarMap values are not
106+
# messages and will be skipped by the isinstance check.
107+
for map_value in getattr(msg, fd.name).values():
108+
if isinstance(map_value, proto_message.Message):
109+
if isinstance(map_value, wrappers_pb2.StringValue):
110+
_try_externalize_field(
111+
fd.name, map_value, store,
112+
threshold, max_bytes, instance_id,
113+
)
114+
else:
115+
_walk_and_externalize(
116+
map_value, store, threshold, max_bytes, instance_id
117+
)
118+
elif fd.is_repeated:
99119
value = getattr(msg, fd.name)
100120
for item in value:
101121
if isinstance(item, proto_message.Message):
@@ -162,7 +182,14 @@ def _walk_and_deexternalize(
162182
if fd.message_type is None:
163183
continue
164184

165-
if fd.is_repeated:
185+
if _is_map_field(fd):
186+
for map_value in getattr(msg, fd.name).values():
187+
if isinstance(map_value, proto_message.Message):
188+
if isinstance(map_value, wrappers_pb2.StringValue):
189+
_try_deexternalize_field(map_value, store)
190+
else:
191+
_walk_and_deexternalize(map_value, store)
192+
elif fd.is_repeated:
166193
value = getattr(msg, fd.name)
167194
for item in value:
168195
if isinstance(item, proto_message.Message):
@@ -207,7 +234,19 @@ async def _walk_and_externalize_async(
207234
if fd.message_type is None:
208235
continue
209236

210-
if fd.is_repeated:
237+
if _is_map_field(fd):
238+
for map_value in getattr(msg, fd.name).values():
239+
if isinstance(map_value, proto_message.Message):
240+
if isinstance(map_value, wrappers_pb2.StringValue):
241+
await _try_externalize_field_async(
242+
fd.name, map_value, store,
243+
threshold, max_bytes, instance_id,
244+
)
245+
else:
246+
await _walk_and_externalize_async(
247+
map_value, store, threshold, max_bytes, instance_id,
248+
)
249+
elif fd.is_repeated:
211250
value = getattr(msg, fd.name)
212251
for item in value:
213252
if isinstance(item, proto_message.Message):
@@ -273,7 +312,14 @@ async def _walk_and_deexternalize_async(
273312
if fd.message_type is None:
274313
continue
275314

276-
if fd.is_repeated:
315+
if _is_map_field(fd):
316+
for map_value in getattr(msg, fd.name).values():
317+
if isinstance(map_value, proto_message.Message):
318+
if isinstance(map_value, wrappers_pb2.StringValue):
319+
await _try_deexternalize_field_async(map_value, store)
320+
else:
321+
await _walk_and_deexternalize_async(map_value, store)
322+
elif fd.is_repeated:
277323
value = getattr(msg, fd.name)
278324
for item in value:
279325
if isinstance(item, proto_message.Message):

0 commit comments

Comments
 (0)