Skip to content

Commit e3a056a

Browse files
feat: support upcasting numeric columns in concat (#294)
1 parent 655178a commit e3a056a

File tree

5 files changed

+118
-27
lines changed

5 files changed

+118
-27
lines changed

bigframes/core/blocks.py

+60-23
Original file line numberDiff line numberDiff line change
@@ -1506,8 +1506,10 @@ def concat(
15061506
blocks: typing.List[Block] = [self, *other]
15071507
if ignore_index:
15081508
blocks = [block.reset_index() for block in blocks]
1509-
1510-
result_labels = _align_indices(blocks)
1509+
level_names = None
1510+
else:
1511+
level_names, level_types = _align_indices(blocks)
1512+
blocks = [_cast_index(block, level_types) for block in blocks]
15111513

15121514
index_nlevels = blocks[0].index.nlevels
15131515

@@ -1522,7 +1524,7 @@ def concat(
15221524
result_expr,
15231525
index_columns=list(result_expr.column_ids)[:index_nlevels],
15241526
column_labels=aligned_blocks[0].column_labels,
1525-
index_labels=result_labels,
1527+
index_labels=level_names,
15261528
)
15271529
if ignore_index:
15281530
result_block = result_block.reset_index()
@@ -1783,16 +1785,40 @@ def block_from_local(data) -> Block:
17831785
)
17841786

17851787

1788+
def _cast_index(block: Block, dtypes: typing.Sequence[bigframes.dtypes.Dtype]):
1789+
original_block = block
1790+
result_ids = []
1791+
for idx_id, idx_dtype, target_dtype in zip(
1792+
block.index_columns, block.index_dtypes, dtypes
1793+
):
1794+
if idx_dtype != target_dtype:
1795+
block, result_id = block.apply_unary_op(idx_id, ops.AsTypeOp(target_dtype))
1796+
result_ids.append(result_id)
1797+
else:
1798+
result_ids.append(idx_id)
1799+
1800+
expr = block.expr.select_columns((*result_ids, *original_block.value_columns))
1801+
return Block(
1802+
expr,
1803+
index_columns=result_ids,
1804+
column_labels=original_block.column_labels,
1805+
index_labels=original_block.index_labels,
1806+
)
1807+
1808+
17861809
def _align_block_to_schema(
17871810
block: Block, schema: dict[Label, bigframes.dtypes.Dtype]
17881811
) -> Block:
1789-
"""For a given schema, remap block to schema by reordering columns and inserting nulls."""
1812+
"""For a given schema, remap block to schema by reordering columns, and inserting nulls."""
17901813
col_ids: typing.Tuple[str, ...] = ()
17911814
for label, dtype in schema.items():
1792-
# TODO: Support casting to lcd type - requires mixed type support
17931815
matching_ids: typing.Sequence[str] = block.label_to_col_id.get(label, ())
17941816
if len(matching_ids) > 0:
17951817
col_id = matching_ids[-1]
1818+
col_dtype = block.expr.get_column_type(col_id)
1819+
if dtype != col_dtype:
1820+
# If _align_schema works properly, this should always be an upcast
1821+
block, col_id = block.apply_unary_op(col_id, ops.AsTypeOp(dtype))
17961822
col_ids = (*col_ids, col_id)
17971823
else:
17981824
block, null_column = block.create_constant(None, dtype=dtype)
@@ -1810,38 +1836,44 @@ def _align_schema(
18101836
return functools.reduce(reduction, schemas)
18111837

18121838

1813-
def _align_indices(blocks: typing.Sequence[Block]) -> typing.Sequence[Label]:
1814-
"""Validates that the blocks have compatible indices and returns the resulting label names."""
1839+
def _align_indices(
1840+
blocks: typing.Sequence[Block],
1841+
) -> typing.Tuple[typing.Sequence[Label], typing.Sequence[bigframes.dtypes.Dtype]]:
1842+
"""Validates that the blocks have compatible indices and returns the resulting label names and dtypes."""
18151843
names = blocks[0].index.names
18161844
types = blocks[0].index.dtypes
1845+
18171846
for block in blocks[1:]:
18181847
if len(names) != block.index.nlevels:
18191848
raise NotImplementedError(
18201849
f"Cannot combine indices with different number of levels. Use 'ignore_index'=True. {constants.FEEDBACK_LINK}"
18211850
)
1822-
if block.index.dtypes != types:
1823-
raise NotImplementedError(
1824-
f"Cannot combine different index dtypes. Use 'ignore_index'=True. {constants.FEEDBACK_LINK}"
1825-
)
18261851
names = [
18271852
lname if lname == rname else None
18281853
for lname, rname in zip(names, block.index.names)
18291854
]
1830-
return names
1855+
types = [
1856+
bigframes.dtypes.lcd_type_or_throw(ltype, rtype)
1857+
for ltype, rtype in zip(types, block.index.dtypes)
1858+
]
1859+
types = typing.cast(typing.Sequence[bigframes.dtypes.Dtype], types)
1860+
return names, types
18311861

18321862

18331863
def _combine_schema_inner(
18341864
left: typing.Dict[Label, bigframes.dtypes.Dtype],
18351865
right: typing.Dict[Label, bigframes.dtypes.Dtype],
18361866
) -> typing.Dict[Label, bigframes.dtypes.Dtype]:
18371867
result = dict()
1838-
for label, type in left.items():
1868+
for label, left_type in left.items():
18391869
if label in right:
1840-
if type != right[label]:
1870+
right_type = right[label]
1871+
output_type = bigframes.dtypes.lcd_type(left_type, right_type)
1872+
if output_type is None:
18411873
raise ValueError(
18421874
f"Cannot concat rows with label {label} due to mismatched types. {constants.FEEDBACK_LINK}"
18431875
)
1844-
result[label] = type
1876+
result[label] = output_type
18451877
return result
18461878

18471879

@@ -1850,15 +1882,20 @@ def _combine_schema_outer(
18501882
right: typing.Dict[Label, bigframes.dtypes.Dtype],
18511883
) -> typing.Dict[Label, bigframes.dtypes.Dtype]:
18521884
result = dict()
1853-
for label, type in left.items():
1854-
if (label in right) and (type != right[label]):
1855-
raise ValueError(
1856-
f"Cannot concat rows with label {label} due to mismatched types. {constants.FEEDBACK_LINK}"
1857-
)
1858-
result[label] = type
1859-
for label, type in right.items():
1885+
for label, left_type in left.items():
1886+
if label not in right:
1887+
result[label] = left_type
1888+
else:
1889+
right_type = right[label]
1890+
output_type = bigframes.dtypes.lcd_type(left_type, right_type)
1891+
if output_type is None:
1892+
raise NotImplementedError(
1893+
f"Cannot concat rows with label {label} due to mismatched types. {constants.FEEDBACK_LINK}"
1894+
)
1895+
result[label] = output_type
1896+
for label, right_type in right.items():
18601897
if label not in left:
1861-
result[label] = type
1898+
result[label] = right_type
18621899
return result
18631900

18641901

bigframes/dtypes.py

+19-2
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
pd.Int64Dtype,
4141
pd.StringDtype,
4242
pd.ArrowDtype,
43+
gpd.array.GeometryDtype,
4344
]
4445

4546
# On BQ side, ARRAY, STRUCT, GEOGRAPHY, JSON are not orderable
@@ -139,7 +140,7 @@
139140

140141
ARROW_TO_IBIS = {arrow: ibis for ibis, arrow in IBIS_TO_ARROW.items()}
141142

142-
IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Union[Dtype, np.dtype[Any]]] = {
143+
IBIS_TO_BIGFRAMES: Dict[ibis_dtypes.DataType, Dtype] = {
143144
ibis: pandas for ibis, pandas in BIDIRECTIONAL_MAPPINGS
144145
}
145146
# Allow REQUIRED fields to map correctly.
@@ -179,7 +180,7 @@
179180

180181
def ibis_dtype_to_bigframes_dtype(
181182
ibis_dtype: ibis_dtypes.DataType,
182-
) -> Union[Dtype, np.dtype[Any]]:
183+
) -> Dtype:
183184
"""Converts an Ibis dtype to a BigQuery DataFrames dtype
184185
185186
Args:
@@ -340,6 +341,11 @@ def literal_to_ibis_scalar(
340341
ValueError: if passed literal cannot be coerced to a
341342
BigQuery DataFrames compatible scalar
342343
"""
344+
# Special case: Can create nulls for non-bidirectional types
345+
if (force_dtype == gpd.array.GeometryDtype()) and pd.isna(literal):
346+
# Ibis has a bug casting nulltype to geospatial, so we perform an intermediate cast first
347+
geotype = ibis_dtypes.GeoSpatial(geotype="geography", srid=4326, nullable=True)
348+
return ibis.literal(None, geotype)
343349
ibis_dtype = BIGFRAMES_TO_IBIS[force_dtype] if force_dtype else None
344350

