Skip to content

Commit

Permalink
[Commoncrawl pipeline] Add load from commoncrawl component (#269)
Browse files Browse the repository at this point in the history
This component is the first part of the commoncrawl pipeline. Given an
index, this component loads the corresponding index file from the AWS
Public Data Sets (S3 bucket) and returns a list of its WARC segment file
paths as a dataframe.
  • Loading branch information
shayorshay authored Jul 5, 2023
1 parent cbd69e9 commit ab1e6de
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM --platform=linux/amd64 python:3.8-slim

## System dependencies
RUN apt-get update && \
apt-get upgrade -y && \
apt-get install git -y

# install requirements
COPY requirements.txt /
RUN pip3 install --no-cache-dir -r requirements.txt

# Set the working directory to the component folder
WORKDIR /component/src

# Copy over src-files
COPY src/ .

ENTRYPOINT ["python", "main.py"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
name: Load index file from commoncrawl
description: Component that loads a given index file from commoncrawl
image: ghcr.io/ml6team/load_from_commoncrawl:latest

produces:
segment:
fields:
path:
type: string

args:
index_name:
description: Name of index file on commoncrawl
type: str
n_segments_to_load:
description: Number of segments to load from the commoncrawl index file
type: int
default: None
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
boto3==1.26.158
git+https://github.com/ml6team/fondant@main
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
"""This component loads a dataset from CommonCrawl based on a given index."""
import logging
import typing as t

import io
import boto3
import gzip

import dask.dataframe as dd
import pandas as pd

from fondant.component import LoadComponent

logger = logging.getLogger(__name__)

S3_COMMONCRAWL_BUCKET = "commoncrawl"


def fetch_warc_file_from_s3(s3_bucket: str, s3_key: str) -> bytes:
"""Fetches a WARC file from S3 and returns its content as a Dask DataFrame.
Args:
s3_bucket: The name of the S3 bucket.
s3_key: The key of the S3 object to be downloaded.
Returns:
File object containing the WARC file content.
"""
logger.info(f"Fetching WARC file from S3: {s3_bucket}/{s3_key}...")

s3 = boto3.client("s3")
file_obj = io.BytesIO()
s3.download_fileobj(s3_bucket, s3_key, file_obj)
file_obj.seek(0)

return file_obj


def read_warc_paths_file(
warc_file: bytes, n_segments_to_load: t.Optional[int] = None
) -> dd.DataFrame:
"""Reads a WARC file containing a list of segment file paths and returns a Dask DataFrame.
Args:
warc_file: The WARC file to read.
n_segments_to_load: The number of segments to load from the WARC file.
Returns:
A Dask DataFrame containing the segment file paths.
"""
logger.info(f"Reading WARC file...")
warc_paths = []
with gzip.open(warc_file, mode="rt") as f:
warc_paths = [line.strip() for line in f]

df = pd.DataFrame(warc_paths, columns=["warc_paths"])
dask_df = dd.from_pandas(df, npartitions=1)
dask_df = dask_df.rename(columns={"warc_paths": "segment_path"})

if n_segments_to_load:
dask_df = dask_df.head(n_segments_to_load)
dask_df = dd.from_pandas(dask_df, npartitions=1)

return dask_df


class LoadFromCommonCrawl(LoadComponent):
def load(
self, index_name: str, n_segments_to_load: t.Optional[int] = None
) -> dd.DataFrame:
"""Loads a dataset of segment file paths from CommonCrawl based on a given index.
Args:
index_name: The name of the CommonCrawl index to load.
n_segments_to_load: The number of segments to load from the index.
Returns:
A Dask DataFrame containing the segment file paths.
"""
logger.info(f"Loading CommonCrawl index {index_name}...")
warc_paths_file_key = f"crawl-data/{index_name}/warc.paths.gz"
warc_paths_file_content = fetch_warc_file_from_s3(
S3_COMMONCRAWL_BUCKET, warc_paths_file_key
)

warc_paths_df = read_warc_paths_file(
warc_paths_file_content, n_segments_to_load
)

return warc_paths_df


if __name__ == "__main__":
component = LoadFromCommonCrawl.from_args()
component.run()

0 comments on commit ab1e6de

Please sign in to comment.