Skip to content

Commit 5ea79ea

Browse files
authored
Merge branch 'main' into tswast-patch-1
2 parents 252f283 + 51057fc commit 5ea79ea

File tree

13 files changed

+399
-86
lines changed

13 files changed

+399
-86
lines changed

bigframes/core/compile/sqlglot/aggregations/nullary_compiler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from bigframes.core import window_spec
2222
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
23-
from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present
23+
from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present
2424
from bigframes.operations import aggregations as agg_ops
2525

2626
NULLARY_OP_REGISTRATION = reg.OpRegistration()

bigframes/core/compile/sqlglot/aggregations/unary_compiler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from bigframes.core import window_spec
2222
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
23-
from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present
23+
from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present
2424
import bigframes.core.compile.sqlglot.expressions.typed_expr as typed_expr
2525
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
2626
from bigframes.operations import aggregations as agg_ops

bigframes/core/compile/sqlglot/aggregations/utils.py

Lines changed: 0 additions & 29 deletions
This file was deleted.
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
import typing
17+
18+
import sqlglot.expressions as sge
19+
20+
from bigframes.core import utils, window_spec
21+
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
22+
import bigframes.core.ordering as ordering_spec
23+
24+
25+
def apply_window_if_present(
26+
value: sge.Expression,
27+
window: typing.Optional[window_spec.WindowSpec] = None,
28+
) -> sge.Expression:
29+
if window is None:
30+
return value
31+
32+
if window.is_row_bounded and not window.ordering:
33+
raise ValueError("No ordering provided for ordered analytic function")
34+
elif (
35+
not window.is_row_bounded
36+
and not window.is_range_bounded
37+
and not window.ordering
38+
):
39+
# Unbound grouping window.
40+
order_by = None
41+
elif window.is_range_bounded:
42+
# Note that, when the window is range-bounded, we only need one ordering key.
43+
# There are two reasons:
44+
# 1. Manipulating null positions requires more than one ordering key, which
45+
# is forbidden by SQL window syntax for range rolling.
46+
# 2. Pandas does not allow range rolling on timeseries with nulls.
47+
order_by = get_window_order_by((window.ordering[0],), override_null_order=False)
48+
else:
49+
order_by = get_window_order_by(window.ordering, override_null_order=True)
50+
51+
order = sge.Order(expressions=order_by) if order_by else None
52+
53+
group_by = (
54+
[scalar_compiler.compile_scalar_expression(key) for key in window.grouping_keys]
55+
if window.grouping_keys
56+
else None
57+
)
58+
59+
# This is the key change. Don't create a spec for the default window frame
60+
# if there's no ordering. This avoids generating an `ORDER BY NULL` clause.
61+
if not window.bounds and not order:
62+
return sge.Window(this=value, partition_by=group_by)
63+
64+
kind = (
65+
"ROWS" if isinstance(window.bounds, window_spec.RowsWindowBounds) else "RANGE"
66+
)
67+
68+
start: typing.Union[int, float, None] = None
69+
end: typing.Union[int, float, None] = None
70+
if isinstance(window.bounds, window_spec.RangeWindowBounds):
71+
if window.bounds.start is not None:
72+
start = utils.timedelta_to_micros(window.bounds.start)
73+
if window.bounds.end is not None:
74+
end = utils.timedelta_to_micros(window.bounds.end)
75+
elif window.bounds:
76+
start = window.bounds.start
77+
end = window.bounds.end
78+
79+
start_value, start_side = _get_window_bounds(start, is_preceding=True)
80+
end_value, end_side = _get_window_bounds(end, is_preceding=False)
81+
82+
spec = sge.WindowSpec(
83+
kind=kind,
84+
start=start_value,
85+
start_side=start_side,
86+
end=end_value,
87+
end_side=end_side,
88+
over="OVER",
89+
)
90+
91+
return sge.Window(this=value, partition_by=group_by, order=order, spec=spec)
92+
93+
94+
def get_window_order_by(
95+
ordering: typing.Tuple[ordering_spec.OrderingExpression, ...],
96+
override_null_order: bool = False,
97+
) -> typing.Optional[tuple[sge.Ordered, ...]]:
98+
"""Returns the SQL order by clause for a window specification."""
99+
if not ordering:
100+
return None
101+
102+
order_by = []
103+
for ordering_spec_item in ordering:
104+
expr = scalar_compiler.compile_scalar_expression(
105+
ordering_spec_item.scalar_expression
106+
)
107+
desc = not ordering_spec_item.direction.is_ascending
108+
nulls_first = not ordering_spec_item.na_last
109+
110+
if override_null_order:
111+
# Bigquery SQL considers NULLS to be "smallest" values, but we need
112+
# to override in these cases.
113+
is_null_expr = sge.Is(this=expr, expression=sge.Null())
114+
if nulls_first and desc:
115+
order_by.append(
116+
sge.Ordered(
117+
this=is_null_expr,
118+
desc=desc,
119+
nulls_first=nulls_first,
120+
)
121+
)
122+
elif not nulls_first and not desc:
123+
order_by.append(
124+
sge.Ordered(
125+
this=is_null_expr,
126+
desc=desc,
127+
nulls_first=nulls_first,
128+
)
129+
)
130+
131+
order_by.append(
132+
sge.Ordered(
133+
this=expr,
134+
desc=desc,
135+
nulls_first=nulls_first,
136+
)
137+
)
138+
return tuple(order_by)
139+
140+
141+
def _get_window_bounds(
142+
value, is_preceding: bool
143+
) -> tuple[typing.Union[str, sge.Expression], typing.Optional[str]]:
144+
"""Compiles a single boundary value into its SQL components."""
145+
if value is None:
146+
side = "PRECEDING" if is_preceding else "FOLLOWING"
147+
return "UNBOUNDED", side
148+
149+
if value == 0:
150+
return "CURRENT ROW", None
151+
152+
side = "PRECEDING" if value < 0 else "FOLLOWING"
153+
return sge.convert(abs(value)), side

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from bigframes.core import expression, guid, identifiers, nodes, pyarrow_utils, rewrite
2424
from bigframes.core.compile import configs
2525
import bigframes.core.compile.sqlglot.aggregate_compiler as aggregate_compiler
26+
from bigframes.core.compile.sqlglot.aggregations import windows
2627
from bigframes.core.compile.sqlglot.expressions import typed_expr
2728
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
2829
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
@@ -272,18 +273,16 @@ def compile_random_sample(
272273
def compile_aggregate(
273274
self, node: nodes.AggregateNode, child: ir.SQLGlotIR
274275
) -> ir.SQLGlotIR:
275-
ordering_cols = tuple(
276-
sge.Ordered(
277-
this=scalar_compiler.compile_scalar_expression(
278-
ordering.scalar_expression
279-
),
280-
desc=ordering.direction.is_ascending is False,
281-
nulls_first=ordering.na_last is False,
282-
)
283-
for ordering in node.order_by
276+
ordering_cols = windows.get_window_order_by(
277+
node.order_by, override_null_order=True
284278
)
285279
aggregations: tuple[tuple[str, sge.Expression], ...] = tuple(
286-
(id.sql, aggregate_compiler.compile_aggregate(agg, order_by=ordering_cols))
280+
(
281+
id.sql,
282+
aggregate_compiler.compile_aggregate(
283+
agg, order_by=ordering_cols if ordering_cols else ()
284+
),
285+
)
287286
for agg, id in node.aggregations
288287
)
289288
by_cols: tuple[sge.Expression, ...] = tuple(

bigframes/pandas/io/api.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import functools
1818
import inspect
19+
import os
1920
import threading
2021
import typing
2122
from typing import (
@@ -56,6 +57,7 @@
5657
from bigframes.session import dry_runs
5758
import bigframes.session._io.bigquery
5859
import bigframes.session.clients
60+
import bigframes.session.metrics
5961

6062
# Note: the following methods are duplicated from Session. This duplication
6163
# enables the following:
@@ -625,6 +627,11 @@ def _get_bqclient() -> bigquery.Client:
625627

626628
def _dry_run(query, bqclient) -> bigquery.QueryJob:
627629
job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
630+
631+
# Fix for b/435183833. Log metrics even if a Session isn't available.
632+
if bigframes.session.metrics.LOGGING_NAME_ENV_VAR in os.environ:
633+
metrics = bigframes.session.metrics.ExecutionMetrics()
634+
metrics.count_job_stats(job)
628635
return job
629636

630637

bigframes/session/metrics.py

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -40,32 +40,54 @@ def count_job_stats(
4040
):
4141
if query_job is None:
4242
assert row_iterator is not None
43-
total_bytes_processed = getattr(row_iterator, "total_bytes_processed", None)
44-
query = getattr(row_iterator, "query", None)
45-
if total_bytes_processed is None or query is None:
46-
return
43+
44+
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
45+
bytes_processed = getattr(row_iterator, "total_bytes_processed", 0)
46+
query_char_count = len(getattr(row_iterator, "query", ""))
47+
slot_millis = getattr(row_iterator, "slot_millis", 0)
48+
exec_seconds = 0.0
4749

4850
self.execution_count += 1
49-
self.query_char_count += len(query)
50-
self.bytes_processed += total_bytes_processed
51-
write_stats_to_disk(len(query), total_bytes_processed)
52-
return
51+
self.query_char_count += query_char_count
52+
self.bytes_processed += bytes_processed
53+
self.slot_millis += slot_millis
54+
55+
elif query_job.configuration.dry_run:
56+
query_char_count = len(query_job.query)
5357

54-
if query_job.configuration.dry_run:
55-
write_stats_to_disk(len(query_job.query), 0, 0, 0)
58+
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
59+
bytes_processed = 0
60+
slot_millis = 0
61+
exec_seconds = 0.0
5662

57-
stats = get_performance_stats(query_job)
58-
if stats is not None:
59-
query_char_count, bytes_processed, slot_millis, execution_secs = stats
63+
elif (stats := get_performance_stats(query_job)) is not None:
64+
query_char_count, bytes_processed, slot_millis, exec_seconds = stats
6065
self.execution_count += 1
6166
self.query_char_count += query_char_count
6267
self.bytes_processed += bytes_processed
6368
self.slot_millis += slot_millis
64-
self.execution_secs += execution_secs
69+
self.execution_secs += exec_seconds
6570
write_stats_to_disk(
66-
query_char_count, bytes_processed, slot_millis, execution_secs
71+
query_char_count=query_char_count,
72+
bytes_processed=bytes_processed,
73+
slot_millis=slot_millis,
74+
exec_seconds=exec_seconds,
6775
)
6876

77+
else:
78+
# TODO(tswast): Pass None after making benchmark publishing robust to missing data.
79+
bytes_processed = 0
80+
query_char_count = 0
81+
slot_millis = 0
82+
exec_seconds = 0
83+
84+
write_stats_to_disk(
85+
query_char_count=query_char_count,
86+
bytes_processed=bytes_processed,
87+
slot_millis=slot_millis,
88+
exec_seconds=exec_seconds,
89+
)
90+
6991

7092
def get_performance_stats(
7193
query_job: bigquery.QueryJob,
@@ -103,10 +125,11 @@ def get_performance_stats(
103125

104126

105127
def write_stats_to_disk(
128+
*,
106129
query_char_count: int,
107130
bytes_processed: int,
108-
slot_millis: Optional[int] = None,
109-
exec_seconds: Optional[float] = None,
131+
slot_millis: int,
132+
exec_seconds: float,
110133
):
111134
"""For pytest runs only, log information about the query job
112135
to a file in order to create a performance report.
@@ -118,18 +141,17 @@ def write_stats_to_disk(
118141
test_name = os.environ[LOGGING_NAME_ENV_VAR]
119142
current_directory = os.getcwd()
120143

121-
if (slot_millis is not None) and (exec_seconds is not None):
122-
# store slot milliseconds
123-
slot_file = os.path.join(current_directory, test_name + ".slotmillis")
124-
with open(slot_file, "a") as f:
125-
f.write(str(slot_millis) + "\n")
144+
# store slot milliseconds
145+
slot_file = os.path.join(current_directory, test_name + ".slotmillis")
146+
with open(slot_file, "a") as f:
147+
f.write(str(slot_millis) + "\n")
126148

127-
# store execution time seconds
128-
exec_time_file = os.path.join(
129-
current_directory, test_name + ".bq_exec_time_seconds"
130-
)
131-
with open(exec_time_file, "a") as f:
132-
f.write(str(exec_seconds) + "\n")
149+
# store execution time seconds
150+
exec_time_file = os.path.join(
151+
current_directory, test_name + ".bq_exec_time_seconds"
152+
)
153+
with open(exec_time_file, "a") as f:
154+
f.write(str(exec_seconds) + "\n")
133155

134156
# store length of query
135157
query_char_count_file = os.path.join(

0 commit comments

Comments
 (0)