345351
if pd.api.types.is_list_like(literal):
@@ -538,6 +544,8 @@ def is_compatible(scalar: typing.Any, dtype: Dtype) -> typing.Optional[Dtype]:
538544

539545

540546
def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
547+
if dtype1 == dtype2:
548+
return dtype1
541549
# Implicit conversion currently only supported for numeric types
542550
hierarchy: list[Dtype] = [
543551
pd.BooleanDtype(),
@@ -550,3 +558,12 @@ def lcd_type(dtype1: Dtype, dtype2: Dtype) -> typing.Optional[Dtype]:
550558
return None
551559
lcd_index = max(hierarchy.index(dtype1), hierarchy.index(dtype2))
552560
return hierarchy[lcd_index]
561+
562+
563+
def lcd_type_or_throw(dtype1: Dtype, dtype2: Dtype) -> Dtype:
564+
result = lcd_type(dtype1, dtype2)
565+
if result is None:
566+
raise NotImplementedError(
567+
f"BigFrames cannot upcast {dtype1} and {dtype2} to common type. {constants.FEEDBACK_LINK}"
568+
)
569+
return result

bigframes/operations/aggregations.py

+5
Original file line numberDiff line numberDiff line change
@@ -396,6 +396,11 @@ def _as_ibis(self, column: ibis_types.Column, window=None) -> ibis_types.Value:
396396
)
397397

