Skip to content

Commit c20fc75

Browse files
committed
WIP: use callbacks for progress bars
1 parent 2348e7a commit c20fc75

File tree

6 files changed

+120
-153
lines changed

6 files changed

+120
-153
lines changed

bigframes/display/anywidget.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ def _validate_page_size(self, proposal: Dict[str, Any]) -> int:
149149

150150
def _update_progress(self, event):
151151
# TODO: use formatting helpers here.
152-
self.progress_html = f"<code>{repr(event)}"
152+
self.progress_html = f"<code>{repr(event)}</code>"
153153

154154
def _get_next_batch(self) -> bool:
155155
"""

bigframes/display/table_widget.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ function render({ model, el }) {
6060
const pageSizeSelect = document.createElement("select");
6161

6262
// Add CSS classes
63+
progressContainer.classList.add("progress-container");
6364
tableContainer.classList.add("table-container");
6465
footer.classList.add("footer");
6566
paginationContainer.classList.add("pagination");
@@ -123,7 +124,7 @@ function render({ model, el }) {
123124
}
124125

125126
/** Updates the HTML in the progress container. */
126-
function handleTableHTMLChange() {
127+
function handleProgressHTMLChange() {
127128
// Note: Using innerHTML is safe here because the content is generated
128129
// by a trusted backend (formatting_helpers).
129130
progressContainer.innerHTML = model.get(ModelProperty.PROGRESS_HTML);
@@ -147,6 +148,7 @@ function render({ model, el }) {
147148
}
148149
});
149150
model.on(Event.CHANGE_TABLE_HTML, handleTableHTMLChange);
151+
model.on(Event.CHANGE_PROGRESS_HTML, handleProgressHTMLChange);
150152

151153
// Assemble the DOM
152154
paginationContainer.appendChild(prevPage);

bigframes/formatting_helpers.py

Lines changed: 48 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,45 @@
4242
}
4343

4444

45+
def create_progress_bar_callback(
46+
*,
47+
progress_bar: Optional[str] = None,
48+
callback: Callable = lambda _: None,
49+
) -> Callable:
50+
if progress_bar == "auto":
51+
progress_bar = "notebook" if in_ipython() else "terminal"
52+
53+
if progress_bar == "notebook":
54+
loading_bar = display.HTML("")
55+
display_id = str(random.random())
56+
display.display(loading_bar, display_id=display_id)
57+
58+
def outer_callback(event):
59+
callback(event)
60+
display.update_display(
61+
display.HTML(get_query_job_loading_html(event)),
62+
display_id=display_id,
63+
)
64+
65+
elif progress_bar == "terminal":
66+
previous_bar_text = ""
67+
68+
def outer_callback(event):
69+
nonlocal previous_bar_text
70+
71+
callback(event)
72+
73+
bar_text = get_query_job_loading_string(event)
74+
if bar_text != previous_bar_text:
75+
print(bar_text)
76+
previous_bar_text = bar_text
77+
78+
else:
79+
outer_callback = callback
80+
81+
return outer_callback
82+
83+
4584
def add_feedback_link(
4685
exception: Union[
4786
api_core_exceptions.RetryError, api_core_exceptions.GoogleAPICallError
@@ -125,7 +164,6 @@ def wait_for_query_job(
125164
query_job: bigquery.QueryJob,
126165
max_results: Optional[int] = None,
127166
page_size: Optional[int] = None,
128-
progress_bar: Optional[str] = None,
129167
callback: Callable = lambda _: None,
130168
) -> bigquery.table.RowIterator:
131169
"""Return query results. Displays a progress bar while the query is running
@@ -141,36 +179,8 @@ def wait_for_query_job(
141179
Returns:
142180
A row iterator over the query results.
143181
"""
144-
if progress_bar == "auto":
145-
progress_bar = "notebook" if in_ipython() else "terminal"
146-
147-
if progress_bar == "notebook":
148-
loading_bar = display.HTML(get_query_job_loading_html(query_job))
149-
display_id = str(random.random())
150-
display.display(loading_bar, display_id=display_id)
151-
152-
def extended_callback(event):
153-
callback(event)
154-
display.update_display(
155-
display.HTML(get_query_job_loading_html(query_job)),
156-
display_id=display_id,
157-
)
158-
159-
elif progress_bar == "terminal":
160-
initial_loading_bar = get_query_job_loading_string(query_job)
161-
print(initial_loading_bar)
162-
163-
def extended_callback(event):
164-
callback(event)
165-
166-
if initial_loading_bar != get_query_job_loading_string(query_job):
167-
print(get_query_job_loading_string(query_job))
168-
169-
else:
170-
extended_callback = callback
171-
172182
try:
173-
extended_callback(
183+
callback(
174184
# DONOTSUBMIT: we should create our own events.
175185
google.cloud.bigquery._job_helpers.QueryReceivedEvent(
176186
billing_project=query_job.project,
@@ -184,11 +194,15 @@ def extended_callback(event):
184194
ended=query_job.ended,
185195
)
186196
)
197+
# TODO(tswast): Add a timeout so that progress bars can make updates as
198+
# the query stats come in.
199+
# TODO(tswast): Listen for cancellation on the callback (or maybe
200+
# callbacks should just raise KeyboardInterrupt like IPython does?).
187201
query_results = query_job.result(
188202
page_size=page_size,
189203
max_results=max_results,
190204
)
191-
extended_callback(
205+
callback(
192206
# DONOTSUBMIT: we should create our own events.
193207
google.cloud.bigquery._job_helpers.QueryFinishedEvent(
194208
billing_project=query_job.project,
@@ -206,13 +220,16 @@ def extended_callback(event):
206220
)
207221
return query_results
208222
except api_core_exceptions.RetryError as exc:
223+
# TODO: turn this into a callback event, too.
209224
add_feedback_link(exc)
210225
raise
211226
except api_core_exceptions.GoogleAPICallError as exc:
227+
# TODO: turn this into a callback event, too.
212228
add_feedback_link(exc)
213229
raise
214230
except KeyboardInterrupt:
215231
query_job.cancel()
232+
# TODO: turn this into a callback event, too.
216233
print(
217234
f"Requested cancellation for {query_job.job_type.capitalize()}"
218235
f" job {query_job.job_id} in location {query_job.location}..."

bigframes/session/_io/bigquery/__init__.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,12 @@ def start_query_with_client(
362362
"""
363363
Starts query job and waits for results.
364364
"""
365+
opts = bigframes.options.display
366+
progress_callback = formatting_helpers.create_progress_bar_callback(
367+
progress_bar=opts.progress_bar,
368+
callback=callback,
369+
)
370+
365371
try:
366372
# Note: Ensure no additional labels are added to job_config after this
367373
# point, as `add_and_trim_labels` ensures the label count does not
@@ -376,7 +382,7 @@ def start_query_with_client(
376382
project=project,
377383
api_timeout=timeout,
378384
job_retry=job_retry,
379-
callback=callback,
385+
callback=progress_callback,
380386
)
381387
if metrics is not None:
382388
metrics.count_job_stats(row_iterator=results_iterator)
@@ -395,7 +401,6 @@ def start_query_with_client(
395401
ex.message += CHECK_DRIVE_PERMISSIONS
396402
raise
397403

398-
opts = bigframes.options.display
399404
if opts.progress_bar is not None and not query_job.configuration.dry_run:
400405
results_iterator = formatting_helpers.wait_for_query_job(
401406
query_job,

bigframes/session/bq_caching_executor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,6 @@ def _export_gbq(
321321

322322
# TODO(swast): plumb through the api_name of the user-facing api that
323323
# caused this query.
324-
breakpoint()
325324
row_iter, query_job = self._run_execute_query(
326325
sql=sql,
327326
job_config=job_config,

0 commit comments

Comments
 (0)