Skip to content

Commit

Permalink
Mixer validator (#215)
Browse files Browse the repository at this point in the history
* Adding script that validates if mixer config is well formatted and has everything in place

* Add S3 path validation with boto3 existence check

* Adding check of the files, trying to run jq expressions on them and see if both files and jq expressions are valid

* Add S3 path validation, sampling, and doc-attribute alignment checks

* adding logic to split jsonpath expressions into pieces and check them

* Added JsonPath syntax evaluation, started working on sampling docs and checking their content

* Adding logic to check if all doc and corresponding attributes files contain correct fields and same number of lines

* Adding functionality to check if filters in config and attribute files match

* updating filter checking logic to focus on filters missing from the mixer config

* adding logic to run jq and jsonpath filters on small set of docs to see if they work or fail

* refactored to use smart open and added logic to download sample files to a temp folder

* added logic to sample lines from doc and apply filters to it, refactored main, added logic to download sample files and work with them locally

* Adding clean up logic to delete sample files after the run

* adding test configs for mixer validator

* addressing comments, splitting script into smaller files, moving test configs to test folder, adding a couple of helper functions

* adding --verbose flag, support for .env variables

* supporting != operator

* updating types in function definitions, updating Readme

* adding more error handlers

* deleting the initial version of the script

---------

Co-authored-by: Masha Iureva <[email protected]>
  • Loading branch information
mariia-iureva and Masha Iureva authored Oct 21, 2024
1 parent 153777e commit 0c0f10c
Show file tree
Hide file tree
Showing 15 changed files with 1,347 additions and 0 deletions.
51 changes: 51 additions & 0 deletions configs/test/test_config_jq.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
streams:
- name: cc_tiny_subset
documents:
- s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/documents/**/*.jsonl.gz

attributes:
- cc_tiny_subset_analysis_october
- bff_duplicate_paragraph_spans_new

output:
path: s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/mixed-output-exclude-bff
max_size_in_bytes: 4294967296
discard_fields:
- attributes

filter:
syntax: jq
include: []
exclude:
# Language filter (using both cld2 and fasttext)
- (.attributes.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 0.8 else false end)
- (.attributes.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 0.8 else false end)
# Document length filter
- (.attributes.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 100 else false end)
# NSFW content filter
- (.attributes.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.5 else false end)
# Gopher quality filter
- (.attributes.cc_tiny_subset_analysis_october__gopher_v2__word_count | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] < 50 else false end)
- (.attributes.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.1 else false end)
# Deduplication filter (BFF)
- (.attributes.bff_duplicate_paragraph_spans_new | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] >= 1.0 else false end)
# C4 quality filter
- (.attributes.cc_tiny_subset_analysis_october__c4_v2__line_bullets | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.5 else false end)
- (.attributes.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat | if type == "array" and length > 0 and .[0] != null and .[0][2] != null then .[0][2] > 0.3 else false end)

span_replacement:
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__EMAIL_ADDRESS"
min_score: 0.5
replacement: " |||EMAIL_ADDRESS||| "
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__PHONE_NUMBER"
min_score: 0.5
replacement: " |||PHONE_NUMBER||| "
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__IP_ADDRESS"
min_score: 0.5
replacement: " |||IP_ADDRESS||| "

work_dir:
input: "/tmp/cc_tiny_subset_mix/input"
output: "/tmp/cc_tiny_subset_mix/output"

processes: 16
50 changes: 50 additions & 0 deletions configs/test/test_config_jsonpath.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
streams:
- name: cc_tiny_subset
documents:
- s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/documents/**/*.jsonl.gz

attributes:
- cc_tiny_subset_analysis_october
- bff_duplicate_paragraph_spans_new

output:
path: s3://ai2-oe-data/mashai/tiny-DCLM-pool-subset-october/mixed-output
max_size_in_bytes: 4294967296
discard_fields:
- attributes

