Skip to content

Commit

Permalink
More log drain variants
Browse files Browse the repository at this point in the history
  • Loading branch information
dzirtusss committed May 3, 2024
1 parent 3cc5fbd commit 5fa0a4f
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 12 deletions.
127 changes: 115 additions & 12 deletions lib/command/run.rb
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ class Run < Base # rubocop:disable Metrics/ClassLength
```
EX

MAGIC_END = "---cpl run command finished---"

attr_reader :interactive, :detached, :location, :original_workload, :runner_workload,
:container, :image_link, :image_changed, :job, :replica, :command

Expand Down Expand Up @@ -236,12 +238,16 @@ def run_interactive
cp.workload_exec(runner_workload, replica, location: location, container: container, command: command)
end

def run_non_interactive # rubocop:disable Metrics/MethodLength
def run_non_interactive
if detached
print_detached_commands
exit(ExitCode::SUCCESS)
end

run_non_interactive_v3
end

def run_non_interactive_v1 # rubocop:disable Metrics/MethodLength
logs_pid = Process.fork do
# Catch Ctrl+C in the forked process
trap("SIGINT") do
Expand All @@ -262,6 +268,19 @@ def run_non_interactive # rubocop:disable Metrics/MethodLength
exit(exit_status)
end

def run_non_interactive_v2
logs_pipe = IO.popen(["cpl", "logs", *app_workload_replica_args])

exit_status = wait_for_job_status_and_log(logs_pipe)
@internal_sigint = true
Process.kill("INT", logs_pipe.pid)
exit(exit_status)
end

def run_non_interactive_v3
exit(show_logs_waiting)
end

def base_workload_specs(workload)
spec = cp.fetch_workload!(workload).fetch("spec")
container_spec = spec["containers"].detect { _1["name"] == original_workload } || spec["containers"].first
Expand Down Expand Up @@ -356,27 +375,45 @@ def runner_script # rubocop:disable Metrics/MethodLength
SCRIPT

script += interactive_runner_script if interactive
script += args_join(config.args)
script += "(" + args_join(config.args) + ")" # we need a subshell to continue on error from here
script += <<~SCRIPT
CPL_EXIT_CODE=$?
echo '#{MAGIC_END}'
exit $CPL_EXIT_CODE
SCRIPT
script
end

def wait_for_job_status # rubocop:disable Metrics/MethodLength
loop do
result = cp.fetch_cron_workload(runner_workload, location: location)
job_details = result&.dig("items")&.find { |item| item["id"] == job }
status = job_details&.dig("status")

case status
when "failed"
return ExitCode::ERROR_DEFAULT
when "successful"
return ExitCode::SUCCESS
end
exit_code = resolve_job_status
break exit_code unless exit_code.nil?

Kernel.sleep(1)
end
end

def wait_for_job_status_and_log(logs_pipe)
magic_found = false
no_logs_count = 0
loop do
if logs_pipe.ready?
no_logs_count = 0
line = logs_pipe.gets
magic_found ||= line.chomp == MAGIC_END
next puts(line) unless magic_found
end

Kernel.sleep(0.5)
no_logs_count += 1
next if !magic_found && no_logs_count < 60

exit_code = resolve_job_status
break exit_code unless exit_code.nil?
end
end

def print_detached_commands
app_workload_replica_config = app_workload_replica_args.join(" ")
progress.puts(
Expand All @@ -385,5 +422,71 @@ def print_detached_commands
"- To stop the job, run:\n `cpl ps:stop #{app_workload_replica_config}`\n"
)
end

def get_job_status
result = cp.fetch_cron_workload(runner_workload, location: location)
job_details = result&.dig("items")&.find { |item| item["id"] == job }
job_details&.dig("status")
end

def resolve_job_status
case get_job_status
when "failed"
ExitCode::ERROR_DEFAULT
when "successful"
ExitCode::SUCCESS
end
end

###########################################
### temporary extaction from run:detached
###########################################
def show_logs_waiting # rubocop:disable Metrics/MethodLength
progress.puts("Scheduled, fetching logs (it's a cron job, so it may take up to a minute to start)...\n\n")
retries = 0
begin
@finished = false
job_finished_count = 0
loop do
print_uniq_logs
break if @finished
next if @printed_log_entries_changed

sleep(1)
status = get_job_status
job_finished_count += 1 if %w[failed successful].include?(status)
break if job_finished_count > 5
end
resolve_job_status
rescue RuntimeError => e
raise "#{e} Exiting..." unless retries < 10 #MAX_RETRIES

progress.puts(Shell.color("ERROR: #{e} Retrying...", :red))
retries += 1
retry
end
end

def print_uniq_logs
@printed_log_entries ||= []
ts = Time.now.to_i
entries = normalized_log_entries(from: ts - 60, to: ts)

@printed_log_entries_changed = false
(entries - @printed_log_entries).sort.each do |(_ts, val)|
@printed_log_entries_changed = true
val.chomp == MAGIC_END ? @finished = true : progress.puts(val)
end

@printed_log_entries = entries # as well truncate old entries if any
end

def normalized_log_entries(from:, to:)
log = cp.log_get(workload: runner_workload, from: from, to: to, replica:)

log["data"]["result"]
.each_with_object([]) { |obj, result| result.concat(obj["values"]) }
.select { |ts, _val| ts[..-10].to_i > from }
end
end
end
4 changes: 4 additions & 0 deletions lib/core/controlplane.rb
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ def logs(workload:, limit:, since:, replica: nil)
perform!(cmd, output_mode: :all)
end

def log_get(workload:, replica: nil, from:, to:)
api.log_get(org: org, gvc: gvc, workload: workload, replica: replica, from: from, to: to)
end

# identities

def fetch_identity(identity, a_gvc = gvc)
Expand Down
18 changes: 18 additions & 0 deletions lib/core/controlplane_api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ def image_delete(org:, image:)
api_json("/org/#{org}/image/#{image}", method: :delete)
end

def log_get(org:, gvc:, workload: nil, replica: nil, from: nil, to: nil) # rubocop:disable Metrics/ParameterLists
query = { gvc: gvc }
query[:workload] = workload if workload
query[:replica] = replica if replica
query = query.map { |k, v| %(#{k}="#{v}") }.join(",").then { "{#{_1}}" }

params = { query: query }
params[:from] = "#{from}000000000" if from
params[:to] = "#{to}000000000" if to
params[:limit] = "5000"
# params << "delay_for=0"
# params << "limit=30"
# params << "direction=forward"
params = params.map { |k, v| %(#{k}=#{CGI.escape(v)}) }.join("&")

api_json_direct("/logs/org/#{org}/loki/api/v1/query_range?#{params}", method: :get, host: :logs)
end

def query_workloads(org:, gvc:, workload:, gvc_op_type:, workload_op_type:) # rubocop:disable Metrics/MethodLength
terms = [
{
Expand Down

0 comments on commit 5fa0a4f

Please sign in to comment.