Commit bfd49a5

refactor: make to_pandas() call to_arrow() and use local dtypes in DataFrame construction (#132)
Towards internal issue 280662868 🦕
1 parent ade505c commit bfd49a5

9 files changed: +457 additions, -91 deletions

bigframes/core/blocks.py (+7, -34)

@@ -28,11 +28,8 @@
 from typing import Iterable, List, Optional, Sequence, Tuple
 import warnings

-import geopandas as gpd  # type: ignore
 import google.cloud.bigquery as bigquery
-import numpy
 import pandas as pd
-import pyarrow as pa  # type: ignore

 import bigframes.constants as constants
 import bigframes.core as core
@@ -46,6 +43,7 @@
 import bigframes.dtypes
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
+import bigframes.session._io.pandas
 import third_party.bigframes_vendored.pandas.io.common as vendored_pandas_io_common

 # Type constraint for wherever column labels are used
@@ -372,34 +370,11 @@ def reorder_levels(self, ids: typing.Sequence[str]):
         level_names = [self.col_id_to_index_name[index_id] for index_id in ids]
         return Block(self.expr, ids, self.column_labels, level_names)

-    @classmethod
-    def _to_dataframe(
-        cls, result, schema: typing.Mapping[str, bigframes.dtypes.Dtype]
-    ) -> pd.DataFrame:
+    def _to_dataframe(self, result) -> pd.DataFrame:
         """Convert BigQuery data to pandas DataFrame with specific dtypes."""
-        dtypes = bigframes.dtypes.to_pandas_dtypes_overrides(result.schema)
-        df = result.to_dataframe(
-            dtypes=dtypes,
-            bool_dtype=pd.BooleanDtype(),
-            int_dtype=pd.Int64Dtype(),
-            float_dtype=pd.Float64Dtype(),
-            string_dtype=pd.StringDtype(storage="pyarrow"),
-            date_dtype=pd.ArrowDtype(pa.date32()),
-            datetime_dtype=pd.ArrowDtype(pa.timestamp("us")),
-            time_dtype=pd.ArrowDtype(pa.time64("us")),
-            timestamp_dtype=pd.ArrowDtype(pa.timestamp("us", tz="UTC")),
-        )
-
-        # Convert Geography column from StringDType to GeometryDtype.
-        for column_name, dtype in schema.items():
-            if dtype == gpd.array.GeometryDtype():
-                df[column_name] = gpd.GeoSeries.from_wkt(
-                    # https://github.com/geopandas/geopandas/issues/1879
-                    df[column_name].replace({numpy.nan: None}),
-                    # BigQuery geography type is based on the WGS84 reference ellipsoid.
-                    crs="EPSG:4326",
-                )
-        return df
+        dtypes = dict(zip(self.index_columns, self.index_dtypes))
+        dtypes.update(zip(self.value_columns, self.dtypes))
+        return self._expr._session._rows_to_dataframe(result, dtypes)

     def to_pandas(
         self,
@@ -480,8 +455,7 @@ def _compute_and_count(
         if sampling_method == _HEAD:
            total_rows = int(results_iterator.total_rows * fraction)
            results_iterator.max_results = total_rows
-           schema = dict(zip(self.value_columns, self.dtypes))
-           df = self._to_dataframe(results_iterator, schema)
+           df = self._to_dataframe(results_iterator)

            if self.index_columns:
                df.set_index(list(self.index_columns), inplace=True)
@@ -510,8 +484,7 @@ def _compute_and_count(
            )
        else:
            total_rows = results_iterator.total_rows
-           schema = dict(zip(self.value_columns, self.dtypes))
-           df = self._to_dataframe(results_iterator, schema)
+           df = self._to_dataframe(results_iterator)

            if self.index_columns:
                df.set_index(list(self.index_columns), inplace=True)
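
For orientation, a minimal sketch of what the new Block._to_dataframe does before delegating to the session: it pairs index and value column ids with their BigFrames dtypes and hands the combined mapping to Session._rows_to_dataframe. The column names and dtypes below are invented for illustration, not taken from this commit.

import pandas as pd

# Hypothetical column ids and dtypes; a real Block derives these from its expression.
index_columns = ["rowid"]
index_dtypes = [pd.Int64Dtype()]
value_columns = ["name", "score"]
value_dtypes = [pd.StringDtype(storage="pyarrow"), pd.Float64Dtype()]

dtypes = dict(zip(index_columns, index_dtypes))
dtypes.update(zip(value_columns, value_dtypes))
# `dtypes` is then passed to Session._rows_to_dataframe together with the
# BigQuery results iterator, as shown in the session/__init__.py hunk below.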

bigframes/core/indexes/index.py (+2, -1)

@@ -399,9 +399,10 @@ def to_pandas(self) -> pandas.Index:
         """Executes deferred operations and downloads the results."""
         # Project down to only the index column. So the query can be cached to visualize other data.
         index_columns = list(self._block.index_columns)
+        dtypes = dict(zip(index_columns, self.dtypes))
         expr = self._expr.select_columns(index_columns)
         results, _ = expr.start_query()
-        df = expr._session._rows_to_dataframe(results)
+        df = expr._session._rows_to_dataframe(results, dtypes)
         df = df.set_index(index_columns)
         index = df.index
         index.names = list(self._block._index_labels)

bigframes/dtypes.py (+6)

@@ -169,6 +169,10 @@ def ibis_dtype_to_bigframes_dtype(
     if isinstance(ibis_dtype, ibis_dtypes.Struct):
         return pd.ArrowDtype(ibis_dtype_to_arrow_dtype(ibis_dtype))

+    # BigQuery only supports integers of size 64 bits.
+    if isinstance(ibis_dtype, ibis_dtypes.Integer):
+        return pd.Int64Dtype()
+
     if ibis_dtype in IBIS_TO_BIGFRAMES:
         return IBIS_TO_BIGFRAMES[ibis_dtype]
     elif isinstance(ibis_dtype, ibis_dtypes.Null):
@@ -372,6 +376,8 @@ def cast_ibis_value(
         ibis_dtypes.float64: (ibis_dtypes.string, ibis_dtypes.int64),
         ibis_dtypes.string: (ibis_dtypes.int64, ibis_dtypes.float64),
         ibis_dtypes.date: (),
+        ibis_dtypes.Decimal(precision=38, scale=9): (ibis_dtypes.float64,),
+        ibis_dtypes.Decimal(precision=76, scale=38): (ibis_dtypes.float64,),
         ibis_dtypes.time: (),
         ibis_dtypes.timestamp: (ibis_dtypes.Timestamp(timezone="UTC"),),
         ibis_dtypes.Timestamp(timezone="UTC"): (ibis_dtypes.timestamp,),
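
A small sketch of the rule the first hunk adds, assuming ibis is installed: because BigQuery exposes only a 64-bit INT64 type, every ibis integer width collapses to the nullable pandas Int64Dtype. The helper name below is hypothetical, not the library's own function.

import ibis.expr.datatypes as ibis_dtypes
import pandas as pd

def integer_to_bigframes_dtype(ibis_dtype):
    # Any ibis Integer subtype (Int8, Int16, Int32, Int64, ...) maps to Int64.
    if isinstance(ibis_dtype, ibis_dtypes.Integer):
        return pd.Int64Dtype()
    raise TypeError(f"not an integer type: {ibis_dtype}")

assert integer_to_bigframes_dtype(ibis_dtypes.int32) == pd.Int64Dtype()
assert integer_to_bigframes_dtype(ibis_dtypes.int64) == pd.Int64Dtype()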

bigframes/session/__init__.py (+3, -7)

@@ -1515,14 +1515,10 @@ def _get_table_size(self, destination_table):
         return table.num_bytes

     def _rows_to_dataframe(
-        self, row_iterator: bigquery.table.RowIterator
+        self, row_iterator: bigquery.table.RowIterator, dtypes: Dict
     ) -> pandas.DataFrame:
-        return row_iterator.to_dataframe(
-            bool_dtype=pandas.BooleanDtype(),
-            int_dtype=pandas.Int64Dtype(),
-            float_dtype=pandas.Float64Dtype(),
-            string_dtype=pandas.StringDtype(storage="pyarrow"),
-        )
+        arrow_table = row_iterator.to_arrow()
+        return bigframes.session._io.pandas.arrow_to_pandas(arrow_table, dtypes)

     def _start_generic_job(self, job: formatting_helpers.GenericJob):
         if bigframes.options.display.progress_bar is not None:
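
Roughly, the new download path looks like the sketch below: pull the query results as a pyarrow.Table via RowIterator.to_arrow(), then let arrow_to_pandas apply locally computed dtypes. The query, column names, and dtype choices are placeholders for illustration; a configured BigQuery client with credentials is assumed.

import pandas as pd
from google.cloud import bigquery

import bigframes.session._io.pandas as io_pandas

client = bigquery.Client()
row_iterator = client.query("SELECT 1 AS n, 'a' AS s").result()

# RowIterator.to_arrow() returns a pyarrow.Table; the dtypes mapping is what
# Block._to_dataframe now derives from its index and value columns.
arrow_table = row_iterator.to_arrow()
dtypes = {"n": pd.Int64Dtype(), "s": pd.StringDtype(storage="pyarrow")}
df = io_pandas.arrow_to_pandas(arrow_table, dtypes)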

bigframes/session/_io/pandas.py (+77, new file)

@@ -0,0 +1,77 @@
+# Copyright 2023 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Dict, Union
+
+import geopandas  # type: ignore
+import pandas
+import pandas.arrays
+import pyarrow  # type: ignore
+import pyarrow.compute  # type: ignore
+
+import bigframes.constants
+
+
+def arrow_to_pandas(
+    arrow_table: Union[pyarrow.Table, pyarrow.RecordBatch], dtypes: Dict
+):
+    if len(dtypes) != arrow_table.num_columns:
+        raise ValueError(
+            f"Number of types {len(dtypes)} doesn't match number of columns "
+            f"{arrow_table.num_columns}. {bigframes.constants.FEEDBACK_LINK}"
+        )
+
+    serieses = {}
+    for field, column in zip(arrow_table.schema, arrow_table):
+        dtype = dtypes[field.name]
+
+        if dtype == geopandas.array.GeometryDtype():
+            series = geopandas.GeoSeries.from_wkt(
+                column,
+                # BigQuery geography type is based on the WGS84 reference ellipsoid.
+                crs="EPSG:4326",
+            )
+        elif dtype == pandas.Float64Dtype():
+            # Preserve NA/NaN distinction. Note: This is currently needed, even if we use
+            # nullable Float64Dtype in the types_mapper. See:
+            # https://github.com/pandas-dev/pandas/issues/55668
+            # Regarding type: ignore, this class has been public at this
+            # location since pandas 1.2.0. See:
+            # https://pandas.pydata.org/docs/dev/reference/api/pandas.arrays.FloatingArray.html
+            pd_array = pandas.arrays.FloatingArray(  # type: ignore
+                column.to_numpy(),
+                pyarrow.compute.is_null(column).to_numpy(),
+            )
+            series = pandas.Series(pd_array, dtype=dtype)
+        elif dtype == pandas.Int64Dtype():
+            # Avoid out-of-bounds errors in Pandas 1.5.x, which incorrectly
+            # casts to float64 in an intermediate step.
+            pd_array = pandas.arrays.IntegerArray(
+                pyarrow.compute.fill_null(column, 0).to_numpy(),
+                pyarrow.compute.is_null(column).to_numpy(),
+            )
+            series = pandas.Series(pd_array, dtype=dtype)
+        elif isinstance(dtype, pandas.ArrowDtype):
+            # Avoid conversion logic if we are backing the pandas Series by the
+            # arrow array.
+            series = pandas.Series(
+                pandas.arrays.ArrowExtensionArray(column),  # type: ignore
+                dtype=dtype,
+            )
+        else:
+            series = column.to_pandas(types_mapper=lambda _: dtype)
+
+        serieses[field.name] = series
+
+    return pandas.DataFrame(serieses)
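
The masked-array branches above are the interesting part: nulls are tracked separately from values, so integers never round-trip through float64 and NA is preserved. Below is a standalone illustration of that construction using only pandas and pyarrow; the toy data is not from the commit.

import pandas
import pyarrow
import pyarrow.compute

column = pyarrow.chunked_array([[1, None, 3]], type=pyarrow.int64())

pd_array = pandas.arrays.IntegerArray(
    pyarrow.compute.fill_null(column, 0).to_numpy(),  # values, with nulls filled by a dummy 0
    pyarrow.compute.is_null(column).to_numpy(),  # boolean mask marking the original nulls
)
series = pandas.Series(pd_array, dtype=pandas.Int64Dtype())
print(series)  # 1, <NA>, 3 -- dtype: Int64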

tests/system/small/test_dataframe.py (-10)

@@ -2046,16 +2046,6 @@ def test__dir__with_rename(scalars_dfs):
 def test_iloc_slice(scalars_df_index, scalars_pandas_df_index, start, stop, step):
     bf_result = scalars_df_index.iloc[start:stop:step].to_pandas()
     pd_result = scalars_pandas_df_index.iloc[start:stop:step]
-
-    # Pandas may assign non-object dtype to empty series and series index
-    # dtypes of empty columns are a known area of divergence from pandas
-    for column in pd_result.columns:
-        if (
-            pd_result[column].empty and column != "geography_col"
-        ):  # for empty geography_col, bigframes assigns non-object dtype
-            pd_result[column] = pd_result[column].astype("object")
-    pd_result.index = pd_result.index.astype("object")
-
     pd.testing.assert_frame_equal(
         bf_result,
         pd_result,

tests/system/small/test_series.py (+37, -11)

@@ -575,7 +575,15 @@ def test_series_int_int_operators_series(scalars_dfs, operator):
 )
 def test_mods(scalars_dfs, col_x, col_y, method):
     scalars_df, scalars_pandas_df = scalars_dfs
-    bf_result = getattr(scalars_df[col_x], method)(scalars_df[col_y]).to_pandas()
+    x_bf = scalars_df[col_x]
+    y_bf = scalars_df[col_y]
+    bf_series = getattr(x_bf, method)(y_bf)
+    # BigQuery's mod functions return [BIG]NUMERIC values unless both arguments are integers.
+    # https://cloud.google.com/bigquery/docs/reference/standard-sql/mathematical_functions#mod
+    if x_bf.dtype == pd.Int64Dtype() and y_bf.dtype == pd.Int64Dtype():
+        bf_result = bf_series.to_pandas()
+    else:
+        bf_result = bf_series.astype("Float64").to_pandas()
     pd_result = getattr(scalars_pandas_df[col_x], method)(scalars_pandas_df[col_y])
     pd.testing.assert_series_equal(pd_result, bf_result)

@@ -620,8 +628,20 @@ def test_divmods_series(scalars_dfs, col_x, col_y, method):
     pd_div_result, pd_mod_result = getattr(scalars_pandas_df[col_x], method)(
         scalars_pandas_df[col_y]
     )
-    pd.testing.assert_series_equal(pd_div_result, bf_div_result.to_pandas())
-    pd.testing.assert_series_equal(pd_mod_result, bf_mod_result.to_pandas())
+    # BigQuery's mod functions return NUMERIC values for non-INT64 inputs.
+    if bf_div_result.dtype == pd.Int64Dtype():
+        pd.testing.assert_series_equal(pd_div_result, bf_div_result.to_pandas())
+    else:
+        pd.testing.assert_series_equal(
+            pd_div_result, bf_div_result.astype("Float64").to_pandas()
+        )
+
+    if bf_mod_result.dtype == pd.Int64Dtype():
+        pd.testing.assert_series_equal(pd_mod_result, bf_mod_result.to_pandas())
+    else:
+        pd.testing.assert_series_equal(
+            pd_mod_result, bf_mod_result.astype("Float64").to_pandas()
+        )


 @pytest.mark.parametrize(
@@ -649,8 +669,20 @@ def test_divmods_scalars(scalars_dfs, col_x, other, method):
     scalars_df, scalars_pandas_df = scalars_dfs
     bf_div_result, bf_mod_result = getattr(scalars_df[col_x], method)(other)
     pd_div_result, pd_mod_result = getattr(scalars_pandas_df[col_x], method)(other)
-    pd.testing.assert_series_equal(pd_div_result, bf_div_result.to_pandas())
-    pd.testing.assert_series_equal(pd_mod_result, bf_mod_result.to_pandas())
+    # BigQuery's mod functions return NUMERIC values for non-INT64 inputs.
+    if bf_div_result.dtype == pd.Int64Dtype():
+        pd.testing.assert_series_equal(pd_div_result, bf_div_result.to_pandas())
+    else:
+        pd.testing.assert_series_equal(
+            pd_div_result, bf_div_result.astype("Float64").to_pandas()
+        )
+
+    if bf_mod_result.dtype == pd.Int64Dtype():
+        pd.testing.assert_series_equal(pd_mod_result, bf_mod_result.to_pandas())
+    else:
+        pd.testing.assert_series_equal(
+            pd_mod_result, bf_mod_result.astype("Float64").to_pandas()
+        )


 @pytest.mark.parametrize(
@@ -1941,12 +1973,6 @@ def test_iloc_nested(scalars_df_index, scalars_pandas_df_index):
 def test_series_iloc(scalars_df_index, scalars_pandas_df_index, start, stop, step):
     bf_result = scalars_df_index["string_col"].iloc[start:stop:step].to_pandas()
     pd_result = scalars_pandas_df_index["string_col"].iloc[start:stop:step]
-
-    # Pandas may assign non-object dtype to empty series and series index
-    if pd_result.empty:
-        pd_result = pd_result.astype("object")
-        pd_result.index = pd_result.index.astype("object")
-
     pd.testing.assert_series_equal(
         bf_result,
         pd_result,