filter:
include: []
exclude:
# Language filter (using both cld2 and fasttext)
- "$.attributes[?(@.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en && @.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en[0] && @.cc_tiny_subset_analysis_october__cld2_en_paragraph_with_doc_score_v2__doc_en[0][2] < 0.8)]"
- "$.attributes[?(@.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en && @.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0] && @.cc_tiny_subset_analysis_october__ft_lang_id_en_paragraph_with_doc_score_v2__doc_en[0][2] < 0.8)]"
# Document length filter
- "$.attributes[?(@.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document && @.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document[0] && @.cc_tiny_subset_analysis_october__char_length_with_paragraphs_v1__document[0][2] < 100)]"
# Deduplication filter
- "$.attributes[?(@.bff_duplicate_paragraph_spans_new && @.bff_duplicate_paragraph_spans_new[0] && @.bff_duplicate_paragraph_spans_new[0][2] >= 1.0)]"
# NSFW content filter
- "$.attributes[?(@.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw && @.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw[0] && @.cc_tiny_subset_analysis_october__jigsaw_nsfw_document_v1____label__nsfw[0][2] > 0.5)]"
# Gopher quality filter (example, adjust threshold as needed)
- "$.attributes[?(@.cc_tiny_subset_analysis_october__gopher_v2__word_count && @.cc_tiny_subset_analysis_october__gopher_v2__word_count[0] && @.cc_tiny_subset_analysis_october__gopher_v2__word_count[0][2] < 50)]"
- "$.attributes[?(@.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio && @.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio[0] && @.cc_tiny_subset_analysis_october__gopher_v2__symbol_to_word_ratio[0][2] > 0.1)]"
# C4 quality filter (example, adjust threshold as needed)
- "$.attributes[?(@.cc_tiny_subset_analysis_october__c4_v2__line_bullets && @.cc_tiny_subset_analysis_october__c4_v2__line_bullets[0] && @.cc_tiny_subset_analysis_october__c4_v2__line_bullets[0][2] > 0.5)]"
- "$.attributes[?(@.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat && @.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat[0] && @.cc_tiny_subset_analysis_october__c4_v2__phrase_repeat[0][2] > 0.3)]"

span_replacement:
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__EMAIL_ADDRESS"
min_score: 0.5
replacement: " |||EMAIL_ADDRESS||| "
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__PHONE_NUMBER"
min_score: 0.5
replacement: " |||PHONE_NUMBER||| "
- span: "$.attributes.cc_tiny_subset_analysis_october__pii_regex_with_counts_fast_v2__IP_ADDRESS"
min_score: 0.5
replacement: " |||IP_ADDRESS||| "

work_dir:
input: "/tmp/cc_tiny_subset_mix/input"
output: "/tmp/cc_tiny_subset_mix/output"

