diff --git a/lib/resty/http.lua b/lib/resty/http.lua index 70c3bee..5e05171 100644 --- a/lib/resty/http.lua +++ b/lib/resty/http.lua @@ -134,7 +134,11 @@ function _M.new(_) if not sock then return nil, err end - return setmetatable({ sock = sock, keepalive = true }, mt) + return setmetatable({ + sock = sock, + keepalive_supported = true, + reader_state = { keepalive_ready = false, mark_keepalive_ready_on_body_read = true } + }, mt) end @@ -195,7 +199,9 @@ function _M.tcp_only_connect(self, ...) self.port = nil end - self.keepalive = true + -- Immediately after connection - keepalive should be possible + self.reader_state.keepalive_ready = true + self.keepalive_supported = true self.ssl = false return sock:connect(...) @@ -208,7 +214,11 @@ function _M.set_keepalive(self, ...) return nil, "not initialized" end - if self.keepalive == true then + if self.keepalive_supported == true then + if not self.reader_state.keepalive_ready then + return nil, "response not fully read" + end + return sock:setkeepalive(...) else -- The server said we must close the connection, so we cannot setkeepalive. @@ -429,7 +439,7 @@ end _M.transfer_encoding_is_chunked = transfer_encoding_is_chunked -local function _chunked_body_reader(sock, default_chunk_size) +local function _chunked_body_reader(reader_state, sock, default_chunk_size) return co_wrap(function(max_chunk_size) local remaining = 0 local length @@ -487,11 +497,15 @@ local function _chunked_body_reader(sock, default_chunk_size) end until length == 0 + + if reader_state.mark_keepalive_ready_on_body_read then + reader_state.keepalive_ready = true + end end) end -local function _body_reader(sock, content_length, default_chunk_size) +local function _body_reader(reader_state, sock, content_length, default_chunk_size) return co_wrap(function(max_chunk_size) max_chunk_size = max_chunk_size or default_chunk_size @@ -521,7 +535,9 @@ local function _body_reader(sock, content_length, default_chunk_size) elseif not max_chunk_size then -- We have a length and potentially keep-alive, but want everything. co_yield(sock:receive(content_length)) - + if reader_state.mark_keepalive_ready_on_body_read then + reader_state.keepalive_ready = true + end else -- We have a length and potentially a keep-alive, and wish to stream -- the response. @@ -549,6 +565,9 @@ local function _body_reader(sock, content_length, default_chunk_size) end until length == 0 + if reader_state.mark_keepalive_ready_on_body_read then + reader_state.keepalive_ready = true + end end end) end @@ -587,9 +606,11 @@ local function _read_body(res) end -local function _trailer_reader(sock) +local function _trailer_reader(reader_state, sock) return co_wrap(function() co_yield(_receive_headers(sock)) + -- We can always pool after reading trailers + reader_state.keepalive_ready = true end) end @@ -654,6 +675,9 @@ function _M.send_request(self, params) -- Apply defaults setmetatable(params, { __index = DEFAULT_PARAMS }) + -- Sending a new request makes keepalive disabled until its response is fully read + self.reader_state.keepalive_ready = false + local sock = self.sock local body = params.body local headers = http_headers.new() @@ -781,7 +805,8 @@ function _M.read_response(self, params) end - local res_headers, err = _receive_headers(sock) + local res_headers + res_headers, err = _receive_headers(sock) if not res_headers then return nil, err end @@ -791,38 +816,46 @@ function _M.read_response(self, params) if ok then if (version == 1.1 and str_find(connection, "close", 1, true)) or (version == 1.0 and not str_find(connection, "keep-alive", 1, true)) then - self.keepalive = false + self.keepalive_supported = false end else -- no connection header if version == 1.0 then - self.keepalive = false + self.keepalive_supported = false end end local body_reader = _no_body_reader - local trailer_reader, err + local trailer_reader local has_body = false + local has_trailer = (res_headers["Trailer"] ~= nil) + self.reader_state.mark_keepalive_ready_on_body_read = not has_trailer -- Receive the body_reader if _should_receive_body(params.method, status) then has_body = true if version == 1.1 and transfer_encoding_is_chunked(res_headers) then - body_reader, err = _chunked_body_reader(sock) + body_reader, err = _chunked_body_reader(self.reader_state, sock) else - local ok, length = pcall(tonumber, res_headers["Content-Length"]) + local length + ok, length = pcall(tonumber, res_headers["Content-Length"]) if not ok then -- No content-length header, read until connection is closed by server length = nil end - body_reader, err = _body_reader(sock, length) + body_reader, err = _body_reader(self.reader_state, sock, length) + end + else + if not has_trailer then + -- If there's no body and no trailer - it's ready for keep-alive + self.reader_state.keepalive_ready = true end end - if res_headers["Trailer"] then - trailer_reader, err = _trailer_reader(sock) + if has_trailer then + trailer_reader, err = _trailer_reader(self.reader_state, sock) end if err then @@ -981,13 +1014,15 @@ function _M.get_client_body_reader(_, chunksize, sock) end end + -- Reading the request body has nothing to do with pooling the upstream server socket + local request_body_reader_state = { mark_keepalive_ready_on_body_read = false } local headers = ngx_req_get_headers() local length = headers.content_length if length then - return _body_reader(sock, tonumber(length), chunksize) + return _body_reader(request_body_reader_state, sock, tonumber(length), chunksize) elseif transfer_encoding_is_chunked(headers) then -- Not yet supported by ngx_lua but should just work... - return _chunked_body_reader(sock, chunksize) + return _chunked_body_reader(request_body_reader_state, sock, chunksize) else return nil end diff --git a/lib/resty/http_connect.lua b/lib/resty/http_connect.lua index 4da98bc..997e38b 100644 --- a/lib/resty/http_connect.lua +++ b/lib/resty/http_connect.lua @@ -16,19 +16,19 @@ be kept alive. Call it with a single options table as follows: client:connect { - scheme = "https" -- scheme to use, or nil for unix domain socket - host = "myhost.com", -- target machine, or a unix domain socket - port = nil, -- port on target machine, will default to 80/443 based on scheme - pool = nil, -- connection pool name, leave blank! this function knows best! - pool_size = nil, -- options as per: https://github.com/openresty/lua-nginx-module#tcpsockconnect + scheme = "https" -- scheme to use, or nil for unix domain socket + host = "myhost.com", -- target machine, or a unix domain socket + port = nil, -- port on target machine, will default to 80/443 based on scheme + pool = nil, -- connection pool name, leave blank! this function knows best! + pool_size = nil, -- options as per: https://github.com/openresty/lua-nginx-module#tcpsockconnect backlog = nil, -- ssl options as per: https://github.com/openresty/lua-nginx-module#tcpsocksslhandshake ssl_reused_session = nil ssl_server_name = nil, ssl_send_status_req = nil, - ssl_verify = true, -- NOTE: defaults to true - ctx = nil, -- NOTE: not supported + ssl_verify = true, -- NOTE: defaults to true + ctx = nil, -- NOTE: not supported -- mTLS options (experimental!) -- @@ -43,7 +43,7 @@ client:connect { ssl_client_cert = nil, ssl_client_priv_key = nil, - proxy_opts, -- proxy opts, defaults to global proxy options + proxy_opts, -- proxy opts, defaults to global proxy options } ]] local function connect(self, options) @@ -261,7 +261,9 @@ local function connect(self, options) self.host = request_host self.port = request_port - self.keepalive = true + -- Immediately after connection - keepalive should be possible + self.reader_state.keepalive_ready = true + self.keepalive_supported = true self.ssl = ssl -- set only for http, https has already been handled self.http_proxy_auth = request_scheme ~= "https" and proxy_authorization or nil diff --git a/t/07-keepalive.t b/t/07-keepalive.t index 573bbf0..c39bafe 100644 --- a/t/07-keepalive.t +++ b/t/07-keepalive.t @@ -432,3 +432,166 @@ connection must be closed --- no_error_log [error] [warn] + +=== TEST 8 Generic interface, Connection: Keep-alive. Don't read body and check connection isn't reused +--- http_config eval: $::HttpConfig +--- config + location = /a { + content_by_lua ' + local http = require "resty.http" + local httpc = http.new() + httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = ngx.var.server_port + }) + + local res, err = httpc:request{ + path = "/b" + } + + ngx.say(res.headers["Connection"]) + local ok, err = httpc:set_keepalive() + ngx.say(err) + + httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = ngx.var.server_port + }) + ngx.say(httpc:get_reused_times()) + '; + } + location = /b { + content_by_lua ' + ngx.say("OK") + '; + } +--- request +GET /a +--- response_body +keep-alive +response not fully read +0 +--- no_error_log +[error] +[warn] + +=== TEST 9 Pooling connection immediately after connecting should work +--- http_config eval: $::HttpConfig +--- config + location = /a { + content_by_lua ' + local http = require "resty.http" + local httpc = http.new() + httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = ngx.var.server_port + }) + ngx.say(httpc:set_keepalive()) + '; + } +--- request +GET /a +--- response_body +1 +--- no_error_log +[error] +[warn] + +=== TEST 10 Reused client still checks pooling is ready +--- http_config eval: $::HttpConfig +--- config + location = /a { + content_by_lua ' + local http = require "resty.http" + local httpc = http.new() + httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = ngx.var.server_port + }) + + local res, err = httpc:request{ + path = "/b" + } + + local body = res:read_body() + + ngx.say(res.headers["Connection"]) + ngx.say(httpc:set_keepalive()) + + httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = ngx.var.server_port + }) + ngx.say(httpc:get_reused_times()) + res, err = httpc:request{ + path = "/b" + } + local ok + ok, err = httpc:set_keepalive() + ngx.say(err) + '; + } + location = /b { + content_by_lua ' + ngx.say("OK") + '; + } +--- request +GET /a +--- response_body +keep-alive +1 +1 +response not fully read +--- no_error_log +[error] +[warn] + +=== TEST 11 Test the connection is reused on non-body requests +--- http_config eval: $::HttpConfig +--- config + location = /a { + content_by_lua ' + local http = require "resty.http" + local httpc = http.new() + httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = ngx.var.server_port + }) + + local res, err = httpc:request{ + method = "HEAD", + path = "/b" + } + + ngx.say(res.headers["Connection"]) + ngx.say(httpc:set_keepalive()) + + httpc:connect({ + scheme = "http", + host = "127.0.0.1", + port = ngx.var.server_port + }) + ngx.say(httpc:get_reused_times()) + '; + } + location = /b { + content_by_lua ' + ngx.say("OK") + '; + } +--- request +GET /a +--- response_body +keep-alive +1 +1 +--- no_error_log +[error] +[warn] diff --git a/t/15-instance-reuse.t b/t/15-instance-reuse.t index 414e2cb..a951554 100644 --- a/t/15-instance-reuse.t +++ b/t/15-instance-reuse.t @@ -53,12 +53,12 @@ location /a { assert(res3 ~= res2, "responses should be unique tables") assert(res3.headers ~= res2.headers, "headers should be unique tables") - assert(httpc.keepalive == false, "keepalive flag should be false") + assert(httpc.keepalive_supported == false, "keepalive flag should be false") assert(httpc:connect("127.0.0.1", ngx.var.server_port), "connect should return positively") - assert(httpc.keepalive == true, "keepalive flag should be true") + assert(httpc.keepalive_supported == true, "keepalive flag should be true") } }