Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support SparkIntegration activation after SparkContext created #3411

Merged
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 74 additions & 47 deletions sentry_sdk/integrations/spark/spark_driver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import sentry_sdk
from sentry_sdk.integrations import Integration
from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
from sentry_sdk.scope import Scope

from sentry_sdk._types import TYPE_CHECKING

Expand All @@ -9,6 +10,7 @@
from typing import Optional

from sentry_sdk._types import Event, Hint
from pyspark import SparkContext


class SparkIntegration(Integration):
Expand All @@ -17,7 +19,7 @@ class SparkIntegration(Integration):
@staticmethod
def setup_once():
# type: () -> None
patch_spark_context_init()
_setup_sentry_tracing()


def _set_app_properties():
Expand All @@ -37,7 +39,7 @@ def _set_app_properties():


def _start_sentry_listener(sc):
# type: (Any) -> None
# type: (SparkContext) -> None
"""
Start java gateway server to add custom `SparkListener`
"""
Expand All @@ -49,7 +51,51 @@ def _start_sentry_listener(sc):
sc._jsc.sc().addSparkListener(listener)


def patch_spark_context_init():
def _add_event_processor(sc):
# type: (SparkContext) -> None
scope = sentry_sdk.get_isolation_scope()

@scope.add_event_processor
def process_event(event, hint):
# type: (Event, Hint) -> Optional[Event]
with capture_internal_exceptions():
if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
return event

if sc._active_spark_context is None:
return event

event.setdefault("user", {}).setdefault("id", sc.sparkUser())

event.setdefault("tags", {}).setdefault(
"executor.id", sc._conf.get("spark.executor.id")
)
event["tags"].setdefault(
"spark-submit.deployMode",
sc._conf.get("spark.submit.deployMode"),
)
event["tags"].setdefault("driver.host", sc._conf.get("spark.driver.host"))
event["tags"].setdefault("driver.port", sc._conf.get("spark.driver.port"))
event["tags"].setdefault("spark_version", sc.version)
event["tags"].setdefault("app_name", sc.appName)
event["tags"].setdefault("application_id", sc.applicationId)
event["tags"].setdefault("master", sc.master)
event["tags"].setdefault("spark_home", sc.sparkHome)

event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl)

return event


def _activate_integration(sc):
# type: (SparkContext) -> None

_start_sentry_listener(sc)
_set_app_properties()
_add_event_processor(sc)


def _patch_spark_context_init():
# type: () -> None
from pyspark import SparkContext

Expand All @@ -59,51 +105,22 @@ def patch_spark_context_init():
def _sentry_patched_spark_context_init(self, *args, **kwargs):
# type: (SparkContext, *Any, **Any) -> Optional[Any]
rv = spark_context_init(self, *args, **kwargs)
_start_sentry_listener(self)
_set_app_properties()

scope = sentry_sdk.get_isolation_scope()

@scope.add_event_processor
def process_event(event, hint):
# type: (Event, Hint) -> Optional[Event]
with capture_internal_exceptions():
if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
return event

if self._active_spark_context is None:
return event

event.setdefault("user", {}).setdefault("id", self.sparkUser())

event.setdefault("tags", {}).setdefault(
"executor.id", self._conf.get("spark.executor.id")
)
event["tags"].setdefault(
"spark-submit.deployMode",
self._conf.get("spark.submit.deployMode"),
)
event["tags"].setdefault(
"driver.host", self._conf.get("spark.driver.host")
)
event["tags"].setdefault(
"driver.port", self._conf.get("spark.driver.port")
)
event["tags"].setdefault("spark_version", self.version)
event["tags"].setdefault("app_name", self.appName)
event["tags"].setdefault("application_id", self.applicationId)
event["tags"].setdefault("master", self.master)
event["tags"].setdefault("spark_home", self.sparkHome)

event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)

return event
Comment on lines -65 to -100
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have separated this part into a distinct function.


_activate_integration(self)
return rv

SparkContext._do_init = _sentry_patched_spark_context_init


def _setup_sentry_tracing():
# type: () -> None
from pyspark import SparkContext

if SparkContext._active_spark_context is not None:
_activate_integration(SparkContext._active_spark_context)
return
_patch_spark_context_init()
Comment on lines +117 to +120
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the Spark context already exists, _activate_integration is called instead of applying the patch.



class SparkListener:
def onApplicationEnd(self, applicationEnd): # noqa: N802,N803
# type: (Any) -> None
Expand Down Expand Up @@ -208,10 +225,20 @@ class Java:


class SentryListener(SparkListener):
def _add_breadcrumb(
self,
level, # type: str
message, # type: str
data=None, # type: Optional[dict[str, Any]]
):
# type: (...) -> None
Scope.set_isolation_scope(Scope.get_global_scope())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should not be setting the isolation scope to the global scope.

So I can suggest a better alternative, what are you trying to accomplish here?

Copy link
Contributor Author

