Skip to content

WIP: add pd.read_ipc and DataFrame.to_ipc to provide efficient serialization to/from memory #15907

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions pandas/io/ipc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
""" ipc format compat """

from pandas.types.generic import ABCIndexClass, ABCSeries, ABCDataFrame
from pandas.compat import string_types, cPickle
from pandas._libs.lib import is_string_array, is_unicode_array
from pandas.types.common import is_object_dtype


def _try_import():
# since pandas
# we need to import on first use

try:
import pyarrow
except ImportError:

# give a nice error message
raise ImportError("the pyarrow is not installed\n"
"you can install via conda\n"
"conda install pyarrow -c conda-forge")

return pyarrow


def to_ipc(obj, engine='infer'):
    """
    Write a pandas object to the ipc format

    Parameters
    ----------
    obj : Index, Series, DataFrame
    engine : string, optional
        string to indicate the engine {'infer', 'pickle', 'pyarrow'}
        'infer' will pick an engine based upon performance considerations

    Returns
    -------
    dict-of-metadata and bytes

    Raises
    ------
    ValueError
        if ``obj`` is not an Index, Series or DataFrame
    """
    if engine == 'pickle':
        return _to_pickle(obj)
    elif engine == 'pyarrow':
        try:
            return _to_pyarrow(obj)
        except Exception:  # pragma: no cover
            # pyarrow failed on this object; fall through to the
            # inference logic below rather than erroring out
            pass

    if isinstance(obj, (ABCIndexClass, ABCSeries)):
        # bare Index/Series always go through pickle
        return _to_pickle(obj)
    elif isinstance(obj, ABCDataFrame):

        # decide quickly if we can serialize using
        # pyarrow or pickle

        # smallish, just pickle
        if len(obj) <= 100000:
            return _to_pickle(obj)

        # check our object columns
        for c, col in obj.iteritems():
            if not is_object_dtype(col):
                continue

            # if we discover we have actual python objects
            # embedded with strings/unicode, then pickle
            # (pyarrow can only handle homogeneous string columns)
            values = col.values
            if isinstance(values[0], string_types):
                if not is_string_array(values):
                    return _to_pickle(obj)
            else:
                if not is_unicode_array(values):
                    return _to_pickle(obj)

        return _to_pyarrow(obj)

    # fix: original message read "Index,Series" with no separating space
    raise ValueError("ipc only supports IO with Index, "
                     "Series, DataFrames, a {} was "
                     "passed".format(type(obj)))


def _to_pyarrow(df):
    """Serialize a DataFrame via pyarrow's ipc writer, tagging the engine."""
    result = _try_import().write_ipc(df)
    result['engine'] = 'pyarrow'
    return result


def _to_pickle(obj):
""" helper routine to return a pickle of an object """
d = {'engine': 'pickle', 'data': cPickle.dumps(obj)}
return d


def read_ipc(db):
    """
    Load a pyarrow ipc format object from the file dict-of-bytes

    .. versionadded 0.20.0

    Parameters
    ----------
    db : dict
        a dictionary of meta data & bytes as produced by ``to_ipc``;
        must contain an ``'engine'`` key and a ``'data'`` key

    Returns
    -------
    Pandas Object
    """
    engine = db['engine']

    if engine == 'pickle':
        return _read_pickle(db['data'])
    try:
        return _read_pyarrow(db['data'])
    except Exception:  # pragma: no cover
        # best-effort fallback: the payload may actually be a pickle
        # even if the engine tag said otherwise
        return _read_pickle(db['data'])


def _read_pyarrow(db):
    """Deserialize ipc bytes back into a pandas object via pyarrow."""
    return _try_import().read_ipc(db)


def _read_pickle(db):
""" helper to return via pickle """
return cPickle.loads(db)
146 changes: 146 additions & 0 deletions pandas/tests/io/test_ipc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
""" test ipc compat """

import pytest
pyarrow = pytest.importorskip('pyarrow')

from distutils.version import LooseVersion
import numpy as np
import pandas as pd
from pandas import Series, Index, DataFrame
from pandas.io.ipc import (to_ipc, read_ipc,
_to_pickle, _to_pyarrow,
_read_pickle, _read_pyarrow)

import pandas.util.testing as tm

_HAVE_LATEST_PYARROW = LooseVersion(pyarrow.__version__) > '0.2.0'


