
added connector folder and HF file #313

Open · wants to merge 14 commits into base: main
Empty file added connectors/__init__.py
94 changes: 94 additions & 0 deletions connectors/huggingface_connector.py
@@ -0,0 +1,94 @@
from datasets import load_dataset, get_dataset_split_names
from nomic import AtlasDataset
import numpy as np
import pandas as pd
import pyarrow as pa


# Gets data from HF dataset
def get_hfdata(dataset_identifier):
    try:
Contributor: Add docstring to function definition
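For example, a docstring along these lines (the wording is only a suggestion, not part of the PR):

def get_hfdata(dataset_identifier):
    """Load a Hugging Face dataset and return it as a PyArrow table.

    Args:
        dataset_identifier: Dataset path on the Hugging Face Hub, e.g. "user/dataset".

    Returns:
        A pyarrow.Table with one row per example and a sequential string "id" column.
    """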

        # Loads dataset without specifying config
        dataset = load_dataset(dataset_identifier)
    except ValueError as e:
Contributor: We don't need to handle this error; it seems like handling the config through the error message is too risky.

Contributor: So you can take out the try/except here.

        # Grabs available configs and loads dataset using it
        configs = get_dataset_split_names(dataset_identifier)
        config = configs[0]
        dataset = load_dataset(dataset_identifier, config, trust_remote_code=True, streaming=True, split=config + "[:100000]")
Member: I would, instead of this, just make the split (and max length) optional arguments to this function (and in turn, to the top-level hf_atlasdataset as well) instead of silently checking for splits and grabbing the first one, which may not be the one that a user intends. For example, on wikimedia/wikipedia, which is split by language, that would be Abkhazian Wikipedia, since ab is alphabetically first.

Member: Also note that streaming=True makes load_dataset return an IterableDataset, which has slightly different behaviors than a normal Dataset; you should probably always use streaming if you're going to use it at all, otherwise you'd need to make sure to test both cases.

The row limit should probably be an optional argument as well rather than hardcoded; someone may want to upload a dataset with more or fewer than 100k rows. While using split='splitname[:1000]' does work, it's also not the only way: .take on a dataset will return a new dataset with only that many rows (and on IterableDatasets it will do the right thing and only fetch that many), e.g. dataset = dataset.take(1000). This is probably more sensible for exposing the limit as an argument.

Another issue is that slicing or using .take will get the beginning of the dataset. Often, when you want to map a sample of a dataset (because you want to quickly get a picture of what's in it without spending the time/compute to map the whole thing), you want a random sample. For example, the wikipedia dataset is also in alphabetical order by title, so articles near the beginning will just be ones with titles starting with A, which probably won't give a very representative map of the whole dataset.

Datasets also have a .shuffle method, which works similarly to .take and should be applied before .take. E.g., to get 1000 random rows from a dataset you want dataset = dataset.shuffle().take(1000). It probably makes sense to use this any time a limit is specified, but it's not needed if the whole dataset will be uploaded.
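Taken together, a minimal sketch of what those suggestions could look like (the split and limit defaults and the seed here are illustrative assumptions, and config handling is omitted):

from datasets import load_dataset

def get_hfdata(dataset_identifier, split="train", limit=None):
    # Always stream, so the return type is consistently an IterableDataset
    dataset = load_dataset(dataset_identifier, split=split, streaming=True)
    if limit is not None:
        # Shuffle before take so the sample is random rather than just the head of the dataset
        dataset = dataset.shuffle(seed=42).take(limit)
    return dataset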



    # Processes dataset entries using Arrow
    id_counter = 0
    data = []
    for split in dataset.keys():
        for example in dataset[split]:
            # Adds a sequential ID
            example['id'] = str(id_counter)
            id_counter += 1
            data.append(example)

    # Convert the data list to an Arrow table
    table = pa.Table.from_pandas(pd.DataFrame(data))
Member: You could probably do pa.Table.from_pylist(data) instead and avoid roundtripping through pandas here.
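A minimal sketch of that suggestion:

# Build the Arrow table directly from the list of dicts, without going through pandas
table = pa.Table.from_pylist(data)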



    return table


# Converts booleans, lists etc to strings
def convert_to_string(value):
    if isinstance(value, bool):
        return str(value)
    elif isinstance(value, list):
        return ' '.join(map(convert_to_string, value))
    elif isinstance(value, np.ndarray):
        return ' '.join(map(str, value.flatten()))
    elif hasattr(value, 'tolist'):
        return ' '.join(map(str, value.tolist()))
    else:
        return str(value)


# Processes Arrow table and converts necessary fields to strings
def process_table(table):
    # Converts columns with complex types to strings
    for col in table.schema.names:
        column = table[col].to_pandas()
Member: pyarrow.compute.cast may be able to handle some of this without having to go through pandas/pure Python: https://arrow.apache.org/docs/python/generated/pyarrow.compute.cast.html

Author: My code seems to throw an AttributeError without it. There are some cases where it works, but with some, like this one, it doesn't: https://huggingface.co/datasets/Anthropic/hh-rlhf

Member: Can leave this as is for now then.
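For reference, a minimal sketch of the pyarrow.compute.cast approach that was suggested (not what this PR uses; table and col refer to the variables in process_table above):

import pyarrow as pa
import pyarrow.compute as pc

# Cast one column to string; simple types cast cleanly, but nested/list columns
# can still raise, which is why the pandas path is kept for now
string_column = pc.cast(table[col], pa.string())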

        if column.dtype == np.bool_ or column.dtype == object or isinstance(column[0], (list, np.ndarray)):
            column = column.apply(convert_to_string)
            table = table.set_column(table.schema.get_field_index(col), col, pa.array(column))

    return table


# Creates AtlasDataset from HF dataset
def hf_atlasdataset(dataset_identifier):
    table = get_hfdata(dataset_identifier.strip())

    map_name = dataset_identifier.replace('/', '_')
    if not table:
        raise ValueError("No data was found for the provided dataset.")

    dataset = AtlasDataset(
        map_name,
        unique_id_field="id",
    )

    # Ensures all values are converted to strings
    processed_table = process_table(table)

    # Adds data to the AtlasDataset
    dataset.add_data(data=processed_table.to_pandas().to_dict(orient='records'))
Member: add_data accepts arrow tables directly, no need for this conversion (add_data will have to convert it back into an arrow table before uploading if you do this).
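That is, roughly (a sketch using the variables from the function above):

# Pass the Arrow table straight through; per the review, add_data accepts pyarrow tables directly
dataset.add_data(data=processed_table)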



    return dataset





24 changes: 24 additions & 0 deletions examples/HF_example_usage.py
@@ -0,0 +1,24 @@

from huggingface_connector import hf_atlasdataset
import logging

if __name__ == "__main__":
    dataset_identifier = input("Enter Hugging Face dataset identifier: ").strip()
Member: How about, instead of making this an interactive script, using argparse: https://docs.python.org/3.10/library/argparse.html?highlight=argparse#module-argparse

That way it's easier to handle optional args like split and limit.



    try:
        atlas_dataset = hf_atlasdataset(dataset_identifier)
        logging.info(f"AtlasDataset has been created for '{dataset_identifier}'")
    except ValueError as e:
        logging.error(f"Error creating AtlasDataset: {e}")
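Following the argparse suggestion above, a rough sketch of what the example script could look like (the --split and --limit flags assume the hypothetical hf_atlasdataset signature sketched earlier, not the PR's current one):

import argparse
import logging

from huggingface_connector import hf_atlasdataset

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Create an AtlasDataset from a Hugging Face dataset")
    parser.add_argument("dataset_identifier", help="Dataset path on the Hugging Face Hub")
    parser.add_argument("--split", default=None, help="Optional split to load")
    parser.add_argument("--limit", type=int, default=None, help="Optional maximum number of rows")
    args = parser.parse_args()

    try:
        # Assumes hf_atlasdataset grows optional split/limit parameters, as suggested in the review
        atlas_dataset = hf_atlasdataset(args.dataset_identifier, split=args.split, limit=args.limit)
        logging.info(f"AtlasDataset has been created for '{args.dataset_identifier}'")
    except ValueError as e:
        logging.error(f"Error creating AtlasDataset: {e}")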