Skip to content

Commit

Permalink
chore(iast): fastapi path parameter support
Browse files Browse the repository at this point in the history
  • Loading branch information
avara1986 committed Aug 28, 2024
1 parent 210e515 commit dadf1fd
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 0 deletions.
18 changes: 18 additions & 0 deletions ddtrace/appsec/_iast/_patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,21 @@ def _on_iast_fastapi_patch():
functools.partial(_patched_fastapi_function, OriginType.HEADER),
)
_set_metric_iast_instrumented_source(OriginType.HEADER)

# Instrumented on _iast_starlette_scope_taint
_set_metric_iast_instrumented_source(OriginType.HEAPATH_PARAMETERDER)


def _iast_instrument_starlette_scope(scope):
if _is_iast_enabled():
from ddtrace.appsec._iast._taint_tracking import OriginType
from ddtrace.appsec._iast._taint_tracking import taint_pyobject

if scope.get("path_params"):
try:
for k, v in scope["path_params"].items():
scope["path_params"][k] = taint_pyobject(
v, source_name=k, source_value=v, source_origin=OriginType.PATH_PARAMETER
)
except Exception:
log.debug("IAST: Unexpected exception while tainting path parameters", exc_info=True)
7 changes: 7 additions & 0 deletions ddtrace/contrib/internal/starlette/patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from ddtrace import Pin
from ddtrace import config
from ddtrace._trace.span import Span # noqa:F401
from ddtrace.appsec._iast import _is_iast_enabled
from ddtrace.contrib import trace_utils
from ddtrace.contrib.asgi import TraceMiddleware
from ddtrace.contrib.trace_utils import with_traced_module
Expand Down Expand Up @@ -154,7 +155,13 @@ def traced_handler(wrapped, instance, args, kwargs):
if name == b"cookie":
request_cookies = value.decode("utf-8", errors="ignore")
break

if request_spans:
if _is_iast_enabled():
from ddtrace.appsec._iast._patch import _iast_instrument_starlette_scope

_iast_instrument_starlette_scope(scope)

trace_utils.set_http_meta(
request_spans[0],
"starlette",
Expand Down
56 changes: 56 additions & 0 deletions tests/contrib/fastapi/test_fastapi_appsec_iast.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,18 @@ def test_query_param_source(fastapi_application, client, tracer, test_spans):
@fastapi_application.get("/index.html")
async def test_route(request: Request):
from ddtrace.appsec._iast._taint_tracking import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking import origin_to_str

query_params = request.query_params.get("iast_queryparam")
ranges_result = get_tainted_ranges(query_params)

return JSONResponse(
{
"result": query_params,
"is_tainted": len(ranges_result),
"ranges_start": ranges_result[0].start,
"ranges_length": ranges_result[0].length,
"ranges_origin": origin_to_str(ranges_result[0].source.origin),
}
)

Expand All @@ -78,22 +81,26 @@ async def test_route(request: Request):
assert result["is_tainted"] == 1
assert result["ranges_start"] == 0
assert result["ranges_length"] == 8
assert result["ranges_origin"] == "http.request.parameter"


@pytest.mark.usefixtures("setup_core_ok_after_test")
def test_header_value_source(fastapi_application, client, tracer, test_spans):
@fastapi_application.get("/index.html")
async def test_route(request: Request):
from ddtrace.appsec._iast._taint_tracking import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking import origin_to_str

query_params = request.headers.get("iast_header")
ranges_result = get_tainted_ranges(query_params)

return JSONResponse(
{
"result": query_params,
"is_tainted": len(ranges_result),
"ranges_start": ranges_result[0].start,
"ranges_length": ranges_result[0].length,
"ranges_origin": origin_to_str(ranges_result[0].source.origin),
}
)

Expand All @@ -113,6 +120,7 @@ async def test_route(request: Request):
assert result["is_tainted"] == 1
assert result["ranges_start"] == 0
assert result["ranges_length"] == 8
assert result["ranges_origin"] == "http.request.header"


@pytest.mark.usefixtures("setup_core_ok_after_test")
Expand All @@ -122,14 +130,17 @@ def test_header_value_source_typing_param(fastapi_application, client, tracer, t
@fastapi_application.get("/index.html")
async def test_route(iast_header: typing.Annotated[str, Header()] = None):
from ddtrace.appsec._iast._taint_tracking import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking import origin_to_str

ranges_result = get_tainted_ranges(iast_header)

return JSONResponse(
{
"result": iast_header,
"is_tainted": len(ranges_result),
"ranges_start": ranges_result[0].start,
"ranges_length": ranges_result[0].length,
"ranges_origin": origin_to_str(ranges_result[0].source.origin),
}
)

Expand All @@ -149,13 +160,15 @@ async def test_route(iast_header: typing.Annotated[str, Header()] = None):
assert result["is_tainted"] == 1
assert result["ranges_start"] == 0
assert result["ranges_length"] == 8
assert result["ranges_origin"] == "http.request.header"


@pytest.mark.usefixtures("setup_core_ok_after_test")
def test_cookies_source(fastapi_application, client, tracer, test_spans):
@fastapi_application.get("/index.html")
async def test_route(request: Request):
from ddtrace.appsec._iast._taint_tracking import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking import origin_to_str

query_params = request.cookies.get("iast_cookie")
ranges_result = get_tainted_ranges(query_params)
Expand All @@ -165,6 +178,7 @@ async def test_route(request: Request):
"is_tainted": len(ranges_result),
"ranges_start": ranges_result[0].start,
"ranges_length": ranges_result[0].length,
"ranges_origin": origin_to_str(ranges_result[0].source.origin),
}
)

