Skip to content

Commit 43d0864

Browse files
refactor: complete dtype rules for expression tree transformations (#376)
1 parent 2b9a01d commit 43d0864

24 files changed

+794
-178
lines changed

bigframes/core/__init__.py

+47-12
Original file line numberDiff line numberDiff line change
@@ -14,21 +14,26 @@
1414
from __future__ import annotations
1515

1616
from dataclasses import dataclass
17+
import functools
1718
import io
1819
import typing
1920
from typing import Iterable, Sequence
2021

2122
import ibis.expr.types as ibis_types
2223
import pandas
24+
import pyarrow as pa
25+
import pyarrow.feather as pa_feather
2326

2427
import bigframes.core.compile as compiling
2528
import bigframes.core.expression as ex
2629
import bigframes.core.guid
2730
import bigframes.core.join_def as join_def
31+
import bigframes.core.local_data as local_data
2832
import bigframes.core.nodes as nodes
2933
from bigframes.core.ordering import OrderingColumnReference
3034
import bigframes.core.ordering as orderings
3135
import bigframes.core.rewrite
36+
import bigframes.core.schema as schemata
3237
import bigframes.core.utils
3338
from bigframes.core.window_spec import WindowSpec
3439
import bigframes.dtypes
@@ -63,28 +68,32 @@ def from_ibis(
6368
node = nodes.ReadGbqNode(
6469
table=table,
6570
table_session=session,
66-
columns=tuple(columns),
71+
columns=tuple(
72+
bigframes.dtypes.ibis_value_to_canonical_type(column)
73+
for column in columns
74+
),
6775
hidden_ordering_columns=tuple(hidden_ordering_columns),
6876
ordering=ordering,
6977
)
7078
return cls(node)
7179

7280
@classmethod
73-
def from_pandas(cls, pd_df: pandas.DataFrame, session: bigframes.Session):
81+
def from_pyarrow(cls, arrow_table: pa.Table, session: Session):
82+
adapted_table = local_data.adapt_pa_table(arrow_table)
83+
schema = local_data.arrow_schema_to_bigframes(adapted_table.schema)
84+
7485
iobytes = io.BytesIO()
75-
# Use alphanumeric identifiers, to avoid downstream problems with escaping.
76-
as_ids = [
77-
bigframes.core.utils.label_to_identifier(label, strict=True)
78-
for label in pd_df.columns
79-
]
80-
unique_ids = tuple(bigframes.core.utils.disambiguate_ids(as_ids))
81-
pd_df.reset_index(drop=True).set_axis(unique_ids, axis=1).to_feather(iobytes)
82-
node = nodes.ReadLocalNode(feather_bytes=iobytes.getvalue(), session=session)
86+
pa_feather.write_feather(adapted_table, iobytes)
87+
node = nodes.ReadLocalNode(
88+
iobytes.getvalue(),
89+
data_schema=schema,
90+
session=session,
91+
)
8392
return cls(node)
8493

8594
@property
8695
def column_ids(self) -> typing.Sequence[str]:
87-
return self._compile_ordered().column_ids
96+
return self.schema.names
8897

8998
@property
9099
def session(self) -> Session:
@@ -95,6 +104,32 @@ def session(self) -> Session:
95104
required_session if (required_session is not None) else get_global_session()
96105
)
97106

107+
@functools.cached_property
108+
def schema(self) -> schemata.ArraySchema:
109+
# TODO: switch to use self.node.schema
110+
return self._compiled_schema
111+
112+
@functools.cached_property
113+
def _compiled_schema(self) -> schemata.ArraySchema:
114+
compiled = self._compile_unordered()
115+
items = tuple(
116+
schemata.SchemaItem(id, compiled.get_column_type(id))
117+
for id in compiled.column_ids
118+
)
119+
return schemata.ArraySchema(items)
120+
121+
def validate_schema(self):
122+
tree_derived = self.node.schema
123+
ibis_derived = self._compiled_schema
124+
if tree_derived.names != ibis_derived.names:
125+
raise ValueError(
126+
f"Unexpected names internal {tree_derived.names} vs compiled {ibis_derived.names}"
127+
)
128+
if tree_derived.dtypes != ibis_derived.dtypes:
129+
raise ValueError(
130+
f"Unexpected types internal {tree_derived.dtypes} vs compiled {ibis_derived.dtypes}"
131+
)
132+
98133
def _try_evaluate_local(self):
99134
"""Use only for unit testing paths - not fully featured. Will throw exception if fails."""
100135
import ibis
@@ -104,7 +139,7 @@ def _try_evaluate_local(self):
104139
)
105140

