Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add DataFrame.pipe() method #421

Merged
merged 10 commits into from
Mar 14, 2024
25 changes: 25 additions & 0 deletions tests/system/small/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,31 @@ def test_apply_series_scalar_callable(
pandas.testing.assert_series_equal(bf_result, pd_result)


def test_df_pipe(
    scalars_df_index,
    scalars_pandas_df_index,
):
    """Check that chained ``DataFrame.pipe`` calls give the same frame as pandas.

    Both pipe call styles are exercised: a ``(callable, data_keyword)`` tuple
    with extra keyword arguments, followed by a plain single-argument callable.
    """
    columns = ["int64_too", "int64_col"]

    def foo(x: int, y: int, df):
        # The data arrives through the ``df`` keyword, not the first position.
        return (df + x) % y

    def square(frame):
        return frame**2

    bf_shifted = scalars_df_index[columns].pipe((foo, "df"), x=7, y=9)
    bf_result = bf_shifted.pipe(square).to_pandas()

    pd_shifted = scalars_pandas_df_index[columns].pipe((foo, "df"), x=7, y=9)
    pd_result = pd_shifted.pipe(square)

    pandas.testing.assert_frame_equal(bf_result, pd_result)


def test_df_keys(
scalars_df_index,
scalars_pandas_df_index,
Expand Down
25 changes: 25 additions & 0 deletions tests/system/small/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -3203,3 +3203,28 @@ def test_apply_not_supported(scalars_dfs, col, lambda_, exception):
bf_col = scalars_df[col]
with pytest.raises(exception):
bf_col.apply(lambda_, by_row=False)


def test_series_pipe(
    scalars_df_index,
    scalars_pandas_df_index,
):
    """Check that chained ``Series.pipe`` calls give the same series as pandas.

    Both pipe call styles are exercised: a ``(callable, data_keyword)`` tuple
    with extra keyword arguments, followed by a plain single-argument callable.
    """
    column = "int64_too"

    def foo(x: int, y: int, df):
        # The data arrives through the ``df`` keyword, not the first position.
        return (df + x) % y

    def square(values):
        return values**2

    bf_shifted = scalars_df_index[column].pipe((foo, "df"), x=7, y=9)
    bf_result = bf_shifted.pipe(square).to_pandas()

    pd_shifted = scalars_pandas_df_index[column].pipe((foo, "df"), x=7, y=9)
    pd_result = pd_shifted.pipe(square)

    assert_series_equal(bf_result, pd_result)
42 changes: 42 additions & 0 deletions third_party/bigframes_vendored/pandas/core/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/common.py
from __future__ import annotations

from typing import Callable, TYPE_CHECKING

if TYPE_CHECKING:
from bigframes_vendored.pandas.pandas._typing import T


def pipe(
    obj, func: Callable[..., T] | tuple[Callable[..., T], str], *args, **kwargs
) -> T:
    """
    Call ``func`` with ``obj`` supplied either positionally or by keyword.

    When ``func`` is a plain callable, ``obj`` is passed as its first
    positional argument. When ``func`` is a ``(callable, data_keyword)``
    tuple, ``obj`` is passed to ``callable`` as the keyword argument named
    ``data_keyword`` instead.

    Args:
        obj (object):
            The object handed to ``func``.
        func (callable or tuple of (callable, str)):
            Function to apply to this object or, alternatively, a
            ``(callable, data_keyword)`` tuple where ``data_keyword`` is a
            string indicating the keyword of ``callable`` that expects the
            object.
        args (iterable, optional):
            Positional arguments passed into ``func``.
        kwargs (dict, optional):
            A dictionary of keyword arguments passed into ``func``.

    Returns:
        object: the return type of ``func``.

    Raises:
        ValueError: if ``func`` is a tuple and ``data_keyword`` is also
            present in ``kwargs``.
    """
    # Common case first: a bare callable receives the object positionally.
    if not isinstance(func, tuple):
        return func(obj, *args, **kwargs)

    callee, data_keyword = func
    if data_keyword in kwargs:
        # The caller would otherwise silently lose one of the two values.
        raise ValueError(
            f"{data_keyword} is both the pipe target and a keyword argument"
        )

    kwargs[data_keyword] = obj
    return callee(*args, **kwargs)
105 changes: 104 additions & 1 deletion third_party/bigframes_vendored/pandas/core/generic.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
# Contains code from https://github.com/pandas-dev/pandas/blob/main/pandas/core/generic.py
from __future__ import annotations

from typing import Iterator, Literal, Optional
from typing import Callable, Iterator, Literal, Optional, TYPE_CHECKING

from bigframes_vendored.pandas.core import indexing
import bigframes_vendored.pandas.core.common as common

from bigframes import constants

if TYPE_CHECKING:
from bigframes_vendored.pandas.pandas._typing import T


class NDFrame(indexing.IndexingMixin):
"""
Expand Down Expand Up @@ -963,6 +967,105 @@ def expanding(self, min_periods=1):
"""
raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)

def pipe(
    self,
    func: Callable[..., T] | tuple[Callable[..., T], str],
    *args,
    **kwargs,
) -> T:
    """
    Apply chainable functions that expect Series or DataFrames.

    **Examples:**

    Constructing an income DataFrame from a dictionary.

        >>> import bigframes.pandas as bpd
        >>> import numpy as np
        >>> bpd.options.display.progress_bar = None

        >>> data = [[8000, 1000], [9500, np.nan], [5000, 2000]]
        >>> df = bpd.DataFrame(data, columns=['Salary', 'Others'])
        >>> df
           Salary  Others
        0    8000  1000.0
        1    9500    <NA>
        2    5000  2000.0
        <BLANKLINE>
        [3 rows x 2 columns]

    Functions that perform tax reductions on an income DataFrame.

        >>> def subtract_federal_tax(df):
        ...     return df * 0.9
        >>> def subtract_state_tax(df, rate):
        ...     return df * (1 - rate)
        >>> def subtract_national_insurance(df, rate, rate_increase):
        ...     new_rate = rate + rate_increase
        ...     return df * (1 - new_rate)

    Instead of writing

        >>> subtract_national_insurance(
        ...     subtract_state_tax(subtract_federal_tax(df), rate=0.12),
        ...     rate=0.05,
        ...     rate_increase=0.02)  # doctest: +SKIP

    You can write

        >>> (
        ...     df.pipe(subtract_federal_tax)
        ...     .pipe(subtract_state_tax, rate=0.12)
        ...     .pipe(subtract_national_insurance, rate=0.05, rate_increase=0.02)
        ... )
            Salary   Others
        0  5892.48   736.56
        1  6997.32     <NA>
        2   3682.8  1473.12
        <BLANKLINE>
        [3 rows x 2 columns]

    If you have a function that takes the data as (say) the second
    argument, pass a tuple indicating which keyword expects the
    data. For example, suppose ``subtract_national_insurance`` takes its
    data as ``df`` in the second argument:

        >>> def subtract_national_insurance(rate, df, rate_increase):
        ...     new_rate = rate + rate_increase
        ...     return df * (1 - new_rate)
        >>> (
        ...     df.pipe(subtract_federal_tax)
        ...     .pipe(subtract_state_tax, rate=0.12)
        ...     .pipe(
        ...         (subtract_national_insurance, 'df'),
        ...         rate=0.05,
        ...         rate_increase=0.02
        ...     )
        ... )
            Salary   Others
        0  5892.48   736.56
        1  6997.32     <NA>
        2   3682.8  1473.12
        <BLANKLINE>
        [3 rows x 2 columns]

    Args:
        func (callable or tuple of (callable, str)):
            Function to apply to this object.
            ``args`` and ``kwargs`` are passed into ``func``.
            Alternatively, a ``(callable, data_keyword)`` tuple where
            ``data_keyword`` is a string indicating the keyword of
            ``callable`` that expects this object.
        args (iterable, optional):
            Positional arguments passed into ``func``.
        kwargs (mapping, optional):
            A dictionary of keyword arguments passed into ``func``.

    Returns:
        object: the return type of ``func``.

    Raises:
        ValueError: if ``func`` is a ``(callable, data_keyword)`` tuple and
            ``data_keyword`` is also supplied in ``kwargs``.
    """
    # Delegates to the shared helper, which handles both the plain-callable
    # and the (callable, data_keyword) tuple forms.
    return common.pipe(self, func, *args, **kwargs)

def __nonzero__(self):
raise ValueError(
f"The truth value of a {type(self).__name__} is ambiguous. "
Expand Down