Skip to content

Commit 95a005c

Browse files
authored
Merge branch 'main' into sycai_precision_score_binary
2 parents 2904b9d + 3b46a0d commit 95a005c

File tree

23 files changed

+665
-160
lines changed

23 files changed

+665
-160
lines changed

bigframes/bigquery/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
json_value,
5151
json_value_array,
5252
parse_json,
53+
to_json_string,
5354
)
5455
from bigframes.bigquery._operations.search import create_vector_index, vector_search
5556
from bigframes.bigquery._operations.sql import sql_scalar
@@ -87,6 +88,7 @@
8788
json_value,
8889
json_value_array,
8990
parse_json,
91+
to_json_string,
9092
# search ops
9193
create_vector_index,
9294
vector_search,

bigframes/bigquery/_operations/json.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,40 @@ def json_value_array(
430430
return input._apply_unary_op(ops.JSONValueArray(json_path=json_path))
431431

432432

433+
def to_json_string(
    input: series.Series,
) -> series.Series:
    """Converts a series to a JSON-formatted STRING value.

    Each element of the series is serialized with BigQuery's
    ``TO_JSON_STRING`` semantics, yielding one JSON text per row.

    **Examples:**

        >>> import bigframes.pandas as bpd
        >>> import bigframes.bigquery as bbq
        >>> bpd.options.display.progress_bar = None

        >>> s = bpd.Series([1, 2, 3])
        >>> bbq.to_json_string(s)
        0    1
        1    2
        2    3
        dtype: string

        >>> s = bpd.Series([{"int": 1, "str": "pandas"}, {"int": 2, "str": "numpy"}])
        >>> bbq.to_json_string(s)
        0    {"int":1,"str":"pandas"}
        1    {"int":2,"str":"numpy"}
        dtype: string

    Args:
        input (bigframes.series.Series):
            The Series to be converted.

    Returns:
        bigframes.series.Series: A new Series with the JSON-formatted STRING value.
    """
    json_string_op = ops.ToJSONString()
    return input._apply_unary_op(json_string_op)
465+
466+
433467
@utils.preview(name="The JSON-related API `parse_json`")
434468
def parse_json(
435469
input: series.Series,

bigframes/core/blocks.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
Optional,
3838
Sequence,
3939
Tuple,
40+
TYPE_CHECKING,
4041
Union,
4142
)
4243
import warnings
@@ -69,6 +70,9 @@
6970
from bigframes.session import dry_runs, execution_spec
7071
from bigframes.session import executor as executors
7172

73+
if TYPE_CHECKING:
74+
from bigframes.session.executor import ExecuteResult
75+
7276
# Type constraint for wherever column labels are used
7377
Label = typing.Hashable
7478

@@ -404,13 +408,15 @@ def reset_index(
404408
col_level: Union[str, int] = 0,
405409
col_fill: typing.Hashable = "",
406410
allow_duplicates: bool = False,
411+
replacement: Optional[bigframes.enums.DefaultIndexKind] = None,
407412
) -> Block:
408413
"""Reset the index of the block, promoting the old index to a value column.
409414
410415
Arguments:
411416
level: the label or index level of the index levels to remove.
412417
name: this is the column id for the new value id derived from the old index
413-
allow_duplicates:
418+
allow_duplicates: if false, duplicate col labels will result in error
419+
replacement: if not null, will override default index replacement type
414420
415421
Returns:
416422
A new Block because dropping index columns can break references
@@ -425,23 +431,19 @@ def reset_index(
425431
level_ids = self.index_columns
426432

427433
expr = self._expr
434+
replacement_idx_type = replacement or self.session._default_index_type
428435
if set(self.index_columns) > set(level_ids):
429436
new_index_cols = [col for col in self.index_columns if col not in level_ids]
430437
new_index_labels = [self.col_id_to_index_name[id] for id in new_index_cols]
431-
elif (
432-
self.session._default_index_type
433-
== bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
434-
):
438+
elif replacement_idx_type == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
435439
expr, new_index_col_id = expr.promote_offsets()
436440
new_index_cols = [new_index_col_id]
437441
new_index_labels = [None]
438-
elif self.session._default_index_type == bigframes.enums.DefaultIndexKind.NULL:
442+
elif replacement_idx_type == bigframes.enums.DefaultIndexKind.NULL:
439443
new_index_cols = []
440444
new_index_labels = []
441445
else:
442-
raise ValueError(
443-
f"Unrecognized default index kind: {self.session._default_index_type}"
444-
)
446+
raise ValueError(f"Unrecognized default index kind: {replacement_idx_type}")
445447

446448
if drop:
447449
# Even though the index might be part of the ordering, keep that
@@ -630,15 +632,17 @@ def to_pandas(
630632
max_download_size, sampling_method, random_state
631633
)
632634

633-
df, query_job = self._materialize_local(
635+
ex_result = self._materialize_local(
634636
materialize_options=MaterializationOptions(
635637
downsampling=sampling,
636638
allow_large_results=allow_large_results,
637639
ordered=ordered,
638640
)
639641
)
642+
df = ex_result.to_pandas()
643+
df = self._copy_index_to_pandas(df)
640644
df.set_axis(self.column_labels, axis=1, copy=False)
641-
return df, query_job
645+
return df, ex_result.query_job
642646

643647
def _get_sampling_option(
644648
self,
@@ -746,7 +750,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame) -> pd.DataFrame:
746750

747751
def _materialize_local(
748752
self, materialize_options: MaterializationOptions = MaterializationOptions()
749-
) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
753+
) -> ExecuteResult:
750754
"""Run query and download results as a pandas DataFrame. Return the total number of results as well."""
751755
# TODO(swast): Allow for dry run and timeout.
752756
under_10gb = (
@@ -815,8 +819,7 @@ def _materialize_local(
815819
MaterializationOptions(ordered=materialize_options.ordered)
816820
)
817821
else:
818-
df = execute_result.to_pandas()
819-
return self._copy_index_to_pandas(df), execute_result.query_job
822+
return execute_result
820823

821824
def _downsample(
822825
self, total_rows: int, sampling_method: str, fraction: float, random_state

bigframes/core/compile/ibis_compiler/scalar_op_registry.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2068,9 +2068,7 @@ def json_extract_string_array( # type: ignore[empty-body]
20682068

20692069

20702070
@ibis_udf.scalar.builtin(name="to_json_string")
def to_json_string(value) -> ibis_dtypes.String:  # type: ignore[empty-body]
    """Convert value to JSON-formatted string."""
    # Body intentionally empty: the decorator binds this signature directly to
    # the SQL builtin named "to_json_string"; ibis generates the call.
20752073

20762074

bigframes/core/compile/sqlglot/compiler.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,28 @@ def compile_join(
244244
joins_nulls=node.joins_nulls,
245245
)
246246

247+
@_compile_node.register
def compile_isin_join(
    self, node: nodes.InNode, left: ir.SQLGlotIR, right: ir.SQLGlotIR
) -> ir.SQLGlotIR:
    """Lower an ``InNode`` into an isin-style join that appends an indicator column."""
    # Compile each side's key expression along with its dtype.
    left_key = typed_expr.TypedExpr(
        scalar_compiler.compile_scalar_expression(node.left_col),
        node.left_col.output_type,
    )
    right_key = typed_expr.TypedExpr(
        scalar_compiler.compile_scalar_expression(node.right_col),
        node.right_col.output_type,
    )
    return left.isin_join(
        right,
        indicator_col=node.indicator_col.sql,
        conditions=(left_key, right_key),
        joins_nulls=node.joins_nulls,
    )
268+
247269
@_compile_node.register
248270
def compile_concat(
249271
self, node: nodes.ConcatNode, *children: ir.SQLGlotIR

bigframes/core/compile/sqlglot/sqlglot_ir.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,68 @@ def join(
336336

337337
return SQLGlotIR(expr=new_expr, uid_gen=self.uid_gen)
338338

339+
def isin_join(
    self,
    right: SQLGlotIR,
    indicator_col: str,
    conditions: tuple[typed_expr.TypedExpr, typed_expr.TypedExpr],
    joins_nulls: bool = True,
) -> SQLGlotIR:
    """Joins the current query with another SQLGlotIR instance.

    Keeps every row of the current (left) query and appends a boolean column
    named ``indicator_col`` recording whether the row's key
    (``conditions[0]``) appears among ``right``'s keys (``conditions[1]``).

    Args:
        right: Query supplying the membership values (the IN-list side).
        indicator_col: Label for the appended membership column.
        conditions: Pair of (left key, right key) typed expressions.
        joins_nulls: If True, emit a null-aware EXISTS subquery so NULL keys
            can match; if False, emit a plain ``IN (<subquery>)``.

    Returns:
        A new SQLGlotIR selecting all left columns plus the indicator column.
    """
    # Wrap the left query in a fresh uniquely-named CTE so its columns can be
    # referenced by a stable alias from the outer SELECT.
    left_cte_name = sge.to_identifier(
        next(self.uid_gen.get_uid_stream("bfcte_")), quoted=self.quoted
    )

    left_select = _select_to_cte(self.expr, left_cte_name)
    # Prefer subquery over CTE for the IN clause's right side to improve SQL readability.
    right_select = right.expr

    # Hoist both sides' existing WITH clauses up to the new top-level query.
    left_ctes = left_select.args.pop("with", [])
    right_ctes = right_select.args.pop("with", [])
    merged_ctes = [*left_ctes, *right_ctes]

    # Left key, re-qualified against the new left CTE alias.
    left_condition = typed_expr.TypedExpr(
        sge.Column(this=conditions[0].expr, table=left_cte_name),
        conditions[0].dtype,
    )

    new_column: sge.Expression
    if joins_nulls:
        # SQL `IN` never evaluates TRUE for NULL keys, so when nulls must be
        # able to match we emit EXISTS(SELECT 1 FROM <right> WHERE <null-aware
        # equality>) instead, delegating the equality form to _join_condition.
        right_table_name = sge.to_identifier(
            next(self.uid_gen.get_uid_stream("bft_")), quoted=self.quoted
        )
        right_condition = typed_expr.TypedExpr(
            sge.Column(this=conditions[1].expr, table=right_table_name),
            conditions[1].dtype,
        )
        new_column = sge.Exists(
            this=sge.Select()
            .select(sge.convert(1))
            .from_(sge.Alias(this=right_select.subquery(), alias=right_table_name))
            .where(
                _join_condition(left_condition, right_condition, joins_nulls=True)
            )
        )
    else:
        # No null matching required: a plain IN (<subquery>) reads better.
        new_column = sge.In(
            this=left_condition.expr,
            expressions=[right_select.subquery()],
        )

    # Name the membership expression with the caller-provided indicator label.
    new_column = sge.Alias(
        this=new_column,
        alias=sge.to_identifier(indicator_col, quoted=self.quoted),
    )

    # SELECT left.*, <indicator> FROM <left cte>, with all merged CTEs on top.
    new_expr = (
        sge.Select()
        .select(sge.Column(this=sge.Star(), table=left_cte_name), new_column)
        .from_(sge.Table(this=left_cte_name))
    )
    new_expr.set("with", sge.With(expressions=merged_ctes))

    return SQLGlotIR(expr=new_expr, uid_gen=self.uid_gen)
400+
339401
def explode(
340402
self,
341403
column_names: tuple[str, ...],

0 commit comments

Comments
 (0)