(DO NOT MERGE) Try and put spark setup in setup timings #1739

Closed · wants to merge 7 commits
11 changes: 5 additions & 6 deletions .github/workflows/pytest_run_tests_with_cache.yml
@@ -3,7 +3,7 @@ on:
pull_request:
branches:
- master
- '**dev'
- "**dev"
paths:
- splink/**
- tests/**
@@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-20.04
strategy:
matrix:
python-version: [ "3.8.16", "3.9.10" ]
python-version: ["3.8.16", "3.9.10"]
name: Run tests with python verion ${{ matrix.python-version }}
steps:
#----------------------------------------------
@@ -33,8 +33,8 @@ jobs:
- name: Load cached Poetry installation
uses: actions/cache@v2
with:
path: ~/.local # the path depends on the OS
key: poetry-0 # increment to reset cache
path: ~/.local # the path depends on the OS
key: poetry-0 # increment to reset cache
#----------------------------------------------
# ----- install & configure poetry -----
#----------------------------------------------
@@ -72,5 +72,4 @@ jobs:
- name: Run tests
run: |
source .venv/bin/activate
pytest tests/

pytest -v --durations=0 tests/
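
The switch from `pytest tests/` to `pytest -v --durations=0 tests/` is what surfaces the Spark startup cost: `--durations=0` makes pytest report setup, call and teardown times for every test rather than only the slowest few, so a Spark session built inside a module-scoped fixture shows up as fixture setup time instead of being hidden in collection. A minimal sketch of the idea, with a hypothetical slow_resource fixture standing in for the real Spark session:

import time

import pytest


@pytest.fixture(scope="module")
def slow_resource():
    # Stands in for an expensive resource such as a SparkSession; with
    # --durations=0 this sleep is reported as setup time for the first
    # test in the module that requests the fixture.
    time.sleep(2)
    yield "ready"


def test_uses_resource(slow_resource):
    assert slow_resource == "ready"
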
19 changes: 7 additions & 12 deletions tests/conftest.py
@@ -37,15 +37,16 @@ def pytest_collection_modifyitems(items, config):
item.add_marker(mark)


def _make_spark():
@pytest.fixture(scope="module")
def spark():
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf()

conf.set("spark.driver.memory", "4g")
conf.set("spark.sql.shuffle.partitions", "8")
conf.set("spark.default.parallelism", "8")
conf.set("spark.driver.memory", "6g")
conf.set("spark.sql.shuffle.partitions", "1")
conf.set("spark.default.parallelism", "1")
# Add custom similarity functions, which are bundled with Splink
# documented here: https://github.com/moj-analytical-services/splink_scalaudfs
path = similarity_jar_location()
@@ -58,12 +58,6 @@ def _make_spark():
return spark


@pytest.fixture(scope="module")
def spark():
spark = _make_spark()
yield spark


@pytest.fixture(scope="module")
def df_spark(spark):
df = spark.read.csv("./tests/datasets/fake_1000_from_splink_demos.csv", header=True)
@@ -75,14 +75,14 @@ def df_spark(spark):
# see e.g. https://stackoverflow.com/a/42400786/11811947
# ruff: noqa: F811
@pytest.fixture
def test_helpers(pg_engine):
def test_helpers(spark, pg_engine):
# LazyDict to lazy-load helpers
# That way we do not instantiate helpers we do not need
# e.g. running only duckdb tests we don't need PostgresTestHelper
# so we can run duckdb tests in environments w/o access to postgres
return LazyDict(
duckdb=(DuckDBTestHelper, []),
spark=(SparkTestHelper, [_make_spark]),
spark=(SparkTestHelper, [spark]),
sqlite=(SQLiteTestHelper, []),
postgres=(PostgresTestHelper, [pg_engine]),
)
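
The comments above describe why test_helpers lazy-loads its backends. LazyDict is the test suite's own helper; a rough sketch of the lazy-loading idea (the names and exact interface here are illustrative, not the real implementation) looks like this:

class LazyDict:
    """Map names to (constructor, args) pairs; build each value on first access."""

    def __init__(self, **factories):
        self._factories = factories  # e.g. spark=(SparkTestHelper, [spark])
        self._cache = {}

    def __getitem__(self, key):
        # Only the helper actually requested by a test gets instantiated,
        # so e.g. PostgresTestHelper is never built for a DuckDB-only run.
        if key not in self._cache:
            constructor, args = self._factories[key]
            self._cache[key] = constructor(*args)
        return self._cache[key]
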
6 changes: 3 additions & 3 deletions tests/helpers.py
@@ -105,15 +105,15 @@ def brl(self):


class SparkTestHelper(TestHelper):
def __init__(self, spark_creator_function):
self.spark = spark_creator_function()
def __init__(self, spark):
self.spark = spark

@property
def Linker(self):
return SparkLinker

def extra_linker_args(self):
return {"spark": self.spark}
return {"spark": self.spark, "num_partitions_on_repartition": 1}

def convert_frame(self, df):
spark_frame = self.spark.createDataFrame(df)