processes: 16
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ dependencies = [
# "fasttext==0.9.2", # broken with new version of setuptools; using fasttext-wheel instead
"fasttext-wheel==0.9.2",
"fsspec>=2023.6.0",
"jsonpath-ng",
"jq",
"msgspec>=0.14.2",
"nltk>=3.9.1",
"omegaconf>=2.3.0",
# "pycld2==0.41",
# "pycld3==0.22", # does not install correctly
"platformdirs>=4.2.0",
"pyyaml",
"python-dotenv>=0.19.0",
"requests",
"rich",
"s3fs>=2023.6.0",
Expand Down
44 changes: 44 additions & 0 deletions scripts/validate_mixer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Dolma Mixer Configuration Validator

This script validates the configuration for the Dolma Mixer, ensuring that all necessary components are correctly set up before running the main process.

## Features

The validator performs the following checks:

1. Verifies the presence and format of required fields in the configuration.
2. Validates the syntax of the configuration file (YAML or JSON).
3. Checks for duplicate keys in the configuration.
4. Validates JQ or JSONPath expressions for syntax and compilation.
5. Verifies S3 path syntax and accessibility.
6. Confirms write permissions for output paths.
7. Checks the existence and accessibility of attribute files.
8. Samples a subset of files for detailed validation.
9. Ensures alignment between document and attribute files.
10. Validates the format and content of sampled files.
11. Executes JQ or JSONPath commands on sampled files.
12. Validates nested key existence in filter expressions.

## Usage

Run the validator using the following command:

```
python scripts/validate_mixer/main.py <path_to_config_file> [--num_samples <number>] [--verbose]
```

- `<path_to_config_file>`: Path to your Dolma Mixer configuration file (required)
- `--num_samples <number>`: (Optional) Number of file samples to validate (default: 1)
- `--verbose`: (Optional) Enable verbose output

## Output

The script provides detailed progress information and error messages for any validation failures, helping you troubleshoot configuration issues before running the main Dolma Mixer process.

## Keyboard Interrupt

The script handles keyboard interrupts (Ctrl+C) gracefully.

## Exit Status

The script will exit with a non-zero status if any validation step fails.
7 changes: 7 additions & 0 deletions scripts/validate_mixer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from .config_handler import load_config, validate_config_structure, validate_stream, validate_output, validate_filter_config
from .validator import load_and_validate_config, validate_s3_paths_and_permissions, validate_stream_filters, validate_documents_and_attributes
from .file_operations import sample_files, download_file, sample_and_download_files, count_file_lines, check_attribute_name_typos, sample_file_lines, sample_documents_with_attributes, validate_jsonl, validate_filters_and_check_typos, sample_and_extract_attributes
from .filter_operations import validate_jq_expression, validate_jsonpath_expression, validate_filter_expressions, evaluate_comparison, evaluate_jsonpath_condition, split_complex_jsonpath, prepare_filter, execute_filter_commands, extract_attribute_names_from_filters, extract_filter_attributes
from .s3_utils import validate_s3_path, check_s3_path_exists, check_s3_path_writable, check_s3_parent_exists, list_s3_objects, get_base_path, get_corresponding_attribute_path
from .utils import keyboard_interrupt_handler
from .env_handler import load_env_variables, expand_env_vars
112 changes: 112 additions & 0 deletions scripts/validate_mixer/config_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import yaml
import json
import os
from typing import Dict, Any, List, Union, Type
from env_handler import expand_env_vars_in_config

def load_config(config_path: str) -> Dict[str, Any]:
    """Load a mixer configuration file (YAML or JSON).

    Args:
        config_path: Path to a .yaml, .yml, or .json configuration file.

    Returns:
        The parsed configuration dict with ${oc.env:VAR} references expanded.

    Raises:
        FileNotFoundError: If config_path does not exist.
        ValueError: If the extension is unsupported or the file fails to parse.
    """
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"Config file not found at path: {config_path}")

    # Pick the parser up front so an unsupported extension is reported
    # directly, instead of being caught by the broad handler below and
    # double-wrapped as "Error loading config file: Unsupported ...".
    if config_path.endswith(('.yaml', '.yml')):
        parse = yaml.safe_load
    elif config_path.endswith('.json'):
        parse = json.load
    else:
        raise ValueError("Unsupported file format. Use .yaml, .yml, or .json")

    try:
        with open(config_path, 'r') as file:
            config = parse(file)
        return expand_env_vars_in_config(config)
    except Exception as e:
        # Chain the original parser error so the root cause stays visible.
        raise ValueError(f"Error loading config file: {str(e)}") from e

def validate_config_structure(config: Dict[str, Any]) -> List[str]:
    """Validate the basic structure of the configuration.

    Checks that the required top-level fields are present and delegates
    each present field to its dedicated validator.
    """
    errors: List[str] = []
    # Maps each required top-level field to the validator for its value.
    validators = {
        'streams': validate_streams,
        'processes': validate_processes,
    }

    for field, check in validators.items():
        if field not in config:
            errors.append(f"Missing required field: {field}")
        else:
            errors.extend(check(config[field]))

    return errors

def validate_streams(streams: Any) -> List[str]:
    """Validate the 'streams' section: must be a list of stream configs."""
    # A non-list value is a structural error; nothing deeper to check.
    if not isinstance(streams, list):
        return ["'streams' should be a list"]

    errors: List[str] = []
    for index, stream in enumerate(streams):
        errors.extend(validate_stream(stream, index))
    return errors

def validate_processes(processes: Any) -> List[str]:
    """Validate the 'processes' field: must be an integer worker count.

    Returns a list of error messages (empty when valid).
    """
    # bool is a subclass of int, so without the explicit check a config
    # value like `processes: true` would pass as a valid process count.
    if isinstance(processes, bool) or not isinstance(processes, int):
        return ["'processes' should be an integer"]
    return []

