From 5fa0a4f5368582d6214217c27d9fddbd14867b93 Mon Sep 17 00:00:00 2001 From: Sergey Tarasov Date: Fri, 3 May 2024 14:24:23 +0300 Subject: [PATCH] More log drain variants --- lib/command/run.rb | 127 +++++++++++++++++++++++++++++++---- lib/core/controlplane.rb | 4 ++ lib/core/controlplane_api.rb | 18 +++++ 3 files changed, 137 insertions(+), 12 deletions(-) diff --git a/lib/command/run.rb b/lib/command/run.rb index 8794b05c..17a10ee9 100644 --- a/lib/command/run.rb +++ b/lib/command/run.rb @@ -83,6 +83,8 @@ class Run < Base # rubocop:disable Metrics/ClassLength ``` EX + MAGIC_END = "---cpl run command finished---" + attr_reader :interactive, :detached, :location, :original_workload, :runner_workload, :container, :image_link, :image_changed, :job, :replica, :command @@ -236,12 +238,16 @@ def run_interactive cp.workload_exec(runner_workload, replica, location: location, container: container, command: command) end - def run_non_interactive # rubocop:disable Metrics/MethodLength + def run_non_interactive if detached print_detached_commands exit(ExitCode::SUCCESS) end + run_non_interactive_v3 + end + + def run_non_interactive_v1 # rubocop:disable Metrics/MethodLength logs_pid = Process.fork do # Catch Ctrl+C in the forked process trap("SIGINT") do @@ -262,6 +268,19 @@ def run_non_interactive # rubocop:disable Metrics/MethodLength exit(exit_status) end + def run_non_interactive_v2 + logs_pipe = IO.popen(["cpl", "logs", *app_workload_replica_args]) + + exit_status = wait_for_job_status_and_log(logs_pipe) + @internal_sigint = true + Process.kill("INT", logs_pipe.pid) + exit(exit_status) + end + + def run_non_interactive_v3 + exit(show_logs_waiting) + end + def base_workload_specs(workload) spec = cp.fetch_workload!(workload).fetch("spec") container_spec = spec["containers"].detect { _1["name"] == original_workload } || spec["containers"].first @@ -356,27 +375,45 @@ def runner_script # rubocop:disable Metrics/MethodLength SCRIPT script += interactive_runner_script if interactive - script += args_join(config.args) + script += "(" + args_join(config.args) + ")" # we need a subshell to continue on error from here + script += <<~SCRIPT + + CPL_EXIT_CODE=$? + echo '#{MAGIC_END}' + exit $CPL_EXIT_CODE + SCRIPT script end def wait_for_job_status # rubocop:disable Metrics/MethodLength loop do - result = cp.fetch_cron_workload(runner_workload, location: location) - job_details = result&.dig("items")&.find { |item| item["id"] == job } - status = job_details&.dig("status") - - case status - when "failed" - return ExitCode::ERROR_DEFAULT - when "successful" - return ExitCode::SUCCESS - end + exit_code = resolve_job_status + break exit_code unless exit_code.nil? Kernel.sleep(1) end end + def wait_for_job_status_and_log(logs_pipe) + magic_found = false + no_logs_count = 0 + loop do + if logs_pipe.ready? + no_logs_count = 0 + line = logs_pipe.gets + magic_found ||= line.chomp == MAGIC_END + next puts(line) unless magic_found + end + + Kernel.sleep(0.5) + no_logs_count += 1 + next if !magic_found && no_logs_count < 60 + + exit_code = resolve_job_status + break exit_code unless exit_code.nil? + end + end + def print_detached_commands app_workload_replica_config = app_workload_replica_args.join(" ") progress.puts( @@ -385,5 +422,71 @@ def print_detached_commands "- To stop the job, run:\n `cpl ps:stop #{app_workload_replica_config}`\n" ) end + + def get_job_status + result = cp.fetch_cron_workload(runner_workload, location: location) + job_details = result&.dig("items")&.find { |item| item["id"] == job } + job_details&.dig("status") + end + + def resolve_job_status + case get_job_status + when "failed" + ExitCode::ERROR_DEFAULT + when "successful" + ExitCode::SUCCESS + end + end + + ########################################### + ### temporary extaction from run:detached + ########################################### + def show_logs_waiting # rubocop:disable Metrics/MethodLength + progress.puts("Scheduled, fetching logs (it's a cron job, so it may take up to a minute to start)...\n\n") + retries = 0 + begin + @finished = false + job_finished_count = 0 + loop do + print_uniq_logs + break if @finished + next if @printed_log_entries_changed + + sleep(1) + status = get_job_status + job_finished_count += 1 if %w[failed successful].include?(status) + break if job_finished_count > 5 + end + resolve_job_status + rescue RuntimeError => e + raise "#{e} Exiting..." unless retries < 10 #MAX_RETRIES + + progress.puts(Shell.color("ERROR: #{e} Retrying...", :red)) + retries += 1 + retry + end + end + + def print_uniq_logs + @printed_log_entries ||= [] + ts = Time.now.to_i + entries = normalized_log_entries(from: ts - 60, to: ts) + + @printed_log_entries_changed = false + (entries - @printed_log_entries).sort.each do |(_ts, val)| + @printed_log_entries_changed = true + val.chomp == MAGIC_END ? @finished = true : progress.puts(val) + end + + @printed_log_entries = entries # as well truncate old entries if any + end + + def normalized_log_entries(from:, to:) + log = cp.log_get(workload: runner_workload, from: from, to: to, replica:) + + log["data"]["result"] + .each_with_object([]) { |obj, result| result.concat(obj["values"]) } + .select { |ts, _val| ts[..-10].to_i > from } + end end end diff --git a/lib/core/controlplane.rb b/lib/core/controlplane.rb index fb269f6b..ea21cd85 100644 --- a/lib/core/controlplane.rb +++ b/lib/core/controlplane.rb @@ -308,6 +308,10 @@ def logs(workload:, limit:, since:, replica: nil) perform!(cmd, output_mode: :all) end + def log_get(workload:, replica: nil, from:, to:) + api.log_get(org: org, gvc: gvc, workload: workload, replica: replica, from: from, to: to) + end + # identities def fetch_identity(identity, a_gvc = gvc) diff --git a/lib/core/controlplane_api.rb b/lib/core/controlplane_api.rb index f79b41fa..0d1e305d 100644 --- a/lib/core/controlplane_api.rb +++ b/lib/core/controlplane_api.rb @@ -33,6 +33,24 @@ def image_delete(org:, image:) api_json("/org/#{org}/image/#{image}", method: :delete) end + def log_get(org:, gvc:, workload: nil, replica: nil, from: nil, to: nil) # rubocop:disable Metrics/ParameterLists + query = { gvc: gvc } + query[:workload] = workload if workload + query[:replica] = replica if replica + query = query.map { |k, v| %(#{k}="#{v}") }.join(",").then { "{#{_1}}" } + + params = { query: query } + params[:from] = "#{from}000000000" if from + params[:to] = "#{to}000000000" if to + params[:limit] = "5000" + # params << "delay_for=0" + # params << "limit=30" + # params << "direction=forward" + params = params.map { |k, v| %(#{k}=#{CGI.escape(v)}) }.join("&") + + api_json_direct("/logs/org/#{org}/loki/api/v1/query_range?#{params}", method: :get, host: :logs) + end + def query_workloads(org:, gvc:, workload:, gvc_op_type:, workload_op_type:) # rubocop:disable Metrics/MethodLength terms = [ {