@seyoon-lim seyoon-lim Aug 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szokeasaurusrex
First, thank you for the review!

  • Summary: When sentry_init is called after SparkContext has been created, the breadcrumbs are not transmitted (if sentry_init is called before SparkContext is created, it works fine). To resolve this issue, I set the isolation_scope to global_scope, and as a result, confirmed that the breadcrumbs are being properly transmitted.

  • Issue: If sentry_init is invoked after SparkContext has been created, the breadcrumbs in the thread handling error raising contain no data.

  • Suspected Reason: When add_breadcrumb is called within the SparkListener, it seems to store the breadcrumb in a separate scope that is not the same scope handling exceptions.
    (id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs) is different)

  • Verification: Inserted print statements at the relevant points of the code.


print insert

  1. https://github.com/getsentry/sentry-python/pull/3411/files#diff-699df798069bd7b15a3b2aef651e75c3abb50829f0de557741e60938d84886d4R228-R236
class SentryListener(SparkListener):
    def _add_breadcrumb(
        self,
        level,  # type: str
        message,  # type: str
        data=None,  # type: Optional[dict[str, Any]]
    ):
        # type: (...) -> None
        # Scope.set_isolation_scope(Scope.get_global_scope())
        print(f"* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: {os.getpid()}, current thread: {threading.get_ident()}")
        print(f"** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: {sentry_sdk.Scope.get_isolation_scope()._breadcrumbs}")
        print(f"*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): {id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs)}")
        sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
  1. https://github.com/apache/spark/blob/v3.5.2/python/pyspark/errors/exceptions/captured.py#L176-L189
def capture_sql_exception(f: Callable[..., Any]) -> Callable[..., Any]:
    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            import sentry_sdk
            import os
            import threading
            print(f"- pyspark/errors/exceptions/captuted.py current pid: {os.getpid()}, current thread: {threading.get_ident()}")
            print(f"-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: {sentry_sdk.Scope.get_isolation_scope()._breadcrumbs}")
            print(f"--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): {id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs)}")
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
                raise converted from None
            else:
                raise

    return deco

test

  • When sentry_init is called after SparkContext has been created
    code
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
import os
import threading


if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()

    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="",
    )
    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")

    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)

    result = result_rdd.collect()
    print(result)

    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")
    spark.read.csv("/path/deos/not/exist/error/raise")

output

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/27 23:54:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
====== main() pid: 19639, current thread: 4306142592
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
[1, 4, 9, 16, 25]
====== main() pid: 19639, current thread: 4306142592
- pyspark/errors/exceptions/captuted.py current pid: 19639, current thread: 6232780800
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19639, current thread: 6232780800
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 414304, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 26, 435786, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 54, 29, 173172, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4378114848
- pyspark/errors/exceptions/captuted.py current pid: 19639, current thread: 4306142592
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4372017600
Traceback (most recent call last):
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_after_create_spark_session.py", line 28, in <module>
    spark.read.csv("/path/deos/not/exist/error/raise")
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 181, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.

Process finished with exit code 1
  • When sentry_init is called before SparkContext has been created
    code
from pyspark.sql import SparkSession
import sentry_sdk
from sentry_sdk.integrations.spark import SparkIntegration
import os
import threading


if __name__ == "__main__":
    sentry_sdk.init(
        integrations=[SparkIntegration()],
        dsn="",
    )
    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")

    spark = SparkSession.builder \
        .appName("Simple Example") \
        .master("local[*]") \
        .getOrCreate()

    data = [1, 2, 3, 4, 5]
    rdd = spark.sparkContext.parallelize(data)
    result_rdd = rdd.map(lambda x: x * x)

    result = result_rdd.collect()
    print(result)

    print(f"====== main() pid: {os.getpid()}, current thread: {threading.get_ident()}")
    spark.read.csv("/path/deos/not/exist/error/raise")

output

====== main() pid: 19741, current thread: 4370892160
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/08/27 23:55:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
[1, 4, 9, 16, 25]
====== main() pid: 19741, current thread: 4370892160
- pyspark/errors/exceptions/captuted.py current pid: 19741, current thread: 6166802432
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
* sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() current pid: 19741, current thread: 6166802432
** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 718804, tzinfo=datetime.timezone.utc), 'type': 'default'}])
*** sentry_sdk/integrations/spark/spark_drvier.py SentryListner._add_breadcrumb() id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
- pyspark/errors/exceptions/captuted.py current pid: 19741, current thread: 4370892160
-- pyspark/errors/exceptions/captuted.py sentry_sdk.Scope.get_isolation_scope()._breadcrumbs: deque([{'level': 'info', 'message': 'Job 0 Started', 'data': None, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 744012, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Submitted', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 35, 765673, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Stage 0 Completed', 'data': {'attemptId': 0, 'name': 'collect at /Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py:24'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 718804, tzinfo=datetime.timezone.utc), 'type': 'default'}, {'level': 'info', 'message': 'Job 0 Ended', 'data': {'result': 'JobSucceeded'}, 'timestamp': datetime.datetime(2024, 8, 27, 14, 55, 38, 721299, tzinfo=datetime.timezone.utc), 'type': 'default'}])
--- pyspark/errors/exceptions/captuted.py id(sentry_sdk.Scope.get_isolation_scope()._breadcrumbs): 4394201536
Traceback (most recent call last):
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/local_test_before_create_spark_session.py", line 28, in <module>
    spark.read.csv("/path/deos/not/exist/error/raise")
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 727, in csv
    return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/kakao/Desktop/shaun/opensource/sentry-python-test/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 181, in deco
    raise converted from None
