
Commit 8fe7c9d

Merge remote-tracking branch 'github/main' into grouped_value_counts
2 parents 952e446 + cd954ac commit 8fe7c9d

File tree

7 files changed (+293 / -16 lines)

bigframes/core/compile/compiled.py

Lines changed: 1 addition & 1 deletion

@@ -459,7 +459,7 @@ def project_window_op(
             for column in inputs:
                 clauses.append((column.isnull(), ibis_types.null()))
         if window_spec.min_periods and len(inputs) > 0:
-            if expression.op.skips_nulls:
+            if not expression.op.nulls_count_for_min_values:
                 # Most operations do not count NULL values towards min_periods
                 per_col_does_count = (column.notnull() for column in inputs)
                 # All inputs must be non-null for observation to count
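
Context for the change above: FirstNonNullOp and LastNonNullOp report skips_nulls=False because they must observe null rows, yet those null rows should still not count towards min_periods. The old check conflated the two concerns; the new nulls_count_for_min_values property separates them. A minimal stand-alone sketch of the counting rule, with hypothetical names standing in for the real ibis expressions:

    from typing import List, Optional

    def rows_counting_toward_min_periods(
        nulls_count_for_min_values: bool, values: List[Optional[int]]
    ) -> int:
        """How many rows in `values` count as observations for min_periods."""
        if not nulls_count_for_min_values:
            # Most ops, now including first()/last(): only non-null rows count.
            return sum(v is not None for v in values)
        # Ops that genuinely treat null rows as observations.
        return len(values)

    # first()/last() inspect the null row but do not count it.
    assert rows_counting_toward_min_periods(False, [None, 1, 2]) == 2
    assert rows_counting_toward_min_periods(True, [None, 1, 2]) == 3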

bigframes/core/groupby/dataframe_group_by.py

Lines changed: 42 additions & 0 deletions

@@ -263,6 +263,48 @@ def kurt(

     kurtosis = kurt

+    @validations.requires_ordering()
+    def first(self, numeric_only: bool = False, min_count: int = -1) -> df.DataFrame:
+        window_spec = window_specs.unbound(
+            grouping_keys=tuple(self._by_col_ids),
+            min_periods=min_count if min_count >= 0 else 0,
+        )
+        target_cols, index = self._aggregated_columns(numeric_only)
+        block, firsts_ids = self._block.multi_apply_window_op(
+            target_cols,
+            agg_ops.FirstNonNullOp(),
+            window_spec=window_spec,
+        )
+        block, _ = block.aggregate(
+            self._by_col_ids,
+            tuple(
+                aggs.agg(firsts_id, agg_ops.AnyValueOp()) for firsts_id in firsts_ids
+            ),
+            dropna=self._dropna,
+            column_labels=index,
+        )
+        return df.DataFrame(block)
+
+    @validations.requires_ordering()
+    def last(self, numeric_only: bool = False, min_count: int = -1) -> df.DataFrame:
+        window_spec = window_specs.unbound(
+            grouping_keys=tuple(self._by_col_ids),
+            min_periods=min_count if min_count >= 0 else 0,
+        )
+        target_cols, index = self._aggregated_columns(numeric_only)
+        block, lasts_ids = self._block.multi_apply_window_op(
+            target_cols,
+            agg_ops.LastNonNullOp(),
+            window_spec=window_spec,
+        )
+        block, _ = block.aggregate(
+            self._by_col_ids,
+            tuple(aggs.agg(lasts_id, agg_ops.AnyValueOp()) for lasts_id in lasts_ids),
+            dropna=self._dropna,
+            column_labels=index,
+        )
+        return df.DataFrame(block)
+
     def all(self) -> df.DataFrame:
         return self._aggregate_all(agg_ops.all_op)
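
For orientation, a minimal usage sketch of the new DataFrameGroupBy.first/last (the data and expected outputs are illustrative, assuming a configured BigQuery DataFrames session):

    import bigframes.pandas as bpd

    df = bpd.DataFrame({"key": ["a", "a", "b", "b"], "val": [None, 1.0, 2.0, None]})
    grouped = df.groupby("key")

    # First/last non-null value per group, in the frame's ordering.
    print(grouped.first().to_pandas())  # expected: a -> 1.0, b -> 2.0
    print(grouped.last().to_pandas())   # expected: a -> 1.0, b -> 2.0

    # min_count=2 demands at least two non-null observations per group;
    # each group here has only one, so both results should be <NA>.
    print(grouped.first(min_count=2).to_pandas())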

bigframes/core/groupby/series_group_by.py

Lines changed: 50 additions & 1 deletion

@@ -36,6 +36,7 @@
 import bigframes.core.window as windows
 import bigframes.core.window_spec as window_specs
 import bigframes.dataframe as df
+import bigframes.dtypes
 import bigframes.operations.aggregations as agg_ops
 import bigframes.series as series

@@ -162,6 +163,54 @@ def kurt(self, *args, **kwargs) -> series.Series:

     kurtosis = kurt

+    @validations.requires_ordering()
+    def first(self, numeric_only: bool = False, min_count: int = -1) -> series.Series:
+        if numeric_only and not bigframes.dtypes.is_numeric(
+            self._block.expr.get_column_type(self._value_column)
+        ):
+            raise TypeError(
+                f"Cannot use 'numeric_only' with non-numeric column {self._value_name}."
+            )
+        window_spec = window_specs.unbound(
+            grouping_keys=tuple(self._by_col_ids),
+            min_periods=min_count if min_count >= 0 else 0,
+        )
+        block, firsts_id = self._block.apply_window_op(
+            self._value_column,
+            agg_ops.FirstNonNullOp(),
+            window_spec=window_spec,
+        )
+        block, _ = block.aggregate(
+            self._by_col_ids,
+            (aggs.agg(firsts_id, agg_ops.AnyValueOp()),),
+            dropna=self._dropna,
+        )
+        return series.Series(block.with_column_labels([self._value_name]))
+
+    @validations.requires_ordering()
+    def last(self, numeric_only: bool = False, min_count: int = -1) -> series.Series:
+        if numeric_only and not bigframes.dtypes.is_numeric(
+            self._block.expr.get_column_type(self._value_column)
+        ):
+            raise TypeError(
+                f"Cannot use 'numeric_only' with non-numeric column {self._value_name}."
+            )
+        window_spec = window_specs.unbound(
+            grouping_keys=tuple(self._by_col_ids),
+            min_periods=min_count if min_count >= 0 else 0,
+        )
+        block, lasts_id = self._block.apply_window_op(
+            self._value_column,
+            agg_ops.LastNonNullOp(),
+            window_spec=window_spec,
+        )
+        block, _ = block.aggregate(
+            self._by_col_ids,
+            (aggs.agg(lasts_id, agg_ops.AnyValueOp()),),
+            dropna=self._dropna,
+        )
+        return series.Series(block.with_column_labels([self._value_name]))
+
     def prod(self, *args) -> series.Series:
         return self._aggregate(agg_ops.product_op)

@@ -338,7 +387,7 @@ def _apply_window_op(
         discard_name=False,
         window: typing.Optional[window_specs.WindowSpec] = None,
         never_skip_nulls: bool = False,
-    ):
+    ) -> series.Series:
         """Apply window op to groupby. Defaults to grouped cumulative window."""
         window_spec = window or window_specs.cumulative_rows(
             grouping_keys=tuple(self._by_col_ids)
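
Design note: both methods run a grouped, unbounded analytic window (FirstNonNullOp/LastNonNullOp) and then collapse each group with AnyValueOp — after the window phase every row in a group carries the same value, so any one row answers the question. A hedged pandas emulation of that two-phase plan, with illustrative data:

    import pandas as pd

    df = pd.DataFrame({"key": ["a", "a", "b"], "val": [None, 1.0, 2.0]})

    # Window phase: broadcast the group's first non-null value onto every row.
    firsts = df.groupby("key")["val"].transform("first")

    # ANY_VALUE phase: rows within a group now agree, so take any one per group.
    result = firsts.groupby(df["key"]).agg(lambda s: s.iloc[0])
    print(result)  # expected: a -> 1.0, b -> 2.0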

bigframes/functions/_function_client.py

Lines changed: 15 additions & 14 deletions

@@ -366,8 +366,8 @@ def generate_cloud_function_code(
     def create_cloud_function(
         self,
         def_,
-        cf_name,
         *,
+        random_name,
         input_types: Tuple[str],
         output_type: str,
         package_requirements=None,

@@ -428,9 +428,9 @@ def create_cloud_function(
         create_function_request.parent = (
             self.get_cloud_function_fully_qualified_parent()
         )
-        create_function_request.function_id = cf_name
+        create_function_request.function_id = random_name
         function = functions_v2.Function()
-        function.name = self.get_cloud_function_fully_qualified_name(cf_name)
+        function.name = self.get_cloud_function_fully_qualified_name(random_name)
         function.build_config = functions_v2.BuildConfig()
         function.build_config.runtime = python_version
         function.build_config.entry_point = entry_point

@@ -497,24 +497,25 @@ def create_cloud_function(
             # Cleanup
             os.remove(archive_path)
         except google.api_core.exceptions.AlreadyExists:
-            # If a cloud function with the same name already exists, let's
-            # update it
-            update_function_request = functions_v2.UpdateFunctionRequest()
-            update_function_request.function = function
-            operation = self._cloud_functions_client.update_function(
-                request=update_function_request
-            )
-            operation.result()
+            # b/437124912: The most likely scenario is that
+            # `create_function` had a retry due to a network issue. The
+            # retried request then fails because the first call actually
+            # succeeded, but we didn't get the successful response back.
+            #
+            # Since the function name was randomly chosen to avoid
+            # conflicts, we know the AlreadyExists error can only happen
+            # because we created it. This error is safe to ignore.
+            pass

         # Fetch the endpoint of the just created function
-        endpoint = self.get_cloud_function_endpoint(cf_name)
+        endpoint = self.get_cloud_function_endpoint(random_name)
         if not endpoint:
             raise bf_formatting.create_exception_with_feedback_link(
                 ValueError, "Couldn't fetch the http endpoint."
             )

         logger.info(
-            f"Successfully created cloud function {cf_name} with uri ({endpoint})"
+            f"Successfully created cloud function {random_name} with uri ({endpoint})"
         )
         return endpoint

@@ -571,7 +572,7 @@ def provision_bq_remote_function(
         if not cf_endpoint:
             cf_endpoint = self.create_cloud_function(
                 def_,
-                cloud_function_name,
+                random_name=cloud_function_name,
                 input_types=input_types,
                 output_type=output_type,
                 package_requirements=package_requirements,
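
The pattern behind this rename: the caller generates a fresh random name for each logical create, so an AlreadyExists error can only come from our own earlier attempt whose success response was lost, and swallowing it is safe. A minimal sketch of the idea (client and make_request are hypothetical stand-ins, not BigQuery DataFrames APIs):

    import uuid

    import google.api_core.exceptions

    def create_with_random_name(client, make_request):
        # Fresh random name per logical create: collisions with other callers
        # are effectively impossible, so AlreadyExists implies our own retry.
        name = f"bigframes-{uuid.uuid4().hex[:8]}"
        try:
            operation = client.create_function(request=make_request(name))
            operation.result()
        except google.api_core.exceptions.AlreadyExists:
            pass  # the first attempt succeeded; the retried request raced it
        return name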

bigframes/operations/aggregations.py

Lines changed: 13 additions & 0 deletions

@@ -33,6 +33,11 @@ def skips_nulls(self):
         """Whether the window op skips null rows."""
         return True

+    @property
+    def nulls_count_for_min_values(self) -> bool:
+        """Whether null values count towards min_periods."""
+        return not self.skips_nulls
+
     @property
     def implicitly_inherits_order(self):
         """

@@ -480,6 +485,10 @@ class FirstNonNullOp(UnaryWindowOp):
     def skips_nulls(self):
         return False

+    @property
+    def nulls_count_for_min_values(self) -> bool:
+        return False
+

 @dataclasses.dataclass(frozen=True)
 class LastOp(UnaryWindowOp):

@@ -492,6 +501,10 @@ class LastNonNullOp(UnaryWindowOp):
     def skips_nulls(self):
         return False

+    @property
+    def nulls_count_for_min_values(self) -> bool:
+        return False
+

 @dataclasses.dataclass(frozen=True)
 class ShiftOp(UnaryWindowOp):
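
The base-class default (not self.skips_nulls) preserves the old coupling for every other op; only the first/last ops override it so that null rows are neither skipped nor counted towards min_periods. A stand-alone sketch of that hierarchy, using stand-in class names:

    import dataclasses

    @dataclasses.dataclass(frozen=True)
    class WindowOp:
        @property
        def skips_nulls(self) -> bool:
            return True

        @property
        def nulls_count_for_min_values(self) -> bool:
            # Default: tied to skips_nulls, matching prior behavior.
            return not self.skips_nulls

    @dataclasses.dataclass(frozen=True)
    class FirstNonNull(WindowOp):
        @property
        def skips_nulls(self) -> bool:
            return False  # must see null rows to find the first non-null

        @property
        def nulls_count_for_min_values(self) -> bool:
            return False  # ...but null rows still don't satisfy min_periods

    op = FirstNonNull()
    assert op.skips_nulls is False and op.nulls_count_for_min_values is False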

tests/system/small/test_groupby.py

Lines changed: 98 additions & 0 deletions

@@ -622,6 +622,59 @@ def test_dataframe_groupby_value_counts(
     pd.testing.assert_frame_equal(pd_result, bf_result, check_dtype=False)


+@pytest.mark.parametrize(
+    ("numeric_only", "min_count"),
+    [
+        (False, 4),
+        (True, 0),
+    ],
+)
+def test_dataframe_groupby_first(
+    scalars_df_index, scalars_pandas_df_index, numeric_only, min_count
+):
+    # min_count seems to not work properly on older pandas
+    pytest.importorskip("pandas", minversion="2.0.0")
+    # bytes, dates not handling min_count properly in pandas
+    bf_result = (
+        scalars_df_index.drop(columns=["bytes_col", "date_col"])
+        .groupby(scalars_df_index.int64_col % 2)
+        .first(numeric_only=numeric_only, min_count=min_count)
+    ).to_pandas()
+    pd_result = (
+        scalars_pandas_df_index.drop(columns=["bytes_col", "date_col"])
+        .groupby(scalars_pandas_df_index.int64_col % 2)
+        .first(numeric_only=numeric_only, min_count=min_count)
+    )
+    pd.testing.assert_frame_equal(
+        pd_result,
+        bf_result,
+    )
+
+
+@pytest.mark.parametrize(
+    ("numeric_only", "min_count"),
+    [
+        (True, 2),
+        (False, -1),
+    ],
+)
+def test_dataframe_groupby_last(
+    scalars_df_index, scalars_pandas_df_index, numeric_only, min_count
+):
+    bf_result = (
+        scalars_df_index.groupby(scalars_df_index.int64_col % 2).last(
+            numeric_only=numeric_only, min_count=min_count
+        )
+    ).to_pandas()
+    pd_result = scalars_pandas_df_index.groupby(
+        scalars_pandas_df_index.int64_col % 2
+    ).last(numeric_only=numeric_only, min_count=min_count)
+    pd.testing.assert_frame_equal(
+        pd_result,
+        bf_result,
+    )
+
+
 # ==============
 # Series.groupby
 # ==============

@@ -841,3 +894,48 @@ def test_series_groupby_value_counts(
         normalize=normalize, ascending=ascending, dropna=dropna
     )
     pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False)
+
+
+@pytest.mark.parametrize(
+    ("numeric_only", "min_count"),
+    [
+        (True, 2),
+        (False, -1),
+    ],
+)
+def test_series_groupby_first(
+    scalars_df_index, scalars_pandas_df_index, numeric_only, min_count
+):
+    bf_result = (
+        scalars_df_index.groupby("string_col")["int64_col"].first(
+            numeric_only=numeric_only, min_count=min_count
+        )
+    ).to_pandas()
+    pd_result = scalars_pandas_df_index.groupby("string_col")["int64_col"].first(
+        numeric_only=numeric_only, min_count=min_count
+    )
+    pd.testing.assert_series_equal(
+        pd_result,
+        bf_result,
+    )
+
+
+@pytest.mark.parametrize(
+    ("numeric_only", "min_count"),
+    [
+        (False, 4),
+        (True, 0),
+    ],
+)
+def test_series_groupby_last(
+    scalars_df_index, scalars_pandas_df_index, numeric_only, min_count
+):
+    bf_result = (
+        scalars_df_index.groupby("string_col")["int64_col"].last(
+            numeric_only=numeric_only, min_count=min_count
+        )
+    ).to_pandas()
+    pd_result = scalars_pandas_df_index.groupby("string_col")["int64_col"].last(
+        numeric_only=numeric_only, min_count=min_count
+    )
+    pd.testing.assert_series_equal(pd_result, bf_result)
