Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Pool only when HTTP response fully read #255

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 53 additions & 18 deletions lib/resty/http.lua
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,11 @@ function _M.new(_)
if not sock then
return nil, err
end
return setmetatable({ sock = sock, keepalive = true }, mt)
return setmetatable({
sock = sock,
keepalive_supported = true,
reader_state = { keepalive_ready = false, mark_keepalive_ready_on_body_read = true }
}, mt)
end


Expand Down Expand Up @@ -195,7 +199,9 @@ function _M.tcp_only_connect(self, ...)
self.port = nil
end

self.keepalive = true
-- Immediately after connection - keepalive should be possible
self.reader_state.keepalive_ready = true
self.keepalive_supported = true
self.ssl = false

return sock:connect(...)
Expand All @@ -208,7 +214,11 @@ function _M.set_keepalive(self, ...)
return nil, "not initialized"
end

if self.keepalive == true then
if self.keepalive_supported == true then
if not self.reader_state.keepalive_ready then
return nil, "response not fully read"
end

return sock:setkeepalive(...)
else
-- The server said we must close the connection, so we cannot setkeepalive.
Expand Down Expand Up @@ -429,7 +439,7 @@ end
_M.transfer_encoding_is_chunked = transfer_encoding_is_chunked


local function _chunked_body_reader(sock, default_chunk_size)
local function _chunked_body_reader(reader_state, sock, default_chunk_size)
return co_wrap(function(max_chunk_size)
local remaining = 0
local length
Expand Down Expand Up @@ -487,11 +497,15 @@ local function _chunked_body_reader(sock, default_chunk_size)
end

until length == 0

if reader_state.mark_keepalive_ready_on_body_read then
reader_state.keepalive_ready = true
end
end)
end


local function _body_reader(sock, content_length, default_chunk_size)
local function _body_reader(reader_state, sock, content_length, default_chunk_size)
return co_wrap(function(max_chunk_size)
max_chunk_size = max_chunk_size or default_chunk_size

Expand Down Expand Up @@ -521,7 +535,9 @@ local function _body_reader(sock, content_length, default_chunk_size)
elseif not max_chunk_size then
-- We have a length and potentially keep-alive, but want everything.
co_yield(sock:receive(content_length))

if reader_state.mark_keepalive_ready_on_body_read then
reader_state.keepalive_ready = true
end
else
-- We have a length and potentially a keep-alive, and wish to stream
-- the response.
Expand Down Expand Up @@ -549,6 +565,9 @@ local function _body_reader(sock, content_length, default_chunk_size)
end

until length == 0
if reader_state.mark_keepalive_ready_on_body_read then
reader_state.keepalive_ready = true
end
end
end)
end
Expand Down Expand Up @@ -587,9 +606,11 @@ local function _read_body(res)
end


local function _trailer_reader(sock)
local function _trailer_reader(reader_state, sock)
return co_wrap(function()
co_yield(_receive_headers(sock))
-- We can always pool after reading trailers
reader_state.keepalive_ready = true
end)
end

Expand Down Expand Up @@ -654,6 +675,9 @@ function _M.send_request(self, params)
-- Apply defaults
setmetatable(params, { __index = DEFAULT_PARAMS })

-- Sending a new request makes keepalive disabled until its response is fully read
self.reader_state.keepalive_ready = false

local sock = self.sock
local body = params.body
local headers = http_headers.new()
Expand Down Expand Up @@ -781,7 +805,8 @@ function _M.read_response(self, params)
end


local res_headers, err = _receive_headers(sock)
local res_headers
res_headers, err = _receive_headers(sock)
if not res_headers then
return nil, err
end
Expand All @@ -791,38 +816,46 @@ function _M.read_response(self, params)
if ok then
if (version == 1.1 and str_find(connection, "close", 1, true)) or
(version == 1.0 and not str_find(connection, "keep-alive", 1, true)) then
self.keepalive = false
self.keepalive_supported = false
end
else
-- no connection header
if version == 1.0 then
self.keepalive = false
self.keepalive_supported = false
end
end