Expand All @@ -184,6 +198,7 @@ async def test_route(request: Request):
assert result["is_tainted"] == 1
assert result["ranges_start"] == 0
assert result["ranges_length"] == 8
assert result["ranges_origin"] == "http.request.cookie.value"


@pytest.mark.usefixtures("setup_core_ok_after_test")
Expand All @@ -193,14 +208,17 @@ def test_cookies_source_typing_param(fastapi_application, client, tracer, test_s
@fastapi_application.get("/index.html")
async def test_route(iast_cookie: typing.Annotated[str, Cookie()] = "ddd"):
from ddtrace.appsec._iast._taint_tracking import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking import origin_to_str

ranges_result = get_tainted_ranges(iast_cookie)

return JSONResponse(
{
"result": iast_cookie,
"is_tainted": len(ranges_result),
"ranges_start": ranges_result[0].start,
"ranges_length": ranges_result[0].length,
"ranges_origin": origin_to_str(ranges_result[0].source.origin),
}
)

Expand All @@ -220,3 +238,41 @@ async def test_route(iast_cookie: typing.Annotated[str, Cookie()] = "ddd"):
assert result["is_tainted"] == 1
assert result["ranges_start"] == 0
assert result["ranges_length"] == 8
assert result["ranges_origin"] == "http.request.cookie.value"


@pytest.mark.usefixtures("setup_core_ok_after_test")
def test_path_param_source(fastapi_application, client, tracer, test_spans):
@fastapi_application.get("/index.html/{item_id}")
async def test_route(item_id):
from ddtrace.appsec._iast._taint_tracking import get_tainted_ranges
from ddtrace.appsec._iast._taint_tracking import origin_to_str

ranges_result = get_tainted_ranges(item_id)

return JSONResponse(
{
"result": item_id,
"is_tainted": len(ranges_result),
"ranges_start": ranges_result[0].start,
"ranges_length": ranges_result[0].length,
"ranges_origin": origin_to_str(ranges_result[0].source.origin),
}
)

# test if asgi middleware is ok without any callback registered
core.reset_listeners(event_id="asgi.request.parse.body")

with override_global_config(dict(_iast_enabled=True)), override_env(IAST_ENV):
# disable callback
_aux_appsec_prepare_tracer(tracer)
resp = client.get(
"/index.html/test1234/",
)
assert resp.status_code == 200
result = json.loads(get_response_body(resp))
assert result["result"] == "test1234"
assert result["is_tainted"] == 1
assert result["ranges_start"] == 0
assert result["ranges_length"] == 8
assert result["ranges_origin"] == "http.request.path.parameter"

0 comments on commit dadf1fd

Please sign in to comment.