|
1 | | -# based on |
2 | | -# https://github.com/microsoft/botbuilder-python/blob/main/libraries/botbuilder-azure/botbuilder/azure/blob_storage.py |
3 | | - |
4 | | -from typing import TypeVar |
5 | | -from io import BytesIO |
6 | 1 | import json |
| 2 | +from typing import TypeVar, Union |
| 3 | +from io import BytesIO |
7 | 4 |
|
8 | | -from azure.core.exceptions import ( |
9 | | - HttpResponseError, |
10 | | - ResourceExistsError, |
11 | | - ResourceNotFoundError, |
12 | | -) |
13 | 5 | from azure.storage.blob.aio import ( |
14 | 6 | ContainerClient, |
15 | 7 | BlobServiceClient, |
16 | 8 | ) |
17 | 9 |
|
| 10 | +from microsoft.agents.storage import StoreItem |
| 11 | +from microsoft.agents.storage.storage import AsyncStorageBase |
18 | 12 | from microsoft.agents.storage._type_aliases import JSON |
19 | | -from microsoft.agents.storage import Storage, StoreItem |
| 13 | +from microsoft.agents.storage.error_handling import ignore_error, is_status_code_error |
20 | 14 |
|
21 | | -StoreItemT = TypeVar("StoreItemT", bound=StoreItem) |
22 | | - |
23 | | - |
24 | | -class BlobStorageSettings: |
25 | | - |
26 | | - def __init__( |
27 | | - self, |
28 | | - container_name: str, |
29 | | - account_name: str = "", |
30 | | - account_key: str = "", |
31 | | - connection_string: str = "", |
32 | | - ): |
33 | | - self.container_name = container_name |
34 | | - self.account_name = account_name |
35 | | - self.account_key = account_key |
36 | | - self.connection_string = connection_string |
| 15 | +from .blob_storage_config import BlobStorageConfig |
37 | 16 |
|
| 17 | +StoreItemT = TypeVar("StoreItemT", bound=StoreItem) |
38 | 18 |
|
39 | | -def convert_account_name_and_key_to_connection_string(settings: BlobStorageSettings): |
40 | | - if not settings.account_name or not settings.account_key: |
41 | | - raise ValueError( |
42 | | - "account_name and account_key are both required for BlobStorageSettings if not using a connections string." |
43 | | - ) |
44 | | - return ( |
45 | | - f"DefaultEndpointsProtocol=https;AccountName={settings.account_name};" |
46 | | - f"AccountKey={settings.account_key};EndpointSuffix=core.windows.net" |
47 | | - ) |
48 | 19 |
|
| 20 | +class BlobStorage(AsyncStorageBase): |
49 | 21 |
|
50 | | -class BlobStorage(Storage): |
| 22 | + def __init__(self, config: BlobStorageConfig): |
51 | 23 |
|
52 | | - def __init__(self, settings: BlobStorageSettings): |
53 | | - if not settings.container_name: |
| 24 | + if not config.container_name: |
54 | 25 | raise ValueError("BlobStorage: Container name is required.") |
55 | 26 |
|
56 | | - connection_string: str = settings.connection_string |
57 | | - if not connection_string: |
58 | | - # New Azure Blob SDK only allows connection strings, but our SDK allows key+name. |
59 | | - # This is here for backwards compatibility. |
60 | | - connection_string = convert_account_name_and_key_to_connection_string( |
61 | | - settings |
62 | | - ) |
63 | | - |
64 | | - blob_service_client: BlobServiceClient = ( |
65 | | - BlobServiceClient.from_connection_string(connection_string) |
66 | | - ) |
| 27 | + self.config = config |
67 | 28 |
|
| 29 | + blob_service_client: BlobServiceClient = self._create_client() |
68 | 30 | self._container_client: ContainerClient = ( |
69 | | - blob_service_client.get_container_client(settings.container_name) |
| 31 | + blob_service_client.get_container_client(config.container_name) |
70 | 32 | ) |
71 | 33 | self._initialized: bool = False |
72 | 34 |
|
73 | | - async def _initialize_container(self): |
74 | | - """Initializes the storage container""" |
75 | | - if self._initialized is False: |
76 | | - # This should only happen once - assuming this is a singleton. |
77 | | - # ContainerClient.exists() method is available in an unreleased version of the SDK. Until then, we use: |
78 | | - try: |
79 | | - await self._container_client.create_container() |
80 | | - except ResourceExistsError: |
81 | | - pass |
82 | | - self._initialized = True |
83 | | - |
84 | | - return self._initialized |
85 | | - |
86 | | - async def read( |
87 | | - self, keys: list[str], *, target_cls: StoreItemT = None, **kwargs |
88 | | - ) -> dict[str, StoreItemT]: |
89 | | - """Retrieve entities from the configured blob container. |
90 | | -
|
91 | | - :param keys: An array of entity keys. |
92 | | - :type keys: dict[str, StoreItem] |
93 | | - :param target_cls: The StoreItem class to deserialize retrieved values into. |
94 | | - :type target_cls: StoreItem |
95 | | - :return dict: |
96 | | - """ |
97 | | - if not keys: |
98 | | - raise ValueError("BlobStorage.read(): Keys are required when reading.") |
99 | | - if not target_cls: |
100 | | - raise ValueError("BlobStorage.read(): target_cls cannot be None.") |
101 | | - |
102 | | - await self._initialize_container() |
103 | | - |
104 | | - result: dict[str, StoreItem] = {} |
105 | | - for key in keys: |
106 | | - |
107 | | - try: |
108 | | - item_rep: str = await ( |
109 | | - await self._container_client.download_blob(blob=key) |
110 | | - ).readall() |
111 | | - item_JSON: JSON = json.loads(item_rep) |
112 | | - except HttpResponseError as error: |
113 | | - if error.status_code == 404: |
114 | | - continue |
115 | | - else: |
116 | | - raise HttpResponseError( |
117 | | - f"BlobStorage.read(): Error reading blob '{key}': {error}" |
118 | | - ) |
119 | | - |
120 | | - try: |
121 | | - result[key] = target_cls.from_json_to_store_item(item_JSON) |
122 | | - except AttributeError as error: |
123 | | - raise TypeError( |
124 | | - f"BlobStorage.read(): could not deserialize blob item into {target_cls} class. Error: {error}" |
125 | | - ) |
126 | | - |
127 | | - return result |
128 | | - |
129 | | - async def write(self, changes: dict[str, StoreItem]): |
130 | | - """Stores a new entity in the configured blob container. |
131 | | -
|
132 | | - :param changes: The changes to write to storage. |
133 | | - :type changes: dict[str, StoreItem] |
134 | | - :return: |
135 | | - """ |
136 | | - if not changes: |
137 | | - raise ValueError("BlobStorage.write(): changes cannot be None nor empty") |
138 | | - |
139 | | - await self._initialize_container() |
140 | | - |
141 | | - for key, item in changes.items(): |
142 | | - |
143 | | - item_JSON: JSON = item.store_item_to_json() |
144 | | - if item_JSON is None: |
| 35 | + def _create_client(self) -> BlobServiceClient: |
| 36 | + if self.config.url: # connect with URL and credentials |
| 37 | + if not self.config.credential: |
145 | 38 | raise ValueError( |
146 | | - "BlobStorage.write(): StoreItem serialization cannot return None" |
| 39 | + "BlobStorage: Credential is required when using a custom service URL." |
147 | 40 | ) |
148 | | - item_rep_bytes = json.dumps(item_JSON).encode("utf-8") |
149 | | - |
150 | | - # providing the length parameter may improve performance |
151 | | - await self._container_client.upload_blob( |
152 | | - name=key, |
153 | | - data=BytesIO(item_rep_bytes), |
154 | | - overwrite=True, |
155 | | - length=len(item_rep_bytes), |
| 41 | + return BlobServiceClient( |
| 42 | + account_url=self.config.url, credential=self.config.credential |
156 | 43 | ) |
157 | 44 |
|
158 | | - async def delete(self, keys: list[str]): |
159 | | - """Deletes entity blobs from the configured container. |
| 45 | + else: # connect with connection string |
| 46 | + return BlobServiceClient.from_connection_string( |
| 47 | + self.config.connection_string |
| 48 | + ) |
160 | 49 |
|
161 | | - :param keys: An array of entity keys. |
162 | | - :type keys: list[str] |
163 | | - """ |
164 | | - if keys is None: |
165 | | - raise ValueError("BlobStorage.delete(): keys parameter can't be null") |
| 50 | + async def initialize(self) -> None: |
| 51 | + """Initializes the storage container""" |
| 52 | + if not self._initialized: |
| 53 | + # This should only happen once - assuming this is a singleton. |
| 54 | + await ignore_error( |
| 55 | + self._container_client.create_container(), is_status_code_error(409) |
| 56 | + ) |
| 57 | + self._initialized = True |
| 58 | + |
| 59 | + async def _read_item( |
| 60 | + self, key: str, *, target_cls: StoreItemT = None, **kwargs |
| 61 | + ) -> tuple[Union[str, None], Union[StoreItemT, None]]: |
| 62 | + item = await ignore_error( |
| 63 | + self._container_client.download_blob(blob=key), |
| 64 | + is_status_code_error(404), |
| 65 | + ) |
| 66 | + if not item: |
| 67 | + return None, None |
| 68 | + |
| 69 | + item_rep: str = await item.readall() |
| 70 | + item_JSON: JSON = json.loads(item_rep) |
| 71 | + try: |
| 72 | + return key, target_cls.from_json_to_store_item(item_JSON) |
| 73 | + except AttributeError as error: |
| 74 | + raise TypeError( |
| 75 | + f"BlobStorage.read_item(): could not deserialize blob item into {target_cls} class. Error: {error}" |
| 76 | + ) |
166 | 77 |
|
167 | | - await self._initialize_container() |
| 78 | + async def _write_item(self, key: str, item: StoreItem) -> None: |
| 79 | + item_JSON: JSON = item.store_item_to_json() |
| 80 | + if item_JSON is None: |
| 81 | + raise ValueError( |
| 82 | + "BlobStorage.write(): StoreItem serialization cannot return None" |
| 83 | + ) |
| 84 | + item_rep_bytes = json.dumps(item_JSON).encode("utf-8") |
| 85 | + |
| 86 | + # getting the length is important for performance with large blobs |
| 87 | + await self._container_client.upload_blob( |
| 88 | + name=key, |
| 89 | + data=BytesIO(item_rep_bytes), |
| 90 | + overwrite=True, |
| 91 | + length=len(item_rep_bytes), |
| 92 | + ) |
168 | 93 |
|
169 | | - for key in keys: |
170 | | - try: |
171 | | - await self._container_client.delete_blob(blob=key) |
172 | | - # We can't delete what's already gone. |
173 | | - except ResourceNotFoundError: |
174 | | - pass |
| 94 | + async def _delete_item(self, key: str) -> None: |
| 95 | + await ignore_error( |
| 96 | + self._container_client.delete_blob(blob=key), is_status_code_error(404) |
| 97 | + ) |
0 commit comments