-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support read_csv for backends with no native support #9908
base: main
Are you sure you want to change the base?
Changes from 3 commits
8b87686
fedd4de
38f91dd
773cfb5
c7aea6e
6547ae3
69b4e39
6152533
0214160
7bb6f96
520fe5f
e023025
f1b42f8
902bb47
79885a9
ce2c8cf
9acda5c
e62925b
96ff701
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,6 +3,7 @@ | |
import abc | ||
import collections.abc | ||
import functools | ||
import glob | ||
import importlib.metadata | ||
import keyword | ||
import re | ||
|
@@ -1236,6 +1237,103 @@ def has_operation(cls, operation: type[ops.Value]) -> bool: | |
f"{cls.name} backend has not implemented `has_operation` API" | ||
) | ||
|
||
def read_csv(
    self, path: str | Path, table_name: str | None = None, **kwargs: Any
) -> ir.Table:
    """Register a CSV file as a table in the current backend.

    The file(s) are read into memory with pyarrow, concatenated into a
    single pyarrow table, and loaded into the backend via `create_table`.

    Parameters
    ----------
    path
        The data source. A string or Path to the CSV file. For local
        filesystems the path may contain glob wildcards, in which case
        every matching file is read and the results are concatenated.
    table_name
        An optional name to use for the created table. This defaults to
        a sequentially generated name.
    **kwargs
        Additional keyword arguments passed to the backend loading function.
        Each key is routed to `pyarrow.csv.ReadOptions`,
        `pyarrow.csv.ParseOptions` or `pyarrow.csv.ConvertOptions`
        (checked in that order) depending on which class exposes an
        attribute of that name. `memory_pool` is forwarded to
        `pyarrow.csv.read_csv` directly.

    Returns
    -------
    ir.Table
        The just-registered table

    Raises
    ------
    ValueError
        If a keyword argument matches none of the pyarrow CSV option
        classes and is not `memory_pool`.
    FileNotFoundError
        If `path` resolves to the local filesystem and matches no files.

    Examples
    --------
    Connect to a SQLite database:

    >>> con = ibis.sqlite.connect()

    Read a single csv file:

    >>> table = con.read_csv("path/to/file.csv")

    Read all csv files in a directory:

    >>> table = con.read_csv("path/to/csv_directory/*")

    Read all csv files with a glob pattern:

    >>> table = con.read_csv("path/to/csv_directory/test_*.csv")

    Read csv file from s3:

    >>> table = con.read_csv("s3://bucket/path/to/file.csv")

    """
    pa = self._import_pyarrow()
    import pyarrow.csv as pcsv
    from pyarrow import fs

    read_options_args = {}
    parse_options_args = {}
    convert_options_args = {}
    memory_pool = None

    # Route each keyword to the first pyarrow options class that exposes an
    # attribute with that name; anything unrecognised is rejected up front.
    for key, value in kwargs.items():
        if hasattr(pcsv.ReadOptions, key):
            read_options_args[key] = value
        elif hasattr(pcsv.ParseOptions, key):
            parse_options_args[key] = value
        elif hasattr(pcsv.ConvertOptions, key):
            convert_options_args[key] = value
        elif key == "memory_pool":
            memory_pool = value
        else:
            raise ValueError(f"Invalid args: {key!r}")

    read_options = pcsv.ReadOptions(**read_options_args)
    parse_options = pcsv.ParseOptions(**parse_options_args)
    convert_options = pcsv.ConvertOptions(**convert_options_args)
    # Only fall back to the default pool when the caller did not supply one;
    # a caller-provided pool must be honoured, not overwritten.
    if memory_pool is None:
        memory_pool = pa.default_memory_pool()

    # NOTE(review): `FileSystem.from_uri` is documented for URIs and absolute
    # local paths; relative local paths may need resolving first -- confirm.
    file_system, resolved_path = fs.FileSystem.from_uri(str(path))

    if isinstance(file_system, fs.LocalFileSystem):
        # Sort so that the row order of the concatenated table does not
        # depend on the directory-listing order of the filesystem.
        paths = sorted(glob.glob(resolved_path))
        if not paths:
            raise FileNotFoundError(f"No files found at {resolved_path!r}")
    else:
        # Glob expansion is only performed for local filesystems.
        paths = [resolved_path]

    pyarrow_tables = []
    for file_path in paths:
        with file_system.open_input_file(file_path) as f:
            pyarrow_tables.append(
                pcsv.read_csv(
                    f,
                    read_options=read_options,
                    parse_options=parse_options,
                    convert_options=convert_options,
                    memory_pool=memory_pool,
                )
            )

    pyarrow_table = pa.concat_tables(pyarrow_tables)
    table_name = table_name or util.gen_name("read_csv")
    # NOTE(review): a reviewer suggested this should probably be a temp table
    # rather than a persistent one -- unresolved here; confirm before merging.
    self.create_table(table_name, pyarrow_table)

    return self.table(table_name)
|
||
def _cached(self, expr: ir.Table): | ||
"""Cache the provided expression. | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.