Skip to content

Commit 0593eae

Browse files
committed
Fix coroutine serialization error in PowerBIDatasetRefreshOperator
1 parent eb6a4d8 commit 0593eae

3 files changed

Lines changed: 22 additions & 4 deletions

File tree

providers/microsoft/azure/src/airflow/providers/microsoft/azure/hooks/powerbi.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from __future__ import annotations
1919

20+
import asyncio
2021
from enum import Enum
2122
from typing import TYPE_CHECKING, Any
2223

@@ -189,6 +190,22 @@ async def get_refresh_details_by_refresh_id(
189190

190191
return refresh_details
191192

193+
def trigger_dataset_refresh_sync(
194+
self, *, dataset_id: str, group_id: str, request_body: dict[str, Any] | None = None
195+
) -> str:
196+
"""
197+
Trigger Power BI Dataset refresh synchronously.
198+
199+
This is a wrapper around the async `trigger_dataset_refresh` method.
200+
"""
201+
return asyncio.run(
202+
self.trigger_dataset_refresh(
203+
dataset_id=dataset_id,
204+
group_id=group_id,
205+
request_body=request_body,
206+
)
207+
)
208+
192209
async def trigger_dataset_refresh(
193210
self, *, dataset_id: str, group_id: str, request_body: dict[str, Any] | None = None
194211
) -> str:

providers/microsoft/azure/src/airflow/providers/microsoft/azure/operators/powerbi.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def execute(self, context: Context):
123123
conn_id=self.conn_id, proxies=self.proxies, api_version=self.api_version, timeout=self.timeout
124124
)
125125

126-
dataset_refresh_id = hook.trigger_dataset_refresh(
126+
dataset_refresh_id = hook.trigger_dataset_refresh_sync(
127127
dataset_id=self.dataset_id,
128128
group_id=self.group_id,
129129
request_body=self.request_body,

providers/microsoft/azure/tests/unit/microsoft/azure/operators/test_powerbi.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ def test_powerbi_link(self, dag_maker, create_task_instance_of_operator):
237237
def test_execute_fire_and_forget_mode(self, mock_connection, mock_hook_class):
238238
"""Test fire-and-forget mode (wait_for_completion=False)"""
239239
mock_hook_instance = mock_hook_class.return_value
240-
mock_hook_instance.trigger_dataset_refresh.return_value = NEW_REFRESH_REQUEST_ID
240+
mock_hook_instance.trigger_dataset_refresh_sync.return_value = NEW_REFRESH_REQUEST_ID
241241

242242
operator = PowerBIDatasetRefreshOperator(
243243
**CONFIG,
@@ -250,7 +250,7 @@ def test_execute_fire_and_forget_mode(self, mock_connection, mock_hook_class):
250250
result = operator.execute(context)
251251

252252
# Verify hook was called correctly
253-
mock_hook_instance.trigger_dataset_refresh.assert_called_once_with(
253+
mock_hook_instance.trigger_dataset_refresh_sync.assert_called_once_with(
254254
dataset_id=DATASET_ID,
255255
group_id=GROUP_ID,
256256
request_body=REQUEST_BODY,
@@ -261,6 +261,7 @@ def test_execute_fire_and_forget_mode(self, mock_connection, mock_hook_class):
261261
call_args = context["ti"].xcom_push.call_args
262262
assert call_args[1]["key"] == f"{TASK_ID}.powerbi_dataset_refresh_id"
263263
assert call_args[1]["value"] == NEW_REFRESH_REQUEST_ID
264+
assert isinstance(call_args[1]["value"], str)
264265

265266
# Should return None (completes synchronously)
266267
assert result is None
@@ -270,7 +271,7 @@ def test_execute_fire_and_forget_mode(self, mock_connection, mock_hook_class):
270271
def test_execute_fire_and_forget_mode_failure(self, mock_connection, mock_hook_class):
271272
"""Test fire-and-forget mode raises exception when trigger fails"""
272273
mock_hook_instance = mock_hook_class.return_value
273-
mock_hook_instance.trigger_dataset_refresh.return_value = None
274+
mock_hook_instance.trigger_dataset_refresh_sync.return_value = None
274275

275276
operator = PowerBIDatasetRefreshOperator(
276277
**CONFIG,

0 commit comments

Comments
 (0)