Skip to content

Commit

Permalink
Make sure req_perform_stream always returns complete lines
Browse files Browse the repository at this point in the history
  • Loading branch information
jcheng5 committed Sep 6, 2024
1 parent a849d0c commit 6539d2f
Show file tree
Hide file tree
Showing 2 changed files with 236 additions and 16 deletions.
115 changes: 102 additions & 13 deletions R/req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,70 @@ resp_stream_raw <- function(resp, kb = 32) {
readBin(conn, raw(), kb * 1024)
}

# Locates the first line terminator in a raw vector.
#
# Three terminators are recognised: "\r\n", a lone "\r", and a lone "\n".
#
# @param buffer A raw vector of bytes to scan.
# @return NULL if `buffer` is empty or contains no terminator; otherwise the
#   position one PAST the end of the first terminator, i.e. the index at which
#   the next line starts (this can be `length(buffer) + 1`).
find_line_boundary <- function(buffer) {
  if (length(buffer) == 0) {
    return(NULL)
  }

  # Look right 1 byte: `next_byte[i]` is the byte that follows `buffer[i]`.
  # The NUL padding keeps the vectors the same length and guarantees that a
  # "\r" in the last position is never classified as "\r\n".
  next_byte <- c(buffer[-1], as.raw(0x00))

  is_cr <- buffer == as.raw(0x0D)
  is_lf <- buffer == as.raw(0x0A)
  is_crlf <- is_cr & next_byte == as.raw(0x0A)

  # Every "\r\n" starts with a "\r", so CR and LF positions cover all cases
  terminators <- which(is_cr | is_lf)
  if (length(terminators) == 0) {
    return(NULL)
  }

  first <- terminators[[1]]
  # "\r\n" is two bytes wide; a lone "\r" or "\n" is one
  if (is_crlf[[first]]) {
    first + 2
  } else {
    first + 1
  }
}

Check warning on line 225 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L225

Added line #L225 was not covered by tests

#' @export
#' @rdname resp_stream_raw
#' @param lines How many lines to read. Fewer lines (possibly none) are
#'   returned if the stream ends, or if no further complete line is available
#'   yet on a non-blocking connection.
#' @param warn Passed on to [readLines()]: if `TRUE`, warn when the final line
#'   of the response is not terminated by a line ending.
resp_stream_lines <- function(resp, lines = 1, max_size = Inf, warn = TRUE) {
  check_streaming_response(resp)
  check_number_whole(lines, min = 0, allow_infinite = TRUE)

  if (lines == 0) {
    # If you want to do that, who am I to judge?
    return(character())
  }

  # `lines` may be Inf, so the result can't be preallocated; collect line by
  # line until the request is satisfied or no further line is available.
  lines_read <- character()
  while (lines > 0) {
    line <- resp_stream_oneline(resp, max_size, warn)
    if (length(line) == 0) {
      # Nothing more to return right now (end of stream, or a non-blocking
      # connection that hasn't delivered a complete line yet)
      break
    }
    lines_read <- c(lines_read, line)
    lines <- lines - 1
  }
  lines_read
}

# Reads a single complete line from a streaming response, going through the
# push-back buffer so that a line split across several reads is reassembled.
# Returns a length-1 character vector, or character() if no line is available.
resp_stream_oneline <- function(resp, max_size, warn) {
  repeat {
    line_bytes <- resp_boundary_pushback(
      resp,
      max_size,
      find_line_boundary,
      include_trailer = TRUE
    )
    if (length(line_bytes) == 0) {
      return(character())
    }

    eat_next_lf <- resp$cache$resp_stream_lines_eat_next_lf
    resp$cache$resp_stream_lines_eat_next_lf <- FALSE

    if (identical(line_bytes, as.raw(0x0A)) && isTRUE(eat_next_lf)) {
      # We hit that special edge case, see below: this "\n" is the second half
      # of a "\r\n" whose "\r" ended the previous line, so skip it and read on.
      next
    }

    # If ending on \r, there's a special edge case here where if the
    # next line begins with \n, that byte should be eaten.
    if (tail(line_bytes, 1) == 0x0D) {
      resp$cache$resp_stream_lines_eat_next_lf <- TRUE
    }

    # Use `resp$body` as the variable name so that if warn=TRUE, you get
    # "incomplete final line found on 'resp$body'" as the warning message
    `resp$body` <- line_bytes
    line_con <- rawConnection(`resp$body`)
    on.exit(close(line_con), add = TRUE)

    # TODO: Use iconv to convert from whatever encoding is specified in the
    # response header, to UTF-8
    return(readLines(line_con, n = 1, warn = warn))
  }
}

# Slices the vector using the only sane semantics: start inclusive, end
Expand Down Expand Up @@ -265,19 +321,28 @@ find_event_boundary <- function(buffer) {

boundary_end <- boundary_end[1] # Take the first occurrence
split_at <- boundary_end + 1 # Split at one after the boundary
split_at
}