106141
def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
107-
return self._compile_ordered().get_column_type(key)
142+
return self.schema.get_type(key)
108143

109144
def _compile_ordered(self) -> compiling.OrderedIR:
110145
return compiling.compile_ordered_ir(self.node)

bigframes/core/blocks.py

+16-24
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
3030
import warnings
3131

32-
import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
3332
import google.cloud.bigquery as bigquery
3433
import pandas as pd
34+
import pyarrow as pa
3535

3636
import bigframes._config.sampling_options as sampling_options
3737
import bigframes.constants as constants
@@ -141,32 +141,23 @@ def __init__(
141141
self._stats_cache[" ".join(self.index_columns)] = {}
142142

143143
@classmethod
144-
def from_local(cls, data, session: bigframes.Session) -> Block:
145-
pd_data = pd.DataFrame(data)
146-
columns = pd_data.columns
147-
148-
# Make a flattened version to treat as a table.
149-
if len(pd_data.columns.names) > 1:
150-
pd_data.columns = columns.to_flat_index()
151-
144+
def from_local(cls, data: pd.DataFrame, session: bigframes.Session) -> Block:
145+
# Assumes caller has already converted datatypes to bigframes ones.
146+
pd_data = data
147+
column_labels = pd_data.columns
152148
index_labels = list(pd_data.index.names)
153-
# The ArrayValue layer doesn't know about indexes, so make sure indexes
154-
# are real columns with unique IDs.
155-
pd_data = pd_data.reset_index(
156-
names=[f"level_{level}" for level in range(len(index_labels))]
157-
)
158-
pd_data = pd_data.set_axis(
159-
vendored_pandas_io_common.dedup_names(
160-
list(pd_data.columns), is_potential_multiindex=False
161-
),
162-
axis="columns",
163-
)
164-
index_ids = pd_data.columns[: len(index_labels)]
165149

166-
keys_expr = core.ArrayValue.from_pandas(pd_data, session)
150+
# unique internal ids
151+
column_ids = [f"column_{i}" for i in range(len(pd_data.columns))]
152+
index_ids = [f"level_{level}" for level in range(pd_data.index.nlevels)]
153+
154+
pd_data = pd_data.set_axis(column_ids, axis=1)
155+
pd_data = pd_data.reset_index(names=index_ids)
156+
as_pyarrow = pa.Table.from_pandas(pd_data, preserve_index=False)
157+
array_value = core.ArrayValue.from_pyarrow(as_pyarrow, session=session)
167158
return cls(
168-
keys_expr,
169-
column_labels=columns,
159+
array_value,
160+
column_labels=column_labels,
170161
index_columns=index_ids,
171162
index_labels=index_labels,
172163
)
@@ -484,6 +475,7 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):
484475
# general Sequence[Label] that BigQuery DataFrames has.
485476
# See: https://github.com/pandas-dev/pandas-stubs/issues/804
486477
df.index.names = self.index.names # type: ignore
478+
df.columns = self.column_labels
487479

488480
def _materialize_local(
489481
self, materialize_options: MaterializationOptions = MaterializationOptions()

bigframes/core/compile/compiled.py

+16-41
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
IntegerEncoding,
3737
OrderingColumnReference,
3838
)
39+
import bigframes.core.schema as schemata
3940
import bigframes.core.utils as utils
4041
from bigframes.core.window_spec import WindowSpec
4142
import bigframes.dtypes
@@ -627,56 +628,30 @@ def __init__(
627628
def from_pandas(
628629
cls,
629630
pd_df: pandas.DataFrame,
631+
schema: schemata.ArraySchema,
630632
) -> OrderedIR:
631633
"""
632634
Builds an in-memory only (SQL only) expr from a pandas dataframe.
635+
636+
Assumed that the dataframe has unique string column names and bigframes-supported dtypes.
633637
"""
634-
# We can't include any hidden columns in the ArrayValue constructor, so
635-
# grab the column names before we add the hidden ordering column.
636-
column_names = [str(column) for column in pd_df.columns]
637-
# Make sure column names are all strings.
638-
pd_df = pd_df.set_axis(column_names, axis="columns")
639-
pd_df = pd_df.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})
640638

