Skip to content

Commit

Permalink
Multiple files ingest
Browse files Browse the repository at this point in the history
  • Loading branch information
Pee Tankulrat authored and Pee Tankulrat committed Dec 1, 2022
1 parent 75c97c2 commit d0ce5b7
Show file tree
Hide file tree
Showing 13 changed files with 120 additions and 32 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ The framework aims to work on a two-fold principle for detecting PII:
* [x] Replacement ('replace'): Replaces a detected sensitive value with a specified surrogate value. Leave the value empty to simply delete detected sensitive value.
* [x] Hash ('hash'): Hash detected sensitive value with sha256.

Currently supported file formats: `csv, parquet`

### TO-DO
Following features are part of the backlog with more features coming soon
Expand All @@ -45,6 +46,8 @@ You can have a detailed look at upcoming features and the backlog in this [Github Board](
2. Setup hooks and install packages with `make install`

### Config JSON
Limitation: when reading multiple files, all files matched by the file_path must have the same headers. Additionally, when the file format is not given, the anonymizer assumes the format of the first matched filename. Thus, when the file_path ends with `/*` and the folder contains mixed file formats, the operation will fail.

An example for the config JSON is located at `<PROJECT_ROOT>/pii-anonymizer.json`
```
{
Expand Down
28 changes: 28 additions & 0 deletions pii_anonymizer/common/file_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import glob

allowed_format = ["csv", "parquet", "parq"]


def format_glob(input_path):
    """Normalize *input_path* into a glob pattern.

    Rules, applied to the final path component:
    - already contains ``*``  -> returned unchanged (caller supplied a glob);
    - contains a ``.`` (i.e. has a file extension) -> returned unchanged
      (treated as a single explicit file);
    - otherwise it is assumed to be a directory, so ``/*`` is appended to
      match every file inside it.
    """
    filename = input_path.split("/")[-1]
    if "*" in filename:
        return input_path
    # A dot in the last component means an explicit file path was given;
    # only extension-less paths are treated as directories.
    if "." not in filename:
        if not input_path.endswith("/"):
            input_path += "/"
        input_path += "*"

    return input_path


def get_format(input_path):
    """Return the file format of the first supported file matched by *input_path*.

    The path is first normalized via :func:`format_glob`, then globbed; the
    extension of each matched filename is checked against ``allowed_format``.
    ``"parq"`` is normalized to ``"parquet"``.

    Returns None when no matched file has a supported extension (including
    when the glob matches nothing at all).
    """
    for filename in glob.glob(format_glob(input_path)):
        # Name the local `extension`, not `format`, to avoid shadowing the builtin.
        extension = filename.split(".")[-1]
        if extension in allowed_format:
            return "parquet" if extension == "parq" else extension

    return None
42 changes: 42 additions & 0 deletions pii_anonymizer/common/tests/test_file_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from unittest import TestCase

from pii_anonymizer.common.file_utils import get_format, format_glob


class TestFileUtils(TestCase):
    """Table-driven tests for the glob-normalization and format-detection helpers."""

    def setUp(self):
        # Keep the helpers as instance attributes, mirroring how the tests
        # reference them.
        self.get_format = get_format
        self.format_glob = format_glob

    def test_should_format_input_path_to_glob(self):
        cases = [
            ("./test_data", "./test_data/*"),
            ("./test_data/", "./test_data/*"),
            ("./test_data/*", "./test_data/*"),
            ("./test_data/*.csv", "./test_data/*.csv"),
            ("./test_data/test_data.csv", "./test_data/test_data.csv"),
        ]
        for path, expected in cases:
            with self.subTest():
                self.assertEqual(self.format_glob(path), expected)

    def test_should_return_file_format(self):
        cases = [
            ("./test_data/*", "csv"),
            ("./test_data/*.csv", "csv"),
            ("./test_data/test_data.csv", "csv"),
            ("./test_data/test_data.parquet", "parquet"),
            ("./test_data/*.parquet", "parquet"),
            ("./test_data/*.parq", "parquet"),
        ]
        for path, expected in cases:
            with self.subTest():
                self.assertEqual(self.get_format(path), expected)
4 changes: 2 additions & 2 deletions pii_anonymizer/spark/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from pyspark.sql import SparkSession
from pii_anonymizer.spark.report.report_generator import ReportGenerator
from pii_anonymizer.spark.acquire.csv_parser import CsvParser
from pii_anonymizer.spark.acquire.input_parser import InputParser
from pii_anonymizer.spark.analyze.detectors.pii_detector import PIIDetector
from pii_anonymizer.spark.constants import ACQUIRE, REPORT
from pii_anonymizer.spark.write.output_writer import OutputWriter
Expand All @@ -27,7 +27,7 @@ def run(self):
spark = (
SparkSession.builder.master("local").appName("PIIDetector").getOrCreate()
)
parsed_data_frame = CsvParser(spark, config=self.config[ACQUIRE]).parse()
parsed_data_frame = InputParser(spark, config=self.config[ACQUIRE]).parse()
pii_analysis_report, redacted_data_frame = PIIDetector(
self.config
).analyze_data_frame(parsed_data_frame)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from pyspark.sql import SparkSession
from pii_anonymizer.common.file_utils import format_glob, get_format
from pii_anonymizer.spark.constants import FILE_PATH
import pyspark.sql.functions as f


class CsvParser:
class InputParser:
def __init__(self, spark: SparkSession, config):
self.__validate_config(config)
self.input_path = config["file_path"]
self.input_path = format_glob(config[FILE_PATH])
self.delimiter = (
config["delimiter"]
if "delimiter" in config and config["delimiter"]
Expand All @@ -19,9 +19,10 @@ def __validate_config(self, config):
raise ValueError("Config 'file_path' needs to be provided for parsing")

def parse(self):
format = get_format(self.input_path)
df = self.spark.read.load(
self.input_path,
format="csv",
format=format,
sep=self.delimiter,
header="true",
inferSchema="true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@
from unittest import TestCase
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pii_anonymizer.spark.acquire.csv_parser import CsvParser
from pii_anonymizer.spark.acquire.input_parser import InputParser


class TestCsvParser(TestCase):
class TestInputParser(TestCase):
def setUp(self) -> None:
self.SPARK = (
SparkSession.builder.master("local").appName("Test CSVParser").getOrCreate()
SparkSession.builder.master("local")
.appName("Test InputParser")
.getOrCreate()
)
self.current_dir = os.path.dirname(os.path.realpath(__file__))

def test_invalid_config_gets_caught_during_initialization(self):
context = {}
with self.assertRaises(ValueError) as ve:
CsvParser(self.SPARK, config=context)
InputParser(self.SPARK, config=context)
self.assertEqual(
str(ve.exception), "Config 'file_path' needs to be provided for parsing"
)
Expand All @@ -27,7 +29,7 @@ def test_if_valid_csv_file_provided_returns_spark_df(self):
expected = self.SPARK.createDataFrame(
[("Lisa Beard", "557-39-2479")], ["name", "ssn"]
)
actual = CsvParser(spark=self.SPARK, config=config).parse()
actual = InputParser(spark=self.SPARK, config=config).parse()

self.assertEqual(actual.schema, expected.schema)
self.assertEqual(actual.collect(), expected.collect())
Expand All @@ -39,7 +41,7 @@ def test_if_valid_csv_file_with_different_delimiter_provided_returns_spark_df(se
expected = self.SPARK.createDataFrame(
[("Lisa Beard", "557-39-2479")], ["name", "ssn"]
)
actual = CsvParser(spark=self.SPARK, config=config).parse()
actual = InputParser(spark=self.SPARK, config=config).parse()

self.assertEqual(actual.schema, expected.schema)
self.assertEqual(actual.collect(), expected.collect())
Expand All @@ -48,6 +50,6 @@ def test_if_empty_csv_file_returns_empty_pandas_df(self):
file_path = "test_data/empty.csv".format(self.current_dir)
config = {"file_path": file_path}
expected = self.SPARK.createDataFrame([], StructType([]))
actual = CsvParser(spark=self.SPARK, config=config).parse()
actual = InputParser(spark=self.SPARK, config=config).parse()
self.assertEqual(actual.schema, expected.schema)
self.assertEqual(actual.collect(), expected.collect())
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import pandas as pd
import dask.dataframe as dd
from pii_anonymizer.common.constants import FILE_PATH, DELIMITER
from pii_anonymizer.common.file_utils import format_glob, get_format

allowed_format = ["csv", "parquet", "parq"]

class CsvParser:

class InputParser:
def __init__(self, config):
self.__validate_config(config)
self.input_path = config[FILE_PATH]
self.input_path = format_glob(config[FILE_PATH])

self.delimiter = (
config[DELIMITER] if DELIMITER in config and config[DELIMITER] else ","
)
Expand All @@ -17,7 +21,15 @@ def __validate_config(self, config):

def parse(self):
try:
df = dd.read_csv(self.input_path, delimiter=self.delimiter).compute()
match get_format(self.input_path):
case "csv":
df = dd.read_csv(
self.input_path, delimiter=self.delimiter
).compute()
case "parquet":
df = dd.read_parquet(
self.input_path, delimiter=self.delimiter
).compute()
except pd.errors.EmptyDataError:
return pd.DataFrame({})

Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
from unittest import TestCase
import os
import pandas as pd
from pii_anonymizer.standalone.acquire.csv_parser import CsvParser
from pii_anonymizer.standalone.acquire.input_parser import InputParser


class TestCsvParser(TestCase):
class TestInputParser(TestCase):
def setUp(self):
self.current_dir = os.path.dirname(os.path.realpath(__file__))

def test_invalid_config_gets_caught_during_initialization(self):
context = {}
with self.assertRaises(ValueError) as ve:
CsvParser(config=context)
InputParser(config=context)
self.assertEqual(
str(ve.exception), "Config 'file_path' needs to be provided for parsing"
)

def test_if_valid_csv_file_provided_returns_pandas_df(self):
file_path = "test_data/comma_delimited_file.csv".format(self.current_dir)
config = {"file_path": file_path, "delimiter": ""}
test_csv_parser_valid_file_path = CsvParser(config=config)
test_csv_parser_valid_file_path = InputParser(config=config)
expected = pd.DataFrame({"name": ["Lisa Beard"], "ssn": ["557-39-2479"]})
actual = test_csv_parser_valid_file_path.parse()
self.assertEqual(actual.to_dict(), expected.to_dict())
Expand All @@ -29,15 +29,15 @@ def test_if_valid_csv_file_with_different_delimiter_provided_returns_pandas_df(
):
file_path = "test_data/pipe_delimited_file.csv".format(self.current_dir)
config = {"file_path": file_path, "delimiter": "|"}
test_csv_parser_valid_file_path = CsvParser(config=config)
test_csv_parser_valid_file_path = InputParser(config=config)
expected = pd.DataFrame({"name": ["Lisa Beard"], "ssn": ["557-39-2479"]})
actual = test_csv_parser_valid_file_path.parse()
self.assertEqual(actual.to_dict(), expected.to_dict())

def test_if_empty_csv_file_returns_empty_pandas_df(self):
file_path = "test_data/empty.csv".format(self.current_dir)
config = {"file_path": file_path}
test_csv_parser_valid_file_path = CsvParser(config=config)
test_csv_parser_valid_file_path = InputParser(config=config)
expected = pd.DataFrame({})
actual = test_csv_parser_valid_file_path.parse()
self.assertEqual(actual.to_dict(), expected.to_dict())
Expand All @@ -46,5 +46,5 @@ def test_if_error_is_raised_if_df_has_null_values(self):
file_path = "test_data/missing_comma.csv".format(self.current_dir)
config = {"file_path": file_path}
with self.assertRaises(ValueError) as ve:
CsvParser(config=config).parse()
InputParser(config=config).parse()
self.assertEqual(str(ve.exception), "Dataframe contains NULL values")
4 changes: 2 additions & 2 deletions pii_anonymizer/standalone/dpf_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import json

from pii_anonymizer.standalone.report.report_generator import ReportGenerator
from pii_anonymizer.standalone.acquire.csv_parser import CsvParser
from pii_anonymizer.standalone.acquire.input_parser import InputParser
from pii_anonymizer.standalone.analyze.detectors.pii_detector import PIIDetector
from pii_anonymizer.common.constants import ACQUIRE, REPORT
from pii_anonymizer.standalone.write.output_writer import OutputWriter
Expand All @@ -21,7 +21,7 @@ def __init__(self, config_file_path):
# TODO : validate the config for the stages right here
def run(self):
validate(self.config)
parsed_data_frame = CsvParser(config=self.config[ACQUIRE]).parse()
parsed_data_frame = InputParser(config=self.config[ACQUIRE]).parse()
pii_analysis_report, anonymized_data_frame = PIIDetector(
self.config
).analyze_data_frame(parsed_data_frame)
Expand Down
8 changes: 4 additions & 4 deletions pii_anonymizer/standalone/tests/test_dpf_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ def setUp(self):
@patch(
"pii_anonymizer.standalone.analyze.detectors.pii_detector.PIIDetector.analyze_data_frame"
)
@patch("pii_anonymizer.standalone.acquire.csv_parser.CsvParser.parse")
@patch("pii_anonymizer.standalone.acquire.csv_parser.CsvParser.__init__")
@patch("pii_anonymizer.standalone.acquire.input_parser.InputParser.parse")
@patch("pii_anonymizer.standalone.acquire.input_parser.InputParser.__init__")
def test_run_parses_the_config_file_and_invokes_respective_stages_correctly(
self,
mock_csv_parser_init,
Expand Down Expand Up @@ -66,8 +66,8 @@ def test_run_parses_the_config_file_and_invokes_respective_stages_correctly(
@patch(
"pii_anonymizer.standalone.analyze.detectors.pii_detector.PIIDetector.analyze_data_frame"
)
@patch("pii_anonymizer.standalone.acquire.csv_parser.CsvParser.parse")
@patch("pii_anonymizer.standalone.acquire.csv_parser.CsvParser.__init__")
@patch("pii_anonymizer.standalone.acquire.input_parser.InputParser.parse")
@patch("pii_anonymizer.standalone.acquire.input_parser.InputParser.__init__")
def test_run_short_circuits_generate_report_when_no_PII_values_detected(
self,
mock_csv_parser_init,
Expand Down
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file added test_data/test_data.parq
Binary file not shown.
Binary file added test_data/test_data.parquet
Binary file not shown.

1 comment on commit d0ce5b7

@pee-tw
Copy link
Collaborator

@pee-tw pee-tw commented on d0ce5b7 Dec 14, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.