From 1cdd0013979f90ed329e3976d415d3a97927127a Mon Sep 17 00:00:00 2001 From: jasl Date: Tue, 12 Jan 2021 05:54:38 +0800 Subject: [PATCH] use async-http instead of concurrent-ruby + typhoeus --- Gemfile.lock | 34 ++++- cocoapods-core.gemspec | 3 +- lib/cocoapods-core/cdn_source.rb | 206 +++++++++++++++---------------- 3 files changed, 132 insertions(+), 111 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index ab5a7d6f3..70d182149 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -5,12 +5,11 @@ PATH activesupport (> 5.0, < 6) addressable (~> 2.6) algoliasearch (~> 1.0) - concurrent-ruby (~> 1.1) + async-http (~> 0.54) fuzzy_match (~> 2.0.4) nap (~> 1.0) netrc (~> 0.11) public_suffix - typhoeus (~> 1.0) GEM remote: https://rubygems.org/ @@ -26,14 +25,30 @@ GEM httpclient (~> 2.8, >= 2.8.3) json (>= 1.5.1) ast (2.4.1) + async (1.28.3) + console (~> 1.10) + nio4r (~> 2.3) + timers (~> 4.1) + async-http (0.54.1) + async (~> 1.25) + async-io (~> 1.28) + async-pool (~> 0.2) + protocol-http (~> 0.21.0) + protocol-http1 (~> 0.13.0) + protocol-http2 (~> 0.14.0) + async-io (1.30.1) + async (~> 1.14) + async-pool (0.3.3) + async (~> 1.25) awesome_print (1.8.0) bacon (1.2.0) coderay (1.1.3) concurrent-ruby (1.1.7) + console (1.10.1) + fiber-local crack (0.4.4) - ethon (0.12.0) - ffi (>= 1.3.0) ffi (1.14.2) + fiber-local (1.0.0) fuzzy_match (2.0.4) hashdiff (1.0.1) httpclient (2.8.3) @@ -56,12 +71,20 @@ GEM mocha (>= 0.13.0) nap (1.1.0) netrc (0.11.0) + nio4r (2.5.4) notify (0.5.2) parallel (1.20.1) parser (3.0.0.0) ast (~> 2.4.1) prettybacon (0.0.2) bacon (~> 1.2) + protocol-hpack (1.4.2) + protocol-http (0.21.0) + protocol-http1 (0.13.2) + protocol-http (~> 0.19) + protocol-http2 (0.14.1) + protocol-hpack (~> 1.4) + protocol-http (~> 0.18) pry (0.13.1) coderay (~> 1.1) method_source (~> 1.0) @@ -91,8 +114,7 @@ GEM rubocop-ast (>= 0.4.0) ruby-progressbar (1.11.0) thread_safe (0.3.6) - typhoeus (1.4.0) - ethon (>= 0.9.0) + timers (4.3.2) tzinfo (1.2.9) thread_safe (~> 0.1) unicode-display_width (2.0.0) diff --git a/cocoapods-core.gemspec b/cocoapods-core.gemspec index 63f2214f7..add2668f8 100644 --- a/cocoapods-core.gemspec +++ b/cocoapods-core.gemspec @@ -21,11 +21,10 @@ Gem::Specification.new do |s| # 6.0 requires Ruby 2.5.0 s.add_runtime_dependency 'activesupport', '> 5.0', '< 6' + s.add_runtime_dependency 'async-http', '~> 0.54' s.add_runtime_dependency 'nap', '~> 1.0' s.add_runtime_dependency 'fuzzy_match', '~> 2.0.4' s.add_runtime_dependency 'algoliasearch', '~> 1.0' - s.add_runtime_dependency 'concurrent-ruby', '~> 1.1' - s.add_runtime_dependency 'typhoeus', '~> 1.0' s.add_runtime_dependency 'netrc', '~> 0.11' s.add_runtime_dependency "addressable", '~> 2.6' s.add_runtime_dependency "public_suffix" diff --git a/lib/cocoapods-core/cdn_source.rb b/lib/cocoapods-core/cdn_source.rb index ab7acc7a4..7f3317f5f 100644 --- a/lib/cocoapods-core/cdn_source.rb +++ b/lib/cocoapods-core/cdn_source.rb @@ -1,20 +1,19 @@ require 'cocoapods-core/source' require 'rest' -require 'concurrent' require 'netrc' +require 'zlib' +require 'async' +require 'async/barrier' +require 'async/http' +require 'async/http/internet' require 'addressable' module Pod # Subclass of Pod::Source to provide support for CDN-based Specs repositories # class CDNSource < Source - include Concurrent - MAX_NUMBER_OF_RETRIES = (ENV['COCOAPODS_CDN_MAX_NUMBER_OF_RETRIES'] || 5).to_i - # Single thread executor for all network activity. - HYDRA_EXECUTOR = Concurrent::SingleThreadExecutor.new - - private_constant :HYDRA_EXECUTOR + REQUEST_TIMEOUT = 10 # @param [String] repo The name of the repository # @@ -30,6 +29,12 @@ def initialize(repo) super(repo) end + # @return [Async::HTTP::Internet] The async HTTP client. + # + def http_client + @http_client ||= Async::HTTP::Internet.new # :protocol => Async::HTTP::Protocol::HTTP2 + end + # @return [String] The URL of the source. # def url @@ -60,13 +65,19 @@ def preheat_existing_files files_to_update = files_definitely_to_update + deprecated_local_podspecs - ['deprecated_podspecs.txt'] debug "CDN: #{name} Going to update #{files_to_update.count} files" - concurrent_requests_catching_errors do - # Queue all tasks first - loaders = files_to_update.map do |file| - download_file_async(file) + Sync do + barrier = Async::Barrier.new + + concurrent_requests_catching_errors do + # Queue all tasks first + files_to_update.each do |file| + download_file_async(file, :context => barrier) + end end - # Block and wait for all to complete running on Hydra - Promises.zip_futures_on(HYDRA_EXECUTOR, *loaders).wait! + + barrier.wait + ensure + http_client&.close end end @@ -118,30 +129,33 @@ def versions(name) return nil if @version_arrays_by_fragment_by_name[fragment][name].nil? - concurrent_requests_catching_errors do - loaders = [] + Sync do + barrier = Async::Barrier.new - @versions_by_name[name] ||= @version_arrays_by_fragment_by_name[fragment][name].map do |version| - # Optimization: ensure all the podspec files at least exist. The correct one will get refreshed - # in #specification_path regardless. - podspec_version_path_relative = Pathname.new(version).join("#{name}.podspec.json") + concurrent_requests_catching_errors do + @versions_by_name[name] ||= @version_arrays_by_fragment_by_name[fragment][name].map do |version| + # Optimization: ensure all the podspec files at least exist. The correct one will get refreshed + # in #specification_path regardless. + podspec_version_path_relative = Pathname.new(version).join("#{name}.podspec.json") - unless pod_path_actual.join(podspec_version_path_relative).exist? - # Queue all podspec download tasks first - loaders << download_file_async(pod_path_relative.join(podspec_version_path_relative).to_s) - end + unless pod_path_actual.join(podspec_version_path_relative).exist? + # Queue all podspec download tasks first + download_file_async(pod_path_relative.join(podspec_version_path_relative).to_s, :context => barrier) + end - begin - Version.new(version) if version[0, 1] != '.' - rescue ArgumentError - raise Informative, 'An unexpected version directory ' \ + begin + Version.new(version) if version[0, 1] != '.' + rescue ArgumentError + raise Informative, 'An unexpected version directory ' \ "`#{version}` was encountered for the " \ "`#{pod_path_actual}` Pod in the `#{name}` repository." - end - end.compact.sort.reverse + end + end.compact.sort.reverse + end - # Block and wait for all to complete running on Hydra - Promises.zip_futures_on(HYDRA_EXECUTOR, *loaders).wait! + barrier.wait + ensure + http_client&.close end @versions_by_name[name] @@ -332,13 +346,14 @@ def relative_pod_path(pod_name) end def download_file(partial_url) - # Block the main thread waiting for Hydra to finish - # - # Used for single-file downloads - download_file_async(partial_url).wait! + Sync do |context| + download_file_async(partial_url, :context => context) + ensure + http_client&.close + end end - def download_file_async(partial_url) + def download_file_async(partial_url, context: nil) file_remote_url = Addressable::URI.encode(url + partial_url.to_s) path = repo + partial_url @@ -346,12 +361,12 @@ def download_file_async(partial_url) if file_okay if @startup_time < File.mtime(path) debug "CDN: #{name} Relative path: #{partial_url} modified during this run! Returning local" - return Promises.fulfilled_future(partial_url, HYDRA_EXECUTOR) + return partial_url end unless @check_existing_files_for_update debug "CDN: #{name} Relative path: #{partial_url} exists! Returning local because checking is only performed in repo update" - return Promises.fulfilled_future(partial_url, HYDRA_EXECUTOR) + return partial_url end end @@ -359,18 +374,18 @@ def download_file_async(partial_url) etag_path = path.sub_ext(path.extname + '.etag') - etag = File.read(etag_path) if file_okay && File.exist?(etag_path) + etag = file_okay && File.exist?(etag_path) ? File.read(etag_path) : nil debug "CDN: #{name} Relative path: #{partial_url}, has ETag? #{etag}" unless etag.nil? - download_and_save_with_retries_async(partial_url, file_remote_url, etag) + download_and_save_with_retries_async(partial_url, file_remote_url, etag, :context => context) end - def download_and_save_with_retries_async(partial_url, file_remote_url, etag, retries = MAX_NUMBER_OF_RETRIES) + def download_and_save_with_retries_async(partial_url, file_remote_url, etag, retries = MAX_NUMBER_OF_RETRIES, context: nil) path = repo + partial_url etag_path = path.sub_ext(path.extname + '.etag') - download_task = download_typhoeus_impl_async(file_remote_url, etag).then do |response| - case response.response_code + download_impl_async(file_remote_url, etag, :context => context) do |response| + case response.status when 301 redirect_location = response.headers['location'] debug "CDN: #{name} Redirecting from #{file_remote_url} to #{redirect_location}" @@ -382,7 +397,20 @@ def download_and_save_with_retries_async(partial_url, file_remote_url, etag, ret FileUtils.touch path partial_url when 200 - File.open(path, 'w') { |f| f.write(response.response_body.force_encoding('UTF-8')) } + File.open(path, 'w') do |f| + body = response.body.read + encoding = response.headers['content-encoding'].to_s + if encoding.present? + case encoding + when 'gzip' + body = Zlib::GzipReader.wrap(StringIO.new(body), &:read) + else + raise Informative, "CDN: #{name} URL couldn't be saved: #{file_remote_url} Content encoding: #{response.headers['content-encoding']}" + end + end + + f.write(body.force_encoding('UTF-8')) + end etag_new = response.headers['etag'] unless response.headers.nil? debug "CDN: #{name} Relative path downloaded: #{partial_url}, save ETag: #{etag_new}" @@ -397,9 +425,8 @@ def download_and_save_with_retries_async(partial_url, file_remote_url, etag, ret raise Informative, "CDN: #{name} URL couldn't be downloaded: #{file_remote_url} Response: #{response.response_code} #{response.response_body}" else debug "CDN: #{name} URL couldn't be downloaded: #{file_remote_url} Response: #{response.response_code} #{response.response_body}, retries: #{retries - 1}" - exponential_backoff_async(retries).then do - download_and_save_with_retries_async(partial_url, file_remote_url, etag, retries - 1) - end + exponential_backoff_async(retries, :context => context) + download_and_save_with_retries_async(partial_url, file_remote_url, etag, retries - 1) end when 0 # Non-HTTP errors, usually network layer @@ -407,23 +434,19 @@ def download_and_save_with_retries_async(partial_url, file_remote_url, etag, ret raise Informative, "CDN: #{name} URL couldn't be downloaded: #{file_remote_url} Response: #{response.return_message}" else debug "CDN: #{name} URL couldn't be downloaded: #{file_remote_url} Response: #{response.return_message}, retries: #{retries - 1}" - exponential_backoff_async(retries).then do - download_and_save_with_retries_async(partial_url, file_remote_url, etag, retries - 1) - end + exponential_backoff_async(retries, :context => context) + download_and_save_with_retries_async(partial_url, file_remote_url, etag, retries - 1) end else raise Informative, "CDN: #{name} URL couldn't be downloaded: #{file_remote_url} Response: #{response.response_code} #{response.response_body}" end end - # Calling `Future#run` flattens the chained futures created by retries or redirects - # - # Does not, in fact, run the task - that is already happening in Hydra at this point - download_task.run + partial_url end - def exponential_backoff_async(retries) - sleep_async(backoff_time(retries)) + def exponential_backoff_async(retries, context:) + context.sleep backoff_time(retries) end def backoff_time(retries) @@ -431,40 +454,32 @@ def backoff_time(retries) 4 * 2**current_retry end - def sleep_async(seconds) - # Async sleep to avoid blocking either the main or the Hydra thread - Promises.schedule_on(HYDRA_EXECUTOR, seconds) - end - - def download_typhoeus_impl_async(file_remote_url, etag) - require 'typhoeus' - - # Create a prefereably HTTP/2 request - the protocol is ultimately responsible for picking - # the maximum supported protocol - # When debugging with proxy, use the following extra options: - # :proxy => 'http://localhost:8888', - # :ssl_verifypeer => false, - # :ssl_verifyhost => 0, - request = Typhoeus::Request.new( - file_remote_url, - :method => :get, - :http_version => :httpv2_0, - :timeout => 10, - :connecttimeout => 10, - :accept_encoding => 'gzip', - :netrc => :optional, - :netrc_file => Netrc.default_path, - :headers => etag.nil? ? {} : { 'If-None-Match' => etag }, - ) - - future = Promises.resolvable_future_on(HYDRA_EXECUTOR) - queue_request(request) - request.on_complete do |response| - future.fulfill(response) + def download_impl_async(file_remote_url, etag = nil, context: nil) + headers = [ + %w[Accept-Encoding gzip], + ] + + unless etag + headers << ['If-None-Match', etag.to_s] end - # This `Future` should never reject, network errors are exposed on `Typhoeus::Response` - future + info = Netrc.read + netrc_host = URI.parse(file_remote_url).host + creds = info[netrc_host] + if creds + user, pass = creds + headers << ['Authorization', "Basic #{user}:#{pass}"] + end + + method = context ? context.method(:async) : method(:Async) + method.call do |task| + task.with_timeout(REQUEST_TIMEOUT) do + response = http_client.get file_remote_url, headers + yield response if block_given? + ensure + response&.close + end + end end def debug(message) @@ -482,20 +497,5 @@ def concurrent_requests_catching_errors errors = e.errors raise Informative, "CDN: #{name} Repo update failed - #{e.errors.size} error(s):\n#{errors.join("\n")}" end - - def queue_request(request) - @hydra ||= Typhoeus::Hydra.new - - # Queue the request into the Hydra (libcurl reactor). - @hydra.queue(request) - - # Cycle the reactor on a separate thread - # - # The way it works is that if more requests are queued while Hydra is in the `#run` - # method, it will keep executing them - # - # The upcoming calls to `#run` will simply run empty. - HYDRA_EXECUTOR.post(@hydra, &:run) - end end end