# Parametrized fixture yielding (name, writer, reader) triples so each
# round-trip test runs once per serialization engine.  The pyarrow param
# uses the legacy "mark applied to a param" pytest idiom to skip when the
# installed pyarrow is too old.
@pytest.fixture(
    params=[('pickle', _to_pickle, _read_pickle),
            pytest.mark.skipif(not _HAVE_LATEST_PYARROW,
                               reason='need newer pyarrow version')(
                'pyarrow', _to_pyarrow, _read_pyarrow)],
    ids=lambda x: x[0])
def engine(request):
    # request.param is the (name, writer, reader) tuple selected above
    return request.param


@pytest.fixture
def pa():
    # gate fixture: requesting it skips the test unless a recent
    # pyarrow is available; it yields no value of its own
    if _HAVE_LATEST_PYARROW:
        return
    pytest.skip("need newer pyarrow")


def make_mixed_frame(N):
    """Build an N-row DataFrame mixing int, float, constant string,
    random string, categorical and datetime columns."""
    cat_codes = np.repeat([0, 1], N // 2)
    columns = {
        'A': np.arange(N),
        'B': np.random.randn(N),
        'C': 'foo',
        'D': tm.makeStringIndex(N),
        'E': pd.Categorical.from_codes(cat_codes,
                                       categories=['foo', 'bar']),
        'F': pd.date_range('20130101', freq='s', periods=N),
    }
    return DataFrame(columns)


class TestIPC(object):
    """Round-trip and error tests for the to_ipc/read_ipc serialization."""

    def check_error_on_write(self, df, exc):
        # check that we are raising the exception
        # on writing
        with pytest.raises(exc):
            to_ipc(df)

    def check_round_trip(self, df, engine=None):
        # serialize then deserialize and assert the frame is unchanged.
        # engine=None exercises the public to_ipc/read_ipc API; otherwise
        # `engine` is the (name, writer, reader) tuple from the fixture.
        if engine is None:
            writer = to_ipc
            reader = read_ipc
            b = writer(df)
        else:
            _, writer, reader = engine
            b = writer(df)

            # we are calling a lower-level routine
            b = b['data']

        result = reader(b)
        tm.assert_frame_equal(result, df)

    def test_error(self):
        # non-pandas objects are unsupported and must raise ValueError
        for obj in [1, 'foo', pd.Timestamp('20130101'),
                    np.array([1, 2, 3])]:
            self.check_error_on_write(obj, ValueError)

    def test_with_small_size(self, engine):
        # small size
        N = 100
        df = make_mixed_frame(N)
        self.check_round_trip(df, engine)

    def test_with_med_size(self, engine):
        # medium size
        N = 10000
        df = make_mixed_frame(N)
        self.check_round_trip(df, engine)

    def test_with_large_size(self, engine):
        # large size (above to_ipc's 100000-row pickle cutoff)
        N = 1000000
        df = make_mixed_frame(N)
        self.check_round_trip(df, engine)

    def test_non_dataframe(self):
        # Index and Series round-trip through the public API
        i = Index(['foo', 'bar'])
        b = to_ipc(i)
        result = read_ipc(b)
        tm.assert_index_equal(result, i)

        s = Series(['foo', 'bar'])
        b = to_ipc(s)
        result = read_ipc(b)
        tm.assert_series_equal(result, s)

    def test_basic(self, pa):
        # a frame covering the dtypes both engines should handle
        df = pd.DataFrame({
            'string': list('abc'),
            'int': list(range(1, 4)),
            'uint': np.arange(3, 6).astype('u1'),
            'float': np.arange(4.0, 7.0, dtype='float64'),
            'bool': [True, False, True],
            'bool_with_nan': [True, None, True],
            'cat': pd.Categorical(list('abc')),
            'date_range': pd.date_range('20130101', periods=3),
            'date_range_tz': pd.date_range('20130101', periods=3,
                                           tz='US/Eastern'),
            'timedelta': pd.timedelta_range('1 day', periods=3)})

        # should work both on pickle & pyarrow
        # TODO: how to assure this?
        self.check_round_trip(df)

    def test_pickle_only(self):
        # dtypes that only the pickle engine can serialize

        # period
        df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)})
        self.check_round_trip(df)

        # non-strings
        df = pd.DataFrame({'a': ['a', 1, 2.0]})
        self.check_round_trip(df)

    def test_duplicate_columns(self, pa):
        # duplicate column labels must survive the round trip
        df = pd.DataFrame(np.arange(12).reshape(4, 3),
                          columns=list('aaa')).copy()
        self.check_round_trip(df)

    def test_stringify_columns(self, pa):
        # integer (non-string) column labels must survive the round trip
        df = pd.DataFrame(np.arange(12).reshape(4, 3)).copy()
        self.check_round_trip(df)