641639
# ibis memtable cannot handle NA, must convert to None
642-
pd_df = pd_df.astype("object") # type: ignore
643-
pd_df = pd_df.where(pandas.notnull(pd_df), None)
644-
645-
# NULL type isn't valid in BigQuery, so retry with an explicit schema in these cases.
646-
keys_memtable = ibis.memtable(pd_df)
647-
schema = keys_memtable.schema()
648-
new_schema = []
649-
for column_index, column in enumerate(schema):
650-
if column == ORDER_ID_COLUMN:
651-
new_type: ibis_dtypes.DataType = ibis_dtypes.int64
652-
else:
653-
column_type = schema[column]
654-
# The autodetected type might not be one we can support, such
655-
# as NULL type for empty rows, so convert to a type we do
656-
# support.
657-
new_type = bigframes.dtypes.bigframes_dtype_to_ibis_dtype(
658-
bigframes.dtypes.ibis_dtype_to_bigframes_dtype(column_type)
659-
)
660-
# TODO(swast): Ibis memtable doesn't use backticks in struct
661-
# field names, so spaces and other characters aren't allowed in
662-
# the memtable context. Blocked by
663-
# https://github.com/ibis-project/ibis/issues/7187
664-
column = f"col_{column_index}"
665-
new_schema.append((column, new_type))
666-
667-
# must set non-null column labels. these are not the user-facing labels
668-
pd_df = pd_df.set_axis(
669-
[column for column, _ in new_schema],
670-
axis="columns",
671-
)
672-
keys_memtable = ibis.memtable(pd_df, schema=ibis.schema(new_schema))
640+
# this destroys the schema however
641+
ibis_values = pd_df.astype("object").where(pandas.notnull(pd_df), None) # type: ignore
642+
ibis_values = ibis_values.assign(**{ORDER_ID_COLUMN: range(len(pd_df))})
643+
# derive the ibis schema from the original pandas schema
644+
ibis_schema = [
645+
(name, bigframes.dtypes.bigframes_dtype_to_ibis_dtype(dtype))
646+
for name, dtype in zip(schema.names, schema.dtypes)
647+
]
648+
ibis_schema.append((ORDER_ID_COLUMN, ibis_dtypes.int64))
649+
650+
keys_memtable = ibis.memtable(ibis_values, schema=ibis.schema(ibis_schema))
673651

674652
return cls(
675653
keys_memtable,
676-
columns=[
677-
keys_memtable[f"col_{column_index}"].name(column)
678-
for column_index, column in enumerate(column_names)
679-
],
654+
columns=[keys_memtable[column].name(column) for column in pd_df.columns],
680655
ordering=ExpressionOrdering(
681656
ordering_value_columns=tuple(
682657
[OrderingColumnReference(ORDER_ID_COLUMN)]

bigframes/core/compile/compiler.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@ def compile_peak_sql(node: nodes.BigFrameNode, n_rows: int) -> typing.Optional[s
4141
return compile_unordered_ir(node).peek_sql(n_rows)
4242

4343

44-
@functools.cache
44+
# TODO: Remove cache when schema no longer requires compilation to derive schema (and therefor only compiles for execution)
45+
@functools.lru_cache(maxsize=5000)
4546
def compile_node(
4647
node: nodes.BigFrameNode, ordered: bool = True
4748
) -> compiled.UnorderedIR | compiled.OrderedIR:
@@ -80,7 +81,7 @@ def compile_join(node: nodes.JoinNode, ordered: bool = True):
8081
@_compile_node.register
8182
def compile_readlocal(node: nodes.ReadLocalNode, ordered: bool = True):
8283
array_as_pd = pd.read_feather(io.BytesIO(node.feather_bytes))
83-
ordered_ir = compiled.OrderedIR.from_pandas(array_as_pd)
84+
ordered_ir = compiled.OrderedIR.from_pandas(array_as_pd, node.schema)
8485
if ordered:
8586
return ordered_ir
8687
else:

0 commit comments

Comments
 (0)