def validate_stream(stream: Dict[str, Any], index: int) -> List[str]:
    """Validate an individual stream configuration.

    Checks presence and type of the required fields, then runs the
    deeper validators for the 'output' and 'filter' sections if present.
    """
    # Required fields and the type each one must have.
    schema = {
        'name': str,
        'documents': list,
        'attributes': list,
        'output': dict,
    }
    errors: List[str] = []

    for field, field_type in schema.items():
        errors.extend(validate_field(stream, field, field_type, index))

    # Nested sections get their own structural validation.
    if 'output' in stream:
        errors.extend(validate_output(stream['output'], index))
    if 'filter' in stream:
        errors.extend(validate_filter_config(stream['filter'], index))

    return errors

def validate_field(stream: Dict[str, Any], field: str, expected_type: Union[Type, List[Type]], stream_index: int) -> List[str]:
    """Check that `field` is present in `stream` and has the expected type.

    Args:
        stream: The stream configuration to inspect.
        field: Name of the required field.
        expected_type: A single type, or a list/tuple of acceptable types.
        stream_index: Index of the stream, used in error messages.

    Returns:
        A list of error messages (empty when the field is valid).
    """
    if field not in stream:
        return [f"Stream {stream_index}: Missing required field: {field}"]

    # isinstance() accepts a tuple of types but raises TypeError on a list,
    # even though the signature advertises List[Type] — normalize here.
    if isinstance(expected_type, (list, tuple)):
        allowed = tuple(expected_type)
        type_name = " or ".join(t.__name__ for t in allowed)
    else:
        allowed = (expected_type,)
        type_name = expected_type.__name__

    if not isinstance(stream[field], allowed):
        return [f"Stream {stream_index}: '{field}' should be a {type_name}"]
    return []

def validate_output(output: Dict[str, Any], stream_index: int) -> List[str]:
    """Validate the output configuration of a stream.

    Requires 'path' and 'max_size_in_bytes'; the latter, when present,
    must be an integer.
    """
    # Missing-field errors are reported in declaration order.
    errors = [
        f"Stream {stream_index} output: Missing required field: {field}"
        for field in ('path', 'max_size_in_bytes')
        if field not in output
    ]

    if 'max_size_in_bytes' in output and not isinstance(output['max_size_in_bytes'], int):
        errors.append(
            f"Stream {stream_index} output: 'max_size_in_bytes' should be an integer"
        )

    return errors

def validate_filter_config(filter_config: Dict[str, Any], stream_index: int) -> List[str]:
    """Validate the filter configuration of a stream.

    Both 'include' and 'exclude' are optional, but must be lists when present.
    """
    errors: List[str] = []
    for key in ('include', 'exclude'):
        if key in filter_config and not isinstance(filter_config[key], list):
            errors.append(f"Stream {stream_index} filter: '{key}' should be a list")
    return errors
34 changes: 34 additions & 0 deletions scripts/validate_mixer/env_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# env_handler.py
import os
import re
from dotenv import load_dotenv
from utils import vprint

def load_env_variables():
    """Load variables from a local .env file into the environment.

    Thin wrapper over python-dotenv's load_dotenv(); called once at startup
    so ${oc.env:...} references in the config can be resolved.
    """
    load_dotenv()

def expand_custom_env_vars(value):
    """Expand environment variables written with ${oc.env:VAR_NAME} syntax.

    Unknown variables are left as-is in the string, with a warning printed,
    so the unresolved placeholder stays visible downstream.
    """
    pattern = r'\${oc\.env:([^}]+)}'

    def substitute(match):
        name = match.group(1)
        resolved = os.getenv(name)
        if resolved is not None:
            return resolved
        print(f"Warning: Environment variable {name} not found")
        return match.group(0)  # Keep the original placeholder text.

    return re.sub(pattern, substitute, value)

def expand_env_vars_in_config(config):
    """Expand environment variables in 'documents' and 'output' sections of the config.

    Mutates the config in place and also returns it for convenience.
    """
    for stream in config.get('streams', []):
        if 'documents' in stream:
            stream['documents'] = [
                expand_custom_env_vars(path) for path in stream['documents']
            ]
            vprint(f"Expanded documents: {stream['documents']}")  # Debug print
        if 'output' in stream and 'path' in stream['output']:
            stream['output']['path'] = expand_custom_env_vars(stream['output']['path'])
            vprint(f"Expanded output path: {stream['output']['path']}")  # Debug print
    return config
Loading

0 comments on commit 0c0f10c

Please sign in to comment.