Skip to content

Commit 3117fa7

Browse files
committed
Add exception diagnostics table to quantum provenance
1 parent 924f491 commit 3117fa7

File tree

1 file changed

+157
-2
lines changed

1 file changed

+157
-2
lines changed

python/lsst/pipe/base/quantum_provenance_graph.py

Lines changed: 157 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,12 +42,16 @@
4242
import concurrent.futures
4343
import dataclasses
4444
import datetime
45+
import io
4546
import itertools
4647
import logging
48+
import sys
4749
import textwrap
4850
import threading
4951
import uuid
52+
from collections import defaultdict
5053
from collections.abc import Callable, Iterator, Mapping, Sequence, Set
54+
from contextlib import contextmanager
5155
from enum import Enum
5256
from typing import TYPE_CHECKING, Any, ClassVar, Literal, TypedDict, cast
5357

@@ -934,7 +938,33 @@ def aggregate(cls, summaries: Sequence[Summary]) -> Summary:
934938
result_dataset_summary._add_data_id_group(dataset_type_summary)
935939
return result
936940

937-
def pprint(self, brief: bool = False, datasets: bool = True) -> None:
941+
@contextmanager
942+
def tty_buffer(self) -> Iterator[io.StringIO]:
943+
"""Context manager that temporarily redirects sys.stdout to a
944+
teletypewriter-like buffer. Useful for capturing output that formats
945+
differently when writing to a TTY.
946+
"""
947+
948+
class MockTTY(io.StringIO):
949+
# Pretend to be a terminal to capture full TTY output.
950+
def isatty(self) -> bool:
951+
return True
952+
953+
orig = sys.stdout
954+
buf = MockTTY()
955+
sys.stdout = buf
956+
try:
957+
yield buf # Use buffer inside `with` block.
958+
finally:
959+
sys.stdout = orig # Restore original stdout.
960+
961+
def pprint(
962+
self,
963+
brief: bool = False,
964+
datasets: bool = True,
965+
show_exception_diagnostics: bool = False,
966+
butler: Butler | None = None,
967+
) -> None:
938968
"""Print this summary to stdout, as a series of tables.
939969
940970
Parameters
@@ -948,6 +978,9 @@ def pprint(self, brief: bool = False, datasets: bool = True) -> None:
948978
includes a summary table of dataset counts for various status and
949979
(if ``brief`` is `True`) a table with per-data ID information for
950980
each unsuccessful or cursed dataset.
981+
butler : `lsst.daf.butler.Butler`, optional
982+
The butler used to create this summary. This is only used to get
983+
exposure dimension records for the exception diagnostics.
951984
"""
952985
self.make_quantum_table().pprint_all()
953986
print("")
@@ -958,6 +991,24 @@ def pprint(self, brief: bool = False, datasets: bool = True) -> None:
958991
if exception_table := self.make_exception_table():
959992
exception_table.pprint_all()
960993
print("")
994+
if show_exception_diagnostics:
995+
exception_diagnostics_table = self.make_exception_diagnostics_table(
996+
butler, max_message_width=45, shorten_type_name=True
997+
)
998+
with self.tty_buffer() as buffer:
999+
# Use pprint() to trim long tables; pprint_all() may flood the
1000+
# screen in those cases.
1001+
exception_diagnostics_table.pprint()
1002+
last_line = buffer.getvalue().splitlines()[-1]
1003+
# Print the table from the buffer.
1004+
print(buffer.getvalue())
1005+
if "Length =" in last_line:
1006+
# The table was too long to print, so we had to truncate it.
1007+
print(
1008+
"▲ Note: The exception diagnostics table above is truncated. "
1009+
"Use --exception-diagnostics-filename to save the complete table."
1010+
)
1011+
print("")
9611012
if datasets:
9621013
self.make_dataset_table().pprint_all()
9631014
print("")
@@ -1075,6 +1126,110 @@ def make_exception_table(self) -> astropy.table.Table:
10751126
rows.append({"Task": task_label, "Exception": type_name, "Count": len(exception_summaries)})
10761127
return astropy.table.Table(rows)
10771128

