Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 55 additions & 10 deletions ingestion/src/metadata/ingestion/source/dashboard/sigma/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
Workbook,
WorkbookDetails,
WorkBookPageResponse,
WorkbookQueriesResponse,
WorkBookResponseDetails,
)
from metadata.utils.constants import AUTHORIZATION_HEADER, UTF_8
Expand Down Expand Up @@ -202,7 +203,9 @@ def get_chart_details(self, workbook_id: str) -> Optional[List[Elements]]:
if not pages.entries:
return None
for page in pages.entries:
elements_list.extend(self.get_page_elements(workbook_id, page.pageId))
page_elements = self.get_page_elements(workbook_id, page.pageId)
if page_elements:
elements_list.extend(page_elements)
while pages.nextPage:
pages = WorkBookPageResponse.model_validate(
self.client.get(
Expand All @@ -213,9 +216,9 @@ def get_chart_details(self, workbook_id: str) -> Optional[List[Elements]]:
if not pages.entries:
break
for page in pages.entries:
elements_list.extend(
self.get_page_elements(workbook_id, page.pageId)
)
page_elements = self.get_page_elements(workbook_id, page.pageId)
if page_elements:
elements_list.extend(page_elements)
return elements_list
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
Expand All @@ -224,6 +227,32 @@ def get_chart_details(self, workbook_id: str) -> Optional[List[Elements]]:
)
return None

def get_workbook_queries(
self, workbook_id: str
) -> Optional[WorkbookQueriesResponse]:
"""
Fetch SQL queries for all elements in a workbook
"""
try:
queries = []
result = WorkbookQueriesResponse.model_validate(
self.client.get(f"/workbooks/{workbook_id}/queries")
)
if result:
queries.extend(result.entries)
while result.nextPage:
data = {"page": int(result.nextPage)}
result = WorkbookQueriesResponse.model_validate(
self.client.get(f"/workbooks/{workbook_id}/queries", data=data)
)
if result:
queries.extend(result.entries)
return WorkbookQueriesResponse(entries=queries, total=len(queries))
except Exception as exc:
logger.debug(traceback.format_exc())
logger.warning(f"Failed to fetch queries for workbook {workbook_id}: {exc}")
return None

def get_lineage_details(
self, workbook_id: str, element_id: str
) -> Optional[List[NodeDetails]]:
Expand All @@ -237,16 +266,32 @@ def get_lineage_details(
f"/workbooks/{workbook_id}/lineage/elements/{element_id}"
)
)
for node in edges_response.edges:
if node.node_id:

for edge in edges_response.edges:
if not edge.node_id:
continue

if edge.source in edges_response.dependencies:
continue

try:
node_details = NodeDetails.model_validate(
self.client.get(f"/files/{node.node_id}")
self.client.get(f"/files/{edge.node_id}")
)
source_nodes.append(node_details)
return source_nodes

if node_details.node_type in ["table", "dataset"]:
source_nodes.append(node_details)
except Exception as node_exc:
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch node details for {edge.node_id}: {node_exc}"
)
continue

return source_nodes if source_nodes else None
except Exception as exc: # pylint: disable=broad-except
logger.debug(traceback.format_exc())
logger.warning(
f"Failed to fetch lineage details for workbook {workbook_id}: {exc}"
f"Failed to fetch lineage details for workbook {workbook_id}, element {element_id}: {exc}"
)
return None
Loading
Loading