398398

399+
class LastOp(WindowOp):
400+
def _as_ibis(self, column: ibis_types.Column, window=None) -> ibis_types.Value:
401+
return _apply_window_if_present(column.last(), window)
402+
403+
399404
class LastNonNullOp(WindowOp):
400405
@property
401406
def skips_nulls(self):

tests/system/small/test_dataframe.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -3141,9 +3141,9 @@ def test_df___array__(scalars_df_index, scalars_pandas_df_index):
31413141

31423142

31433143
def test_getattr_attribute_error_when_pandas_has(scalars_df_index):
3144-
# asof is implemented in pandas but not in bigframes
3144+
# swapaxes is implemented in pandas but not in bigframes
31453145
with pytest.raises(AttributeError):
3146-
scalars_df_index.asof()
3146+
scalars_df_index.swapaxes()
31473147

31483148

31493149
def test_getattr_attribute_error(scalars_df_index):

tests/system/small/test_pandas.py

+32
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,38 @@ def test_concat_dataframe_mismatched_columns(scalars_dfs, how):
185185
pd.testing.assert_frame_equal(bf_result, pd_result)
186186

187187

188+
def test_concat_dataframe_upcasting(scalars_dfs):
189+
scalars_df, scalars_pandas_df = scalars_dfs
190+
191+
bf_input1 = scalars_df[["int64_col", "float64_col", "int64_too"]].set_index(
192+
"int64_col", drop=True
193+
)
194+
bf_input1.columns = ["a", "b"]
195+
bf_input2 = scalars_df[["int64_too", "int64_col", "float64_col"]].set_index(
196+
"float64_col", drop=True
197+
)
198+
bf_input2.columns = ["a", "b"]
199+
bf_result = bpd.concat([bf_input1, bf_input2], join="outer")
200+
bf_result = bf_result.to_pandas()
201+
202+
bf_input1 = (
203+
scalars_pandas_df[["int64_col", "float64_col", "int64_too"]]
204+
.set_index("int64_col", drop=True)
205+
.set_axis(["a", "b"], axis=1)
206+
)
207+
bf_input2 = (
208+
scalars_pandas_df[["int64_too", "int64_col", "float64_col"]]
209+
.set_index("float64_col", drop=True)
210+
.set_axis(["a", "b"], axis=1)
211+
)
212+
pd_result = pd.concat(
213+
[bf_input1, bf_input2],
214+
join="outer",
215+
)
216+
217+
pd.testing.assert_frame_equal(bf_result, pd_result)
218+
219+
188220
@pytest.mark.parametrize(
189221
("how",),
190222
[

0 commit comments

Comments
 (0)