1129+
def make_exception_diagnostics_table(
1130+
self,
1131+
butler: Butler | None = None,
1132+
add_exception_msg: bool = True,
1133+
max_message_width: int | None = None,
1134+
shorten_type_name: bool = False,
1135+
) -> astropy.table.Table:
1136+
"""Construct an `astropy.table.Table` showing exceptions grouped by
1137+
data ID.
1138+
1139+
Each row represents one data ID that encountered an exception, along
1140+
with the exception type under the column named after the task that
1141+
raised it. If a Butler is provided, the table will also include a
1142+
subset of exposure-related metadata pulled from the exposure dimension
1143+
records. The exception message can optionaly be included in the table.
1144+
1145+
Parameters
1146+
----------
1147+
butler : `lsst.daf.butler.Butler`, optional
1148+
Butler instance used to fetch exposure records. If not provided,
1149+
exposure dimension records will not be included in the table.
1150+
add_exception_msg : `bool`, optional
1151+
If `True`, include the exception message in the table.
1152+
max_message_width : `int`, optional
1153+
Maximum width for storing exception messages in the output table.
1154+
Longer messages will be truncated. If not provided, messages will
1155+
be included in full without truncation.
1156+
shorten_type_name : `bool`, optional
1157+
If `True`, shorten the exception type name by removing the
1158+
package name. This is useful for making the table more readable
1159+
when the package name is long or not relevant to the user.
1160+
1161+
Returns
1162+
-------
1163+
table : `astropy.table.Table`
1164+
Table with one row per data ID and columns for exception types (by
1165+
task), and optionally, exposure dimension records and exception
1166+
messages.
1167+
"""
1168+
add_exposure_records = True
1169+
needed_exposure_records = ["day_obs", "physical_filter", "exposure_time", "target_name"]
1170+
1171+
# Preload all exposure dimension records up front for faster O(1)
1172+
# lookup later. Querying per data ID in the loop is painfully slow.
1173+
if butler:
1174+
exposure_record_lookup = {
1175+
d.dataId["exposure"]: d for d in butler.query_dimension_records("exposure", explain=False)
1176+
}
1177+
else:
1178+
exposure_record_lookup = {}
1179+
add_exposure_records = False
1180+
1181+
if butler and not exposure_record_lookup:
1182+
_LOG.warning("No exposure records found in the butler; they will not be included in the table.")
1183+
add_exposure_records = False
1184+
1185+
rows: defaultdict[tuple, defaultdict[str, str]] = defaultdict(lambda: defaultdict(str))
1186+
1187+
# Loop over all tasks and exceptions, and associate them with data IDs.
1188+
for task_label, task_summary in self.tasks.items():
1189+
for type_name, exceptions in task_summary.exceptions.items():
1190+
for exception in exceptions:
1191+
data_id = exception.data_id
1192+
key = tuple(sorted(data_id.items())) # Hashable and stable
1193+
assert len(rows[key]) == 0, f"Multiple exceptions for one data ID: {key}"
1194+
assert rows[key]["Exception"] == "", f"Duplicate entry for data ID {key} in {task_label}"
1195+
if shorten_type_name:
1196+
# Trim off the package name from the exception type for
1197+
# brevity.
1198+
type_name = type_name.rsplit(".", maxsplit=1)[-1]
1199+
rows[key]["Task"] = task_label
1200+
rows[key]["Exception"] = type_name
1201+
if add_exception_msg:
1202+
msg = exception.exception.message
1203+
if max_message_width and len(msg) > max_message_width:
1204+
msg = textwrap.shorten(msg, max_message_width)
1205+
rows[key]["Exception Message"] = msg
1206+
if add_exposure_records:
1207+
exposure_record = exposure_record_lookup[data_id["exposure"]]
1208+
for k in needed_exposure_records:
1209+
rows[key][k] = getattr(exposure_record, k)
1210+
1211+
# Extract all unique columns.
1212+
all_columns = {col for r in rows.values() for col in r}
1213+
table_rows = []
1214+
1215+
# Loop over all rows and add them to the table.
1216+
for key, col_counts in rows.items():
1217+
# Add data ID values as columns at the start of the row.
1218+
row = dict(key)
1219+
# Add exposure records next, if requested.
1220+
if add_exposure_records:
1221+
for col in needed_exposure_records:
1222+
row[col] = col_counts.get(col, "-")
1223+
# Add all other columns last.
1224+
for col in all_columns - set(needed_exposure_records) - {"Exception Message"}:
1225+
row[col] = col_counts.get(col, "-")
1226+
# Add the exception message if requested.
1227+
if add_exception_msg:
1228+
row["Exception Message"] = col_counts.get("Exception Message", "-")
1229+
table_rows.append(row)
1230+
1231+
return astropy.table.Table(table_rows)
1232+
10781233
def make_bad_quantum_tables(self, max_message_width: int = 80) -> dict[str, astropy.table.Table]:
10791234
"""Construct an `astropy.table.Table` with per-data-ID information
10801235
about failed, wonky, and partial-outputs-error quanta.
@@ -1295,7 +1450,7 @@ def to_summary(
12951450
----------
12961451
butler : `lsst.daf.butler.Butler`, optional
12971452
Ignored; accepted for backwards compatibility.
1298-
do_store_logs : `bool`
1453+
do_store_logs : `bool`, optional
12991454
Store the logs in the summary dictionary.
13001455
n_cores : `int`, optional
13011456

0 commit comments

Comments
 (0)