pyspark.errors.exceptions.captured.AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/path/deos/not/exist/error/raise.

Process finished with exit code 1

If you have any questions, please feel free to let me know!

Thank you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. We need to find a different solution here though, because we cannot set the global scope to the isolation scope. Doing so will likely mess up isolation elsewhere, and cause data unrelated to other events to be sent along with them.

Maybe we need to fork the isolation or current scope somewhere in the Spark integration? I can also try to take a look at this later if you are struggling to figure out how to avoid setting the global scope to the isolation scope.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see.

I will look into this further and work on fixing the issue.

I will update you after conducting some more tests.

sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
antonpirker marked this conversation as resolved.
Show resolved Hide resolved

def onJobStart(self, jobStart): # noqa: N802,N803
# type: (Any) -> None
message = "Job {} Started".format(jobStart.jobId())
sentry_sdk.add_breadcrumb(level="info", message=message)
self._add_breadcrumb(level="info", message=message)
_set_app_properties()

def onJobEnd(self, jobEnd): # noqa: N802,N803
Expand All @@ -227,14 +254,14 @@ def onJobEnd(self, jobEnd): # noqa: N802,N803
level = "warning"
message = "Job {} Failed".format(jobEnd.jobId())

sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
self._add_breadcrumb(level=level, message=message, data=data)

def onStageSubmitted(self, stageSubmitted): # noqa: N802,N803
# type: (Any) -> None
stage_info = stageSubmitted.stageInfo()
message = "Stage {} Submitted".format(stage_info.stageId())
data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}
sentry_sdk.add_breadcrumb(level="info", message=message, data=data)
self._add_breadcrumb(level="info", message=message, data=data)
_set_app_properties()

def onStageCompleted(self, stageCompleted): # noqa: N802,N803
Expand All @@ -255,4 +282,4 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803
message = "Stage {} Completed".format(stage_info.stageId())
level = "info"

sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
self._add_breadcrumb(level=level, message=message, data=data)
49 changes: 41 additions & 8 deletions tests/integrations/spark/test_spark.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import sys
from unittest.mock import patch

from sentry_sdk.integrations.spark.spark_driver import (
_set_app_properties,
_start_sentry_listener,
Expand All @@ -18,8 +19,22 @@
################


def test_set_app_properties():
spark_context = SparkContext(appName="Testing123")
@pytest.fixture(scope="function")
def sentry_init_with_reset(sentry_init):
from sentry_sdk.integrations import _processed_integrations

yield lambda: sentry_init(integrations=[SparkIntegration()])
_processed_integrations.remove("spark")


@pytest.fixture(scope="function")
def create_spark_context():
yield lambda: SparkContext(appName="Testing123")
SparkContext._active_spark_context.stop()


def test_set_app_properties(create_spark_context):
spark_context = create_spark_context()
_set_app_properties()

assert spark_context.getLocalProperty("sentry_app_name") == "Testing123"
Expand All @@ -30,9 +45,8 @@ def test_set_app_properties():
)


def test_start_sentry_listener():
spark_context = SparkContext.getOrCreate()

def test_start_sentry_listener(create_spark_context):
spark_context = create_spark_context()
gateway = spark_context._gateway
assert gateway._callback_server is None

Expand All @@ -41,9 +55,28 @@ def test_start_sentry_listener():
assert gateway._callback_server is not None


def test_initialize_spark_integration(sentry_init):
sentry_init(integrations=[SparkIntegration()])
SparkContext.getOrCreate()
@patch("sentry_sdk.integrations.spark.spark_driver._patch_spark_context_init")
def test_initialize_spark_integration_before_spark_context_init(
mock_patch_spark_context_init,
sentry_init_with_reset,
create_spark_context,
):
sentry_init_with_reset()
create_spark_context()

mock_patch_spark_context_init.assert_called_once()


@patch("sentry_sdk.integrations.spark.spark_driver._activate_integration")
def test_initialize_spark_integration_after_spark_context_init(
mock_activate_integration,
create_spark_context,
sentry_init_with_reset,
):
create_spark_context()
sentry_init_with_reset()

mock_activate_integration.assert_called_once()


@pytest.fixture
Expand Down
Loading