Skip to content

Commit 5229ec5

Browse files
authored
Merge branch 'main' into main_chelsealin_add
2 parents 3074a1f + a298a02 commit 5229ec5

File tree

10 files changed

+341
-56
lines changed

10 files changed

+341
-56
lines changed

bigframes/core/compile/sqlglot/aggregations/nullary_compiler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from bigframes.core import window_spec
2222
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
23-
from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present
23+
from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present
2424
from bigframes.operations import aggregations as agg_ops
2525

2626
NULLARY_OP_REGISTRATION = reg.OpRegistration()

bigframes/core/compile/sqlglot/aggregations/unary_compiler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
from bigframes.core import window_spec
2222
import bigframes.core.compile.sqlglot.aggregations.op_registration as reg
23-
from bigframes.core.compile.sqlglot.aggregations.utils import apply_window_if_present
23+
from bigframes.core.compile.sqlglot.aggregations.windows import apply_window_if_present
2424
import bigframes.core.compile.sqlglot.expressions.typed_expr as typed_expr
2525
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
2626
from bigframes.operations import aggregations as agg_ops

bigframes/core/compile/sqlglot/aggregations/utils.py

Lines changed: 0 additions & 29 deletions
This file was deleted.
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
from __future__ import annotations
15+
16+
import typing
17+
18+
import sqlglot.expressions as sge
19+
20+
from bigframes.core import utils, window_spec
21+
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
22+
import bigframes.core.ordering as ordering_spec
23+
24+
25+
def apply_window_if_present(
26+
value: sge.Expression,
27+
window: typing.Optional[window_spec.WindowSpec] = None,
28+
) -> sge.Expression:
29+
if window is None:
30+
return value
31+
32+
if window.is_row_bounded and not window.ordering:
33+
raise ValueError("No ordering provided for ordered analytic function")
34+
elif (
35+
not window.is_row_bounded
36+
and not window.is_range_bounded
37+
and not window.ordering
38+
):
39+
# Unbound grouping window.
40+
order_by = None
41+
elif window.is_range_bounded:
42+
# Note that, when the window is range-bounded, we only need one ordering key.
43+
# There are two reasons:
44+
# 1. Manipulating null positions requires more than one ordering key, which
45+
# is forbidden by SQL window syntax for range rolling.
46+
# 2. Pandas does not allow range rolling on timeseries with nulls.
47+
order_by = get_window_order_by((window.ordering[0],), override_null_order=False)
48+
else:
49+
order_by = get_window_order_by(window.ordering, override_null_order=True)
50+
51+
order = sge.Order(expressions=order_by) if order_by else None
52+
53+
group_by = (
54+
[scalar_compiler.compile_scalar_expression(key) for key in window.grouping_keys]
55+
if window.grouping_keys
56+
else None
57+
)
58+
59+
# This is the key change. Don't create a spec for the default window frame
60+
# if there's no ordering. This avoids generating an `ORDER BY NULL` clause.
61+
if not window.bounds and not order:
62+
return sge.Window(this=value, partition_by=group_by)
63+
64+
kind = (
65+
"ROWS" if isinstance(window.bounds, window_spec.RowsWindowBounds) else "RANGE"
66+
)
67+
68+
start: typing.Union[int, float, None] = None
69+
end: typing.Union[int, float, None] = None
70+
if isinstance(window.bounds, window_spec.RangeWindowBounds):
71+
if window.bounds.start is not None:
72+
start = utils.timedelta_to_micros(window.bounds.start)
73+
if window.bounds.end is not None:
74+
end = utils.timedelta_to_micros(window.bounds.end)
75+
elif window.bounds:
76+
start = window.bounds.start
77+
end = window.bounds.end
78+
79+
start_value, start_side = _get_window_bounds(start, is_preceding=True)
80+
end_value, end_side = _get_window_bounds(end, is_preceding=False)
81+
82+
spec = sge.WindowSpec(
83+
kind=kind,
84+
start=start_value,
85+
start_side=start_side,
86+
end=end_value,
87+
end_side=end_side,
88+
over="OVER",
89+
)
90+
91+
return sge.Window(this=value, partition_by=group_by, order=order, spec=spec)
92+
93+
94+
def get_window_order_by(
95+
ordering: typing.Tuple[ordering_spec.OrderingExpression, ...],
96+
override_null_order: bool = False,
97+
) -> typing.Optional[tuple[sge.Ordered, ...]]:
98+
"""Returns the SQL order by clause for a window specification."""
99+
if not ordering:
100+
return None
101+
102+
order_by = []
103+
for ordering_spec_item in ordering:
104+
expr = scalar_compiler.compile_scalar_expression(
105+
ordering_spec_item.scalar_expression
106+
)
107+
desc = not ordering_spec_item.direction.is_ascending
108+
nulls_first = not ordering_spec_item.na_last
109+
110+
if override_null_order:
111+
# Bigquery SQL considers NULLS to be "smallest" values, but we need
112+
# to override in these cases.
113+
is_null_expr = sge.Is(this=expr, expression=sge.Null())
114+
if nulls_first and desc:
115+
order_by.append(
116+
sge.Ordered(
117+
this=is_null_expr,
118+
desc=desc,
119+
nulls_first=nulls_first,
120+
)
121+
)
122+
elif not nulls_first and not desc:
123+
order_by.append(
124+
sge.Ordered(
125+
this=is_null_expr,
126+
desc=desc,
127+
nulls_first=nulls_first,
128+
)
129+
)
130+
131+
order_by.append(
132+
sge.Ordered(
133+
this=expr,
134+
desc=desc,
135+
nulls_first=nulls_first,
136+
)
137+
)
138+
return tuple(order_by)
139+
140+
141+
def _get_window_bounds(
142+
value, is_preceding: bool
143+
) -> tuple[typing.Union[str, sge.Expression], typing.Optional[str]]:
144+
"""Compiles a single boundary value into its SQL components."""
145+
if value is None:
146+
side = "PRECEDING" if is_preceding else "FOLLOWING"
147+
return "UNBOUNDED", side
148+
149+
if value == 0:
150+
return "CURRENT ROW", None
151+
152+
side = "PRECEDING" if value < 0 else "FOLLOWING"
153+
return sge.convert(abs(value)), side

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from bigframes.core import expression, guid, identifiers, nodes, pyarrow_utils, rewrite
2424
from bigframes.core.compile import configs
2525
import bigframes.core.compile.sqlglot.aggregate_compiler as aggregate_compiler
26+
from bigframes.core.compile.sqlglot.aggregations import windows
2627
from bigframes.core.compile.sqlglot.expressions import typed_expr
2728
import bigframes.core.compile.sqlglot.scalar_compiler as scalar_compiler
2829
import bigframes.core.compile.sqlglot.sqlglot_ir as ir
@@ -272,18 +273,16 @@ def compile_random_sample(
272273
def compile_aggregate(
273274
self, node: nodes.AggregateNode, child: ir.SQLGlotIR
274275
) -> ir.SQLGlotIR:
275-
ordering_cols = tuple(
276-
sge.Ordered(
277-
this=scalar_compiler.compile_scalar_expression(
278-
ordering.scalar_expression
279-
),
280-
desc=ordering.direction.is_ascending is False,
281-
nulls_first=ordering.na_last is False,
282-
)
283-
for ordering in node.order_by
276+
ordering_cols = windows.get_window_order_by(
277+
node.order_by, override_null_order=True
284278
)
285279
aggregations: tuple[tuple[str, sge.Expression], ...] = tuple(
286-
(id.sql, aggregate_compiler.compile_aggregate(agg, order_by=ordering_cols))
280+
(
281+
id.sql,
282+
aggregate_compiler.compile_aggregate(
283+
agg, order_by=ordering_cols if ordering_cols else ()
284+
),
285+
)
287286
for agg, id in node.aggregations
288287
)
289288
by_cols: tuple[sge.Expression, ...] = tuple(

samples/snippets/conftest.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
from typing import Iterator
15+
from typing import Generator, Iterator
1616

17-
from google.cloud import bigquery
17+
from google.cloud import bigquery, storage
1818
import pytest
1919
import test_utils.prefixer
2020

@@ -42,11 +42,27 @@ def bigquery_client() -> bigquery.Client:
4242
return bigquery_client
4343

4444

45+
@pytest.fixture(scope="session")
46+
def storage_client(project_id: str) -> storage.Client:
47+
return storage.Client(project=project_id)
48+
49+
4550
@pytest.fixture(scope="session")
4651
def project_id(bigquery_client: bigquery.Client) -> str:
4752
return bigquery_client.project
4853

4954

55+
@pytest.fixture(scope="session")
56+
def gcs_bucket(storage_client: storage.Client) -> Generator[str, None, None]:
57+
bucket_name = "bigframes_blob_test_with_data_wipeout"
58+
59+
yield bucket_name
60+
61+
bucket = storage_client.get_bucket(bucket_name)
62+
for blob in bucket.list_blobs():
63+
blob.delete()
64+
65+
5066
@pytest.fixture(autouse=True)
5167
def reset_session() -> None:
5268
"""An autouse fixture ensuring each sample runs in a fresh session.
@@ -78,11 +94,6 @@ def dataset_id_eu(bigquery_client: bigquery.Client, project_id: str) -> Iterator
7894
bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)
7995

8096

81-
@pytest.fixture(scope="session")
82-
def gcs_dst_bucket() -> str:
83-
return "gs://bigframes_blob_test"
84-
85-
8697
@pytest.fixture
8798
def random_model_id(
8899
bigquery_client: bigquery.Client, project_id: str, dataset_id: str

samples/snippets/multimodal_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@
1313
# limitations under the License.
1414

1515

16-
def test_multimodal_dataframe(gcs_dst_bucket: str) -> None:
16+
def test_multimodal_dataframe(gcs_bucket: str) -> None:
1717
# destination folder must be in a GCS bucket that the BQ connection service account (default or user provided) has write access to.
18-
dst_bucket = gcs_dst_bucket
18+
dst_bucket = f"gs://{gcs_bucket}"
1919
# [START bigquery_dataframes_multimodal_dataframe_create]
2020
import bigframes
2121

samples/snippets/sessions_and_io_test.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,11 @@
1313
# limitations under the License.
1414

1515

16-
def test_sessions_and_io(project_id: str, dataset_id: str) -> None:
16+
def test_sessions_and_io(project_id: str, dataset_id: str, gcs_bucket: str) -> None:
1717
YOUR_PROJECT_ID = project_id
1818
YOUR_DATASET_ID = dataset_id
1919
YOUR_LOCATION = "us"
20+
YOUR_BUCKET = gcs_bucket
2021

2122
# [START bigquery_dataframes_create_and_use_session_instance]
2223
import bigframes
@@ -139,6 +140,15 @@ def test_sessions_and_io(project_id: str, dataset_id: str) -> None:
139140
# [END bigquery_dataframes_read_data_from_csv]
140141
assert df is not None
141142

143+
# [START bigquery_dataframes_write_data_to_csv]
144+
import bigframes.pandas as bpd
145+
146+
df = bpd.DataFrame({"my_col": [1, 2, 3]})
147+
# Write a dataframe to a CSV file in GCS
148+
df.to_csv(f"gs://{YOUR_BUCKET}/myfile*.csv")
149+
# [END bigquery_dataframes_write_data_to_csv]
150+
assert df is not None
151+
142152
# [START bigquery_dataframes_read_data_from_bigquery_table]
143153
import bigframes.pandas as bpd
144154

tests/system/small/blob/test_properties.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def test_blob_authorizer(images_mm_df: bpd.DataFrame, bq_connection: str):
4040

4141
def test_blob_version(images_mm_df: bpd.DataFrame):
4242
actual = images_mm_df["blob_col"].blob.version().to_pandas()
43-
expected = pd.Series(["1739574332294150", "1739574332271343"], name="version")
43+
expected = pd.Series(["1753907851152593", "1753907851111538"], name="version")
4444

4545
pd.testing.assert_series_equal(
4646
actual, expected, check_dtype=False, check_index_type=False
@@ -55,13 +55,13 @@ def test_blob_metadata(images_mm_df: bpd.DataFrame):
5555
'{"content_type":"image/jpeg",'
5656
'"md5_hash":"e130ad042261a1883cd2cc06831cf748",'
5757
'"size":338390,'
58-
'"updated":1739574332000000}'
58+
'"updated":1753907851000000}'
5959
),
6060
(
6161
'{"content_type":"image/jpeg",'
6262
'"md5_hash":"e2ae3191ff2b809fd0935f01a537c650",'
6363
'"size":43333,'
64-
'"updated":1739574332000000}'
64+
'"updated":1753907851000000}'
6565
),
6666
],
6767
name="metadata",
@@ -105,8 +105,8 @@ def test_blob_updated(images_mm_df: bpd.DataFrame):
105105
actual = images_mm_df["blob_col"].blob.updated().to_pandas()
106106
expected = pd.Series(
107107
[
108-
pd.Timestamp("2025-02-14 23:05:32", tz="UTC"),
109-
pd.Timestamp("2025-02-14 23:05:32", tz="UTC"),
108+
pd.Timestamp("2025-07-30 20:37:31", tz="UTC"),
109+
pd.Timestamp("2025-07-30 20:37:31", tz="UTC"),
110110
],
111111
name="updated",
112112
)

0 commit comments

Comments
 (0)