# Cuts `buffer` in two at `split_at`: `matched` is everything before that
# position and `remaining` is everything from that position onwards. Either
# piece may come back zero-length (`split_at` equal to 1 or to
# length(buffer)+1).
split_buffer <- function(buffer, split_at) {
  head_part <- slice(buffer, end = split_at)
  tail_part <- slice(buffer, start = split_at)

  list(matched = head_part, remaining = tail_part)
}

#' @param max_size The maximum number of bytes to buffer; once this number of
#'   bytes has been exceeded without finding a boundary, an error is thrown.
#' @export
#' @rdname resp_stream_raw
# TODO: max_size
resp_stream_sse <- function(resp, max_size = Inf) {
# @param max_size Maximum number of bytes to look for a boundary before throwing an error
# @param boundary_func A function that takes a raw vector and returns NULL if no
# boundary was detected, or one position PAST the end of the first boundary in
# the vector
# @param include_trailer If TRUE, at the end of the response, if there are
# bytes after the last boundary, then return those bytes; if FALSE, then those
# bytes are silently discarded.
resp_boundary_pushback <- function(resp, max_size, boundary_func, include_trailer) {
check_streaming_response(resp)
check_number_whole(max_size, min = 1, allow_infinite = TRUE)

Expand All @@ -295,22 +360,23 @@ resp_stream_sse <- function(resp, max_size = Inf) {
repeat {
# Try to find an event boundary using the data we have
print_buffer(buffer, "Buffer to parse")

Check warning on line 362 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L362

Added line #L362 was not covered by tests
result <- find_event_boundary(buffer)
split_at <- boundary_func(buffer)

if (!is.null(result)) {
if (!is.null(split_at)) {

Check warning on line 365 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L364-L365

Added lines #L364 - L365 were not covered by tests
result <- split_buffer(buffer, split_at)
# We found a complete event
print_buffer(result$matched, "Event data")
print_buffer(result$matched, "Matched data")

Check warning on line 368 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L367-L368

Added lines #L367 - L368 were not covered by tests
print_buffer(result$remaining, "Remaining buffer")
resp$cache$push_back <- result$remaining
return(parse_event(result$matched))
return(result$matched)

Check warning on line 371 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L370-L371

Added lines #L370 - L371 were not covered by tests
}

if (length(buffer) > max_size) {

Check warning on line 374 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L373-L374

Added lines #L373 - L374 were not covered by tests
# Keep the buffer in place, so that if the user tries resp_stream_sse
# again, they'll get the same error rather than reading the stream

Check warning on line 376 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L376

Added line #L376 was not covered by tests
# having missed a bunch of bytes.
resp$cache$push_back <- buffer
cli::cli_abort("SSE event exceeded size limit of {max_size}")
cli::cli_abort("Streaming read exceeded size limit of {max_size}")
}

# We didn't have enough data. Attempt to read more

Check warning on line 382 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L382

Added line #L382 was not covered by tests
Expand All @@ -324,6 +390,16 @@ resp_stream_sse <- function(resp, max_size = Inf) {

# If we've reached the end of input, store the buffer and return NULL

Check warning on line 391 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L391

Added line #L391 was not covered by tests
if (length(chunk) == 0) {
if (!isIncomplete(resp$body)) {
# We've truly reached the end of the connection; no more data is coming
if (include_trailer) {

Check warning on line 395 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L394-L395

Added lines #L394 - L395 were not covered by tests
return(buffer)
} else {
return(NULL)
}
}

# More data might come later

Check warning on line 402 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L397-L402

Added lines #L397 - L402 were not covered by tests
print_buffer(buffer, "Storing incomplete buffer")
resp$cache$push_back <- buffer
return(NULL)
Expand All @@ -336,6 +412,19 @@ resp_stream_sse <- function(resp, max_size = Inf) {
}
}

Check warning on line 413 in R/req-perform-stream.R

View check run for this annotation

Codecov / codecov/patch

R/req-perform-stream.R#L413

Added line #L413 was not covered by tests

#' @param max_size The maximum number of bytes to buffer; once this number of
#'   bytes has been exceeded without finding a line/event boundary, an error
#'   is thrown.
#' @export
#' @rdname resp_stream_raw
resp_stream_sse <- function(resp, max_size = Inf) {
  # Bytes after the last event boundary are not a complete event, so any
  # trailer left at the end of the response is discarded.
  event_bytes <- resp_boundary_pushback(
    resp,
    max_size,
    find_event_boundary,
    include_trailer = FALSE
  )

  if (is.null(event_bytes)) {
    return(NULL)
  }
  parse_event(event_bytes)
}

#' @export
#' @param ... Not used; included for compatibility with generic.
#' @rdname resp_stream_raw
Expand Down
137 changes: 134 additions & 3 deletions tests/testthat/test-req-perform-stream.R
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,118 @@ test_that("can't read from a closed connection", {
expect_no_error(close(resp))
})

test_that("can join lines across multiple reads", {
  skip_on_covr()

  # Serve a single line split over two chunks, with a pause in between
  app <- webfakes::new_app()
  app$get("/events", function(req, res) {
    res$send_chunk("This is a ")
    Sys.sleep(0.2)
    res$send_chunk("complete sentence.\n")
  })
  server <- webfakes::local_app_process(app)
  req <- request(server$url("/events"))

  resp1 <- req_perform_connection(req, blocking = FALSE)
  withr::defer(close(resp1))

  # The first non-blocking read is expected to see only the first chunk:
  # no line is returned and the partial data sits in the push-back cache
  out <- resp_stream_lines(resp1)
  expect_equal(out, character())
  expect_equal(resp1$cache$push_back, charToRaw("This is a "))

  # Poll until the rest of the line has arrived
  repeat {
    if (length(out) > 0) {
      break
    }
    Sys.sleep(0.1)
    out <- resp_stream_lines(resp1)
  }
  expect_equal(out, "This is a complete sentence.")
})

test_that("handles line endings of multiple kinds", {
  skip_on_covr()
  app <- webfakes::new_app()

  app$get("/events", function(req, res) {
    # Chunks are sent 0.1s apart so that each one arrives as a separate read.
    # They cover \r\n, \n, \r, a line split across chunks, a \r\n split across
    # chunks, and a final line with no terminator.
    chunks <- c(
      "crlf\r\n",
      "lf\n",
      "cr\r",
      "half line/",
      "other half\n",
      "broken crlf\r",
      "\nanother line\n",
      "eof without line ending"
    )
    for (i in seq_along(chunks)) {
      if (i > 1) {
        Sys.sleep(0.1)
      }
      res$send_chunk(chunks[[i]])
    }
  })

  server <- webfakes::local_app_process(app)
  req <- request(server$url("/events"))

  complete_lines <- c(
    "crlf", "lf", "cr", "half line/other half", "broken crlf", "another line"
  )

  # Blocking: every call returns the next line
  resp1 <- req_perform_connection(req, blocking = TRUE)
  withr::defer(close(resp1))

  for (expected in complete_lines) {
    rlang::inject(expect_equal(resp_stream_lines(resp1), !!expected))
  }
  expect_warning(
    expect_equal(resp_stream_lines(resp1), "eof without line ending"),
    "incomplete final line"
  )
  expect_identical(resp_stream_lines(resp1), character(0))

  # Same test, but now, non-blocking: keep polling until a line shows up
  resp2 <- req_perform_connection(req, blocking = FALSE)
  withr::defer(close(resp2))

  poll_line <- function(resp) {
    repeat {
      line <- resp_stream_lines(resp)
      if (length(line) > 0) {
        return(line)
      }
    }
  }

  for (expected in complete_lines) {
    out <- poll_line(resp2)
    rlang::inject(expect_equal(out, !!expected))
  }
  expect_warning(
    {
      out <- poll_line(resp2)
      expect_equal(out, "eof without line ending")
    },
    "incomplete final line"
  )
})

test_that("streams the specified number of lines", {
  skip_on_covr()

  # Five one-letter lines in a single chunk; the last has no terminator
  app <- webfakes::new_app()
  app$get("/events", function(req, res) {
    payload <- paste(letters[1:5], collapse = "\n")
    res$send_chunk(payload)
  })

  server <- webfakes::local_app_process(app)
  req <- request(server$url("/events"))

  resp1 <- req_perform_connection(req, blocking = TRUE)
  withr::defer(close(resp1))

  # Asking for three lines twice yields three, then the remaining two
  first_batch <- resp_stream_lines(resp1, 3)
  expect_equal(first_batch, c("a", "b", "c"))

  second_batch <- resp_stream_lines(resp1, 3)
  expect_equal(second_batch, c("d", "e"))
})

test_that("can feed sse events one at a time", {
skip_on_covr()
app <- webfakes::new_app()
Expand Down Expand Up @@ -112,7 +224,7 @@ test_that("can join sse events across multiple reads", {
expect_equal(out, list(type = "message", data = c("1", "2"), id = character()))
})

test_that("always interprets data as UTF-8", {
test_that("sse always interprets data as UTF-8", {
skip_on_covr()
app <- webfakes::new_app()

Expand Down Expand Up @@ -141,7 +253,7 @@ test_that("always interprets data as UTF-8", {
})
})

test_that("size limits enforced", {
test_that("streaming size limits enforced", {
skip_on_covr()
app <- webfakes::new_app()

Expand All @@ -161,12 +273,31 @@ test_that("size limits enforced", {
out <- resp_stream_sse(resp1, max_size = 999)
}
)

resp2 <- req_perform_connection(req, blocking = TRUE)
withr::defer(close(resp2))
expect_error(
out <- resp_stream_sse(resp2, max_size = 999)
)

resp3 <- req_perform_connection(req, blocking = TRUE)
withr::defer(close(resp3))
expect_error(
out <- resp_stream_lines(resp3, max_size = 999)
)
})

test_that("has a working find_event_boundary", {
boundary_test <- function(x, matched, remaining) {
buffer <- charToRaw(x)
split_at <- find_event_boundary(buffer)
result <- if (is.null(split_at)) {
NULL
} else {
split_buffer(buffer, split_at)
}
expect_identical(
find_event_boundary(charToRaw(x)),
result,
list(matched=charToRaw(matched), remaining = charToRaw(remaining))
)
}
Expand Down

0 comments on commit 6539d2f

Please sign in to comment.