diff --git a/benchmarks/appsec_iast_aspects/config.yaml b/benchmarks/appsec_iast_aspects/config.yaml index a2e6faf0dfc..c8eca9b8951 100644 --- a/benchmarks/appsec_iast_aspects/config.yaml +++ b/benchmarks/appsec_iast_aspects/config.yaml @@ -233,6 +233,42 @@ aspect_iast_do_index_on_dict: warmups: 1 iast_enabled: 1 +aspect_no_iast_do_io_bytesio_read: &aspect_no_iast_do_io_bytesio_read + iast_enabled: 0 + processes: 10 + loops: 1 + values: 6 + warmups: 1 + mod_original_name: "bm.iast_fixtures.str_methods" + function_name: "do_io_bytesio_read" + args: [' fOobaR\t \n'] + +aspect_iast_do_io_bytesio_read: + << : *aspect_no_iast_do_io_bytesio_read + processes: 10 + loops: 1 + values: 6 + warmups: 1 + iast_enabled: 1 + +aspect_no_iast_do_io_stringio_read: &aspect_no_iast_do_io_stringio_read + iast_enabled: 0 + processes: 10 + loops: 1 + values: 6 + warmups: 1 + mod_original_name: "bm.iast_fixtures.str_methods" + function_name: "do_io_stringio_read" + args: [' fOobaR\t \n'] + +aspect_iast_do_io_stringio_read: + << : *aspect_no_iast_do_io_stringio_read + processes: 10 + loops: 1 + values: 6 + warmups: 1 + iast_enabled: 1 + aspect_no_iast_do_join: &aspect_no_iast_do_join iast_enabled: 0 processes: 10 diff --git a/benchmarks/bm/iast_fixtures/str_methods.py b/benchmarks/bm/iast_fixtures/str_methods.py index 79431775cc4..2f7e0b1d342 100644 --- a/benchmarks/bm/iast_fixtures/str_methods.py +++ b/benchmarks/bm/iast_fixtures/str_methods.py @@ -21,6 +21,8 @@ from typing import Tuple import urllib.parse +import _io + def methodcaller(*args, **kwargs): return "im methodcaller" @@ -1245,3 +1247,13 @@ def urlib_urlsplit(text): def do_re_match_index(text, regexp, index): match = re.search(regexp, text) return match[index] + + +def do_io_stringio_read(string_input): + xxx = _io.StringIO(string_input) + return xxx.read() + + +def do_io_bytesio_read(string_input): + xxx = _io.BytesIO(string_input.encode("utf-8")) + return xxx.read() diff --git a/ddtrace/appsec/_common_module_patches.py b/ddtrace/appsec/_common_module_patches.py index 77bfabd435b..5ff91867848 100644 --- a/ddtrace/appsec/_common_module_patches.py +++ b/ddtrace/appsec/_common_module_patches.py @@ -31,6 +31,8 @@ def patch_common_modules(): try_wrap_function_wrapper("builtins", "open", wrapped_open_CFDDB7ABBA9081B6) try_wrap_function_wrapper("urllib.request", "OpenerDirector.open", wrapped_open_ED4CF71136E15EBF) + try_wrap_function_wrapper("_io", "BytesIO.read", wrapped_read_F3E51D71B4EC16EF) + try_wrap_function_wrapper("_io", "StringIO.read", wrapped_read_F3E51D71B4EC16EF) try_wrap_function_wrapper("os", "system", wrapped_system_5542593D237084A7) core.on("asm.block.dbapi.execute", execute_4C9BAC8E228EB347) if asm_config._iast_enabled: @@ -40,6 +42,22 @@ def patch_common_modules(): def unpatch_common_modules(): try_unwrap("builtins", "open") try_unwrap("urllib.request", "OpenerDirector.open") + try_unwrap("_io", "BytesIO.read") + try_unwrap("_io", "StringIO.read") + + +def wrapped_read_F3E51D71B4EC16EF(original_read_callable, instance, args, kwargs): + """ + wrapper for _io.BytesIO and _io.StringIO read function + """ + result = original_read_callable(*args, **kwargs) + if asm_config._iast_enabled: + from ddtrace.appsec._iast._taint_tracking import copy_and_shift_ranges_from_strings + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if is_pyobject_tainted(instance): + copy_and_shift_ranges_from_strings(instance, result, 0) + return result def _must_block(actions: Iterable[str]) -> bool: diff --git a/ddtrace/appsec/_constants.py b/ddtrace/appsec/_constants.py index 7bf3a96bda9..9054dc41882 100644 --- a/ddtrace/appsec/_constants.py +++ b/ddtrace/appsec/_constants.py @@ -2,6 +2,9 @@ from re import Match import sys +from _io import BytesIO +from _io import StringIO + if sys.version_info >= (3, 8): from typing import Literal # noqa:F401 @@ -122,7 +125,7 @@ class IAST(metaclass=Constant_Class): SEP_MODULES: Literal[","] = "," REQUEST_IAST_ENABLED: Literal["_dd.iast.request_enabled"] = "_dd.iast.request_enabled" TEXT_TYPES = (str, bytes, bytearray) - TAINTEABLE_TYPES = (str, bytes, bytearray, Match) + TAINTEABLE_TYPES = (str, bytes, bytearray, Match, BytesIO, StringIO) class IAST_SPAN_TAGS(metaclass=Constant_Class): diff --git a/ddtrace/appsec/_iast/_ast/ast_patching.py b/ddtrace/appsec/_iast/_ast/ast_patching.py index 0c409ee6837..572fca02ce6 100644 --- a/ddtrace/appsec/_iast/_ast/ast_patching.py +++ b/ddtrace/appsec/_iast/_ast/ast_patching.py @@ -262,7 +262,6 @@ "cattrs.", "ddsketch.", "ddtrace.", - "encodings.", # this package is used to load encodings when a module is imported, propagation is not needed "envier.", "exceptiongroup.", "freezegun.", # Testing utilities for time manipulation diff --git a/ddtrace/appsec/_iast/_ast/visitor.py b/ddtrace/appsec/_iast/_ast/visitor.py index 6d07aefde8b..b3f90a9046a 100644 --- a/ddtrace/appsec/_iast/_ast/visitor.py +++ b/ddtrace/appsec/_iast/_ast/visitor.py @@ -40,12 +40,16 @@ def _mark_avoid_convert_recursively(node): "definitions_module": "ddtrace.appsec._iast._taint_tracking.aspects", "alias_module": "ddtrace_aspects", "functions": { + "StringIO": "ddtrace_aspects.stringio_aspect", + "BytesIO": "ddtrace_aspects.bytesio_aspect", "str": "ddtrace_aspects.str_aspect", "bytes": "ddtrace_aspects.bytes_aspect", "bytearray": "ddtrace_aspects.bytearray_aspect", "ddtrace_iast_flask_patch": "ddtrace_aspects.empty_func", # To avoid recursion }, "stringalike_methods": { + "StringIO": "ddtrace_aspects.stringio_aspect", + "BytesIO": "ddtrace_aspects.bytesio_aspect", "decode": "ddtrace_aspects.decode_aspect", "join": "ddtrace_aspects.join_aspect", "encode": "ddtrace_aspects.encode_aspect", diff --git a/ddtrace/appsec/_iast/_taint_tracking/Utils/StringUtils.cpp b/ddtrace/appsec/_iast/_taint_tracking/Utils/StringUtils.cpp index 03cc81d2684..a0462691441 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/Utils/StringUtils.cpp +++ b/ddtrace/appsec/_iast/_taint_tracking/Utils/StringUtils.cpp @@ -144,6 +144,17 @@ get_pyobject_size(PyObject* obj) return len_candidate_text; } +bool +PyIOBase_Check(const PyObject* obj) +{ + try { + return py::isinstance((PyObject*)obj, safe_import("_io", "_IOBase")); + } catch (py::error_already_set& err) { + PyErr_Clear(); + return false; + } +} + bool PyReMatch_Check(const PyObject* obj) { diff --git a/ddtrace/appsec/_iast/_taint_tracking/Utils/StringUtils.h b/ddtrace/appsec/_iast/_taint_tracking/Utils/StringUtils.h index 5f74c820a15..d5c9dd59fa7 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/Utils/StringUtils.h +++ b/ddtrace/appsec/_iast/_taint_tracking/Utils/StringUtils.h @@ -32,6 +32,9 @@ get_unique_id(const PyObject* str) return reinterpret_cast(str); } +bool +PyIOBase_Check(const PyObject* obj); + bool PyReMatch_Check(const PyObject* obj); @@ -50,7 +53,7 @@ is_text(const PyObject* pyptr) inline bool is_tainteable(const PyObject* pyptr) { - return pyptr != nullptr and (is_text(pyptr) or PyReMatch_Check(pyptr)); + return pyptr != nullptr and (is_text(pyptr) or PyReMatch_Check(pyptr) or PyIOBase_Check(pyptr)); } // Base function for the variadic template diff --git a/ddtrace/appsec/_iast/_taint_tracking/__init__.py b/ddtrace/appsec/_iast/_taint_tracking/__init__.py index d9dec2ba3fe..fe86c083779 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/__init__.py +++ b/ddtrace/appsec/_iast/_taint_tracking/__init__.py @@ -1,3 +1,5 @@ +from io import BytesIO +from io import StringIO from typing import Any from typing import Tuple @@ -215,9 +217,9 @@ def trace_calls_and_returns(frame, event, arg): if frame in TAINTED_FRAMES: TAINTED_FRAMES.remove(frame) log.debug("Return from %s on line %d of %s, return value: %s", func_name, line_no, filename, arg) - if isinstance(arg, (str, bytes, bytearray, list, tuple, dict)): + if isinstance(arg, (str, bytes, bytearray, BytesIO, StringIO, list, tuple, dict)): if ( - (isinstance(arg, (str, bytes, bytearray)) and is_pyobject_tainted(arg)) + (isinstance(arg, (str, bytes, bytearray, BytesIO, StringIO)) and is_pyobject_tainted(arg)) or (isinstance(arg, (list, tuple)) and any([is_pyobject_tainted(x) for x in arg])) or (isinstance(arg, dict) and any([is_pyobject_tainted(x) for x in arg.values()])) ): diff --git a/ddtrace/appsec/_iast/_taint_tracking/aspects.py b/ddtrace/appsec/_iast/_taint_tracking/aspects.py index 87c14d52f78..da7f6fb8843 100644 --- a/ddtrace/appsec/_iast/_taint_tracking/aspects.py +++ b/ddtrace/appsec/_iast/_taint_tracking/aspects.py @@ -16,6 +16,8 @@ from typing import Tuple from typing import Union +import _io + from ddtrace.appsec._constants import IAST from .._taint_tracking import TagMappingMode @@ -94,9 +96,47 @@ "ospathsplitext_aspect", "ospathsplitdrive_aspect", "ospathsplitroot_aspect", + "bytesio_aspect", + "stringio_aspect", ] +def stringio_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> _io.StringIO: + if orig_function is not None: + if flag_added_args > 0: + args = args[flag_added_args:] + result = orig_function(*args, **kwargs) + else: + if flag_added_args > 0: + args = args[flag_added_args:] + result = _io.StringIO(*args, **kwargs) + + if args and is_pyobject_tainted(args[0]) and isinstance(result, _io.StringIO): + try: + copy_and_shift_ranges_from_strings(args[0], result, 0) + except Exception as e: + iast_taint_log_error("IAST propagation error. stringio_aspect. {}".format(e)) + return result + + +def bytesio_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> _io.BytesIO: + if orig_function is not None: + if flag_added_args > 0: + args = args[flag_added_args:] + result = orig_function(*args, **kwargs) + else: + if flag_added_args > 0: + args = args[flag_added_args:] + result = _io.BytesIO(*args, **kwargs) + + if args and is_pyobject_tainted(args[0]) and isinstance(result, _io.BytesIO): + try: + copy_and_shift_ranges_from_strings(args[0], result, 0) + except Exception as e: + iast_taint_log_error("IAST propagation error. bytesio_aspect. {}".format(e)) + return result + + def str_aspect(orig_function: Optional[Callable], flag_added_args: int, *args: Any, **kwargs: Any) -> str: if orig_function is not None: if orig_function != builtin_str: diff --git a/tests/appsec/app.py b/tests/appsec/app.py index b6a0b04824b..f1496449406 100644 --- a/tests/appsec/app.py +++ b/tests/appsec/app.py @@ -200,6 +200,258 @@ def iast_ast_patching_import_error(): return Response(str(module_with_import_errors.verbal_kint_is_keyser_soze)) +@app.route("/iast-ast-patching-io-bytesio-untainted", methods=["GET"]) +def iast_ast_patching_io_bytes_io_untainted(): + filename = "filename" + style = request.args.get("style") + bytes_filename = filename.encode() + if style == "_io_module": + import _io + + changed = _io.BytesIO(bytes_filename) + elif style == "io_module": + import io + + changed = io.BytesIO(bytes_filename) + elif style == "io_function": + from io import BytesIO + + changed = BytesIO(bytes_filename) + else: + from _io import BytesIO + + changed = BytesIO(bytes_filename) + resp = Response("Fail") + try: + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if not is_pyobject_tainted(changed): + resp = Response("OK") + except Exception as e: + print(e) + return resp + + +@app.route("/iast-ast-patching-io-stringio-untainted", methods=["GET"]) +def iast_ast_patching_io_string_io_untainted(): + filename = "filename" + style = request.args.get("style") + if style == "_io_module": + import _io + + changed = _io.StringIO(filename) + elif style == "io_module": + import io + + changed = io.StringIO(filename) + elif style == "io_function": + from io import StringIO + + changed = StringIO(filename) + else: + from _io import StringIO + + changed = StringIO(filename) + resp = Response("Fail") + try: + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if not is_pyobject_tainted(changed): + resp = Response("OK") + except Exception as e: + print(e) + return resp + + +@app.route("/iast-ast-patching-io-bytesio-read-untainted", methods=["GET"]) +def iast_ast_patching_io_bytes_io_read_untainted(): + filename = "filename" + style = request.args.get("style") + bytes_filename = filename.encode() + if style == "_io_module": + import _io + + changed = _io.BytesIO(bytes_filename) + elif style == "io_module": + import io + + changed = io.BytesIO(bytes_filename) + elif style == "io_function": + from io import BytesIO + + changed = BytesIO(bytes_filename) + else: + from _io import BytesIO + + changed = BytesIO(bytes_filename) + resp = Response("Fail") + try: + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if not is_pyobject_tainted(changed.read(4)): + resp = Response("OK") + except Exception as e: + print(e) + return resp + + +@app.route("/iast-ast-patching-io-stringio-read-untainted", methods=["GET"]) +def iast_ast_patching_io_string_io_read_untainted(): + filename = "filename" + style = request.args.get("style") + if style == "_io_module": + import _io + + changed = _io.StringIO(filename) + elif style == "io_module": + import io + + changed = io.StringIO(filename) + elif style == "io_function": + from io import StringIO + + changed = StringIO(filename) + else: + from _io import StringIO + + changed = StringIO(filename) + resp = Response("Fail") + try: + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if not is_pyobject_tainted(changed.read(4)): + resp = Response("OK") + except Exception as e: + print(e) + return resp + + +@app.route("/iast-ast-patching-io-bytesio", methods=["GET"]) +def iast_ast_patching_io_bytes_io(): + filename = request.args.get("filename") + style = request.args.get("style") + bytes_filename = filename.encode() + if style == "_io_module": + import _io + + changed = _io.BytesIO(bytes_filename) + elif style == "io_module": + import io + + changed = io.BytesIO(bytes_filename) + elif style == "io_function": + from io import BytesIO + + changed = BytesIO(bytes_filename) + else: + from _io import BytesIO + + changed = BytesIO(bytes_filename) + resp = Response("Fail") + try: + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if is_pyobject_tainted(changed): + resp = Response("OK") + except Exception as e: + print(e) + return resp + + +@app.route("/iast-ast-patching-io-stringio", methods=["GET"]) +def iast_ast_patching_io_string_io(): + filename = request.args.get("filename") + style = request.args.get("style") + if style == "_io_module": + import _io + + changed = _io.StringIO(filename) + elif style == "io_module": + import io + + changed = io.StringIO(filename) + elif style == "io_function": + from io import StringIO + + changed = StringIO(filename) + else: + from _io import StringIO + + changed = StringIO(filename) + resp = Response("Fail") + try: + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if is_pyobject_tainted(changed): + resp = Response("OK") + except Exception as e: + print(e) + return resp + + +@app.route("/iast-ast-patching-io-bytesio-read", methods=["GET"]) +def iast_ast_patching_io_bytes_io_read(): + filename = request.args.get("filename") + style = request.args.get("style") + bytes_filename = filename.encode() + if style == "_io_module": + import _io + + changed = _io.BytesIO(bytes_filename) + elif style == "io_module": + import io + + changed = io.BytesIO(bytes_filename) + elif style == "io_function": + from io import BytesIO + + changed = BytesIO(bytes_filename) + else: + from _io import BytesIO + + changed = BytesIO(bytes_filename) + resp = Response("Fail") + try: + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if is_pyobject_tainted(changed.read(4)): + resp = Response("OK") + except Exception as e: + print(e) + return resp + + +@app.route("/iast-ast-patching-io-stringio-read", methods=["GET"]) +def iast_ast_patching_io_string_io_read(): + filename = request.args.get("filename") + style = request.args.get("style") + if style == "_io_module": + import _io + + changed = _io.StringIO(filename) + elif style == "io_module": + import io + + changed = io.StringIO(filename) + elif style == "io_function": + from io import StringIO + + changed = StringIO(filename) + else: + from _io import StringIO + + changed = StringIO(filename) + resp = Response("Fail") + try: + from ddtrace.appsec._iast._taint_tracking import is_pyobject_tainted + + if is_pyobject_tainted(changed.read(4)): + resp = Response("OK") + except Exception as e: + print(e) + return resp + + @app.route("/iast-ast-patching-re-sub", methods=["GET"]) def iast_ast_patching_re_sub(): filename = request.args.get("filename") diff --git a/tests/appsec/iast/_ast/test_ast_patching.py b/tests/appsec/iast/_ast/test_ast_patching.py index 155a0270bc3..a9092ef7839 100644 --- a/tests/appsec/iast/_ast/test_ast_patching.py +++ b/tests/appsec/iast/_ast/test_ast_patching.py @@ -172,3 +172,39 @@ def test_module_path_none(caplog): with caplog.at_level(logging.DEBUG), mock.patch("ddtrace.internal.module.Path.resolve", side_effect=AttributeError): assert ("", "") == astpatch_module(__import__("tests.appsec.iast.fixtures.ast.str.class_str", fromlist=[None])) assert "astpatch_source couldn't find the module: tests.appsec.iast.fixtures.ast.str.class_str" in caplog.text + + +@pytest.mark.parametrize( + "module_name", + [ + ("tests.appsec.iast.fixtures.ast.io.module_stringio"), + ("tests.appsec.iast.fixtures.ast.io.function_stringio"), + ], +) +def test_astpatch_stringio_module_changed(module_name): + module_path, new_source = astpatch_module(__import__(module_name, fromlist=[None])) + assert ("", "") != (module_path, new_source) + new_code = astunparse.unparse(new_source) + assert new_code.startswith( + "\nimport ddtrace.appsec._iast.taint_sinks as ddtrace_taint_sinks" + "\nimport ddtrace.appsec._iast._taint_tracking.aspects as ddtrace_aspects" + ) + assert "ddtrace_aspects.stringio_aspect(" in new_code + + +@pytest.mark.parametrize( + "module_name", + [ + ("tests.appsec.iast.fixtures.ast.io.module_bytesio"), + ("tests.appsec.iast.fixtures.ast.io.function_bytesio"), + ], +) +def test_astpatch_bytesio_module_changed(module_name): + module_path, new_source = astpatch_module(__import__(module_name, fromlist=[None])) + assert ("", "") != (module_path, new_source) + new_code = astunparse.unparse(new_source) + assert new_code.startswith( + "\nimport ddtrace.appsec._iast.taint_sinks as ddtrace_taint_sinks" + "\nimport ddtrace.appsec._iast._taint_tracking.aspects as ddtrace_aspects" + ) + assert "ddtrace_aspects.bytesio_aspect(" in new_code diff --git a/tests/appsec/iast/fixtures/ast/io/function_bytesio.py b/tests/appsec/iast/fixtures/ast/io/function_bytesio.py new file mode 100644 index 00000000000..5ef6dc63e22 --- /dev/null +++ b/tests/appsec/iast/fixtures/ast/io/function_bytesio.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +from _io import BytesIO + + +def fixture_function_bytesio(): + return BytesIO(b"test") diff --git a/tests/appsec/iast/fixtures/ast/io/function_stringio.py b/tests/appsec/iast/fixtures/ast/io/function_stringio.py new file mode 100644 index 00000000000..4b2f2fe6063 --- /dev/null +++ b/tests/appsec/iast/fixtures/ast/io/function_stringio.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +from _io import StringIO + + +def fixture_function_stringio(): + return StringIO("test") diff --git a/tests/appsec/iast/fixtures/ast/io/module_bytesio.py b/tests/appsec/iast/fixtures/ast/io/module_bytesio.py new file mode 100644 index 00000000000..7f5185e8920 --- /dev/null +++ b/tests/appsec/iast/fixtures/ast/io/module_bytesio.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +import _io + + +def fixture_function_bytesio(): + return _io.BytesIO(b"test") diff --git a/tests/appsec/iast/fixtures/ast/io/module_read.py b/tests/appsec/iast/fixtures/ast/io/module_read.py new file mode 100644 index 00000000000..e128303c3aa --- /dev/null +++ b/tests/appsec/iast/fixtures/ast/io/module_read.py @@ -0,0 +1,8 @@ +#!/usr/bin/env python3 + + +def read_from_io(my_object): + try: + my_object.read(5) + except AttributeError as e: + raise AttributeError("Object does not have a read method") from e diff --git a/tests/appsec/iast/fixtures/ast/io/module_stringio.py b/tests/appsec/iast/fixtures/ast/io/module_stringio.py new file mode 100644 index 00000000000..493acebe596 --- /dev/null +++ b/tests/appsec/iast/fixtures/ast/io/module_stringio.py @@ -0,0 +1,6 @@ +#!/usr/bin/env python3 +import _io + + +def fixture_function_stringio(): + return _io.StringIO("test") diff --git a/tests/appsec/iast/fixtures/propagation_path.py b/tests/appsec/iast/fixtures/propagation_path.py index ce0ae4239b6..62cbc351ce7 100644 --- a/tests/appsec/iast/fixtures/propagation_path.py +++ b/tests/appsec/iast/fixtures/propagation_path.py @@ -6,6 +6,8 @@ import re import sys +import _io + ROOT_DIR = os.path.dirname(os.path.abspath(__file__)) @@ -207,4 +209,5 @@ def propagation_memory_check(origin_string1, tainted_string_2): _ = m.read() except Exception: pass - return string29 + + return _io.StringIO(string29).read() diff --git a/tests/appsec/iast_memcheck/test_iast_mem_check.py b/tests/appsec/iast_memcheck/test_iast_mem_check.py index 866075d8af2..f72d3d264aa 100644 --- a/tests/appsec/iast_memcheck/test_iast_mem_check.py +++ b/tests/appsec/iast_memcheck/test_iast_mem_check.py @@ -48,7 +48,7 @@ def __call__(self, stack: Stack) -> bool: return False -@pytest.mark.limit_leaks("8.3 KB", filter_fn=IASTFilter()) +@pytest.mark.limit_leaks("8.8 KB", filter_fn=IASTFilter()) @pytest.mark.parametrize( "origin1, origin2", [ diff --git a/tests/appsec/integrations/test_flask_iast_patching.py b/tests/appsec/integrations/test_flask_iast_patching.py index c2caf98c16d..3cb89ff6803 100644 --- a/tests/appsec/integrations/test_flask_iast_patching.py +++ b/tests/appsec/integrations/test_flask_iast_patching.py @@ -57,3 +57,33 @@ def test_flask_iast_ast_patching_re(style, endpoint, function): assert response.status_code == 200 assert response.content == b"OK" + + +@pytest.mark.parametrize("style", ["_io_module", "io_module", "io_function", "_io_function"]) +@pytest.mark.parametrize( + "function", + [ + "bytesio", + "stringio", + "bytesio-read", + "stringio-read", + "bytesio-untainted", + "stringio-untainted", + "bytesio-read-untainted", + "stringio-read-untainted", + ], +) +def test_flask_iast_ast_patching_io(style, function, endpoint="io"): + """ + Tests _io/io BytesIO and StringIO patching end to end + """ + filename = "path_traversal_test_file.txt" + with flask_server( + appsec_enabled="false", iast_enabled="true", token=None, port=8020, assert_debug=False + ) as context: + _, flask_client, pid = context + + response = flask_client.get(f"/iast-ast-patching-{endpoint}-{function}?style={style}&filename={filename}") + + assert response.status_code == 200 + assert response.content == b"OK"