local body_reader = _no_body_reader
local trailer_reader, err
local trailer_reader
local has_body = false
local has_trailer = (res_headers["Trailer"] ~= nil)
self.reader_state.mark_keepalive_ready_on_body_read = not has_trailer

-- Receive the body_reader
if _should_receive_body(params.method, status) then
has_body = true

if version == 1.1 and transfer_encoding_is_chunked(res_headers) then
body_reader, err = _chunked_body_reader(sock)
body_reader, err = _chunked_body_reader(self.reader_state, sock)
else
local ok, length = pcall(tonumber, res_headers["Content-Length"])
local length
ok, length = pcall(tonumber, res_headers["Content-Length"])
if not ok then
-- No content-length header, read until connection is closed by server
length = nil
end

body_reader, err = _body_reader(sock, length)
body_reader, err = _body_reader(self.reader_state, sock, length)
end
else
if not has_trailer then
-- If there's no body and no trailer - it's ready for keep-alive
self.reader_state.keepalive_ready = true
end
end

if res_headers["Trailer"] then
trailer_reader, err = _trailer_reader(sock)
if has_trailer then
trailer_reader, err = _trailer_reader(self.reader_state, sock)
end

if err then
Expand Down Expand Up @@ -981,13 +1014,15 @@ function _M.get_client_body_reader(_, chunksize, sock)
end
end

-- Reading the request body has nothing to do with pooling the upstream server socket
local request_body_reader_state = { mark_keepalive_ready_on_body_read = false }
local headers = ngx_req_get_headers()
local length = headers.content_length
if length then
return _body_reader(sock, tonumber(length), chunksize)
return _body_reader(request_body_reader_state, sock, tonumber(length), chunksize)
elseif transfer_encoding_is_chunked(headers) then
-- Not yet supported by ngx_lua but should just work...
return _chunked_body_reader(sock, chunksize)
return _chunked_body_reader(request_body_reader_state, sock, chunksize)
else
return nil
end
Expand Down
20 changes: 11 additions & 9 deletions lib/resty/http_connect.lua
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@ be kept alive.
Call it with a single options table as follows:

client:connect {
scheme = "https" -- scheme to use, or nil for unix domain socket
host = "myhost.com", -- target machine, or a unix domain socket
port = nil, -- port on target machine, will default to 80/443 based on scheme
pool = nil, -- connection pool name, leave blank! this function knows best!
pool_size = nil, -- options as per: https://github.com/openresty/lua-nginx-module#tcpsockconnect
scheme = "https" -- scheme to use, or nil for unix domain socket
host = "myhost.com", -- target machine, or a unix domain socket
port = nil, -- port on target machine, will default to 80/443 based on scheme
pool = nil, -- connection pool name, leave blank! this function knows best!
pool_size = nil, -- options as per: https://github.com/openresty/lua-nginx-module#tcpsockconnect
backlog = nil,

-- ssl options as per: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake
ssl_reused_session = nil
ssl_server_name = nil,
ssl_send_status_req = nil,
ssl_verify = true, -- NOTE: defaults to true
ctx = nil, -- NOTE: not supported
ssl_verify = true, -- NOTE: defaults to true
ctx = nil, -- NOTE: not supported

-- mTLS options (experimental!)
--
Expand All @@ -43,7 +43,7 @@ client:connect {
ssl_client_cert = nil,
ssl_client_priv_key = nil,

proxy_opts, -- proxy opts, defaults to global proxy options
proxy_opts, -- proxy opts, defaults to global proxy options
}
]]
local function connect(self, options)
Expand Down Expand Up @@ -261,7 +261,9 @@ local function connect(self, options)

self.host = request_host
self.port = request_port
self.keepalive = true
-- Immediately after connection - keepalive should be possible
self.reader_state.keepalive_ready = true
self.keepalive_supported = true
self.ssl = ssl
-- set only for http, https has already been handled
self.http_proxy_auth = request_scheme ~= "https" and proxy_authorization or nil
Expand Down
Loading