From 007fd7051dd43bc23b496746df12f2e328ce82ad Mon Sep 17 00:00:00 2001 From: Christian Schilling Date: Wed, 15 Nov 2023 11:14:58 +0100 Subject: [PATCH] Check fetch condition twice (#1298) Once before, and once after locking the semaphore Change: check-twice --- josh-proxy/src/bin/josh-proxy.rs | 118 ++++++++++++++++++++----------- 1 file changed, 78 insertions(+), 40 deletions(-) diff --git a/josh-proxy/src/bin/josh-proxy.rs b/josh-proxy/src/bin/josh-proxy.rs index 716f2f09..be1770e1 100644 --- a/josh-proxy/src/bin/josh-proxy.rs +++ b/josh-proxy/src/bin/josh-proxy.rs @@ -102,46 +102,16 @@ impl std::fmt::Debug for JoshProxyService { } } -#[tracing::instrument] -async fn fetch_upstream( +fn fetch_needed( service: Arc, - upstream_repo: String, - remote_auth: &RemoteAuth, - remote_url: String, + remote_url: &String, + upstream_repo: &String, + force: bool, head_ref: Option<&str>, head_ref_resolved: Option<&str>, - force: bool, -) -> Result<(), FetchError> { - let key = remote_url.clone(); - - let refs_to_fetch = match head_ref { - Some(head_ref) if head_ref != "HEAD" && !head_ref.starts_with("refs/heads/") => { - vec![ - "HEAD*", - "refs/josh/*", - "refs/heads/*", - "refs/tags/*", - head_ref, - ] - } - _ => { - vec!["HEAD*", "refs/josh/*", "refs/heads/*", "refs/tags/*"] - } - }; - - let refs_to_fetch: Vec<_> = refs_to_fetch.iter().map(|x| x.to_string()).collect(); - - let us = upstream_repo.clone(); - let semaphore = service - .fetch_permits - .lock()? 
- .entry(us.clone()) - .or_insert(Arc::new(tokio::sync::Semaphore::new(1))) - .clone(); - let permit = semaphore.acquire().await; - +) -> Result { let fetch_timer_ok = { - if let Some(last) = service.fetch_timers.read()?.get(&key) { + if let Some(last) = service.fetch_timers.read()?.get(remote_url) { let since = std::time::Instant::now().duration_since(*last); let max = std::time::Duration::from_secs(ARGS.cache_duration); @@ -171,24 +141,90 @@ async fn fetch_upstream( }; match (force, fetch_timer_ok, head_ref, head_ref_resolved) { - (false, true, None, _) => return Ok(()), + (false, true, None, _) => return Ok(false), (false, true, Some(head_ref), _) => { if (resolve_cache_ref(head_ref).map_err(FetchError::from_josh_error)?).is_some() { trace!("cache ref resolved"); - return Ok(()); + return Ok(false); } } (false, false, Some(head_ref), Some(head_ref_resolved)) => { if let Some(oid) = resolve_cache_ref(head_ref).map_err(FetchError::from_josh_error)? { if oid.to_string() == head_ref_resolved { trace!("cache ref resolved and matches"); - return Ok(()); + return Ok(false); } } } _ => (), }; + return Ok(true); +} + +#[tracing::instrument] +async fn fetch_upstream( + service: Arc, + upstream_repo: String, + remote_auth: &RemoteAuth, + remote_url: String, + head_ref: Option<&str>, + head_ref_resolved: Option<&str>, + force: bool, +) -> Result<(), FetchError> { + let refs_to_fetch = match head_ref { + Some(head_ref) if head_ref != "HEAD" && !head_ref.starts_with("refs/heads/") => { + vec![ + "HEAD*", + "refs/josh/*", + "refs/heads/*", + "refs/tags/*", + head_ref, + ] + } + _ => { + vec!["HEAD*", "refs/josh/*", "refs/heads/*", "refs/tags/*"] + } + }; + + let refs_to_fetch: Vec<_> = refs_to_fetch.iter().map(|x| x.to_string()).collect(); + + // Check if we really need to fetch before locking the semaphore. This avoids + // A "no fetch" case waiting for some already running fetch just to do nothing. 
+ if !fetch_needed( + service.clone(), + &remote_url, + &upstream_repo, + force, + head_ref, + head_ref_resolved, + )? { + return Ok(()); + } + + let us = upstream_repo.clone(); + let semaphore = service + .fetch_permits + .lock()? + .entry(us.clone()) + .or_insert(Arc::new(tokio::sync::Semaphore::new(1))) + .clone(); + let permit = semaphore.acquire().await; + + // Check the fetch condition once again after locking the semaphore, as an unknown + // amount of time might have passed and the outcome of this check might have changed + // while waiting. + if !fetch_needed( + service.clone(), + &remote_url, + &upstream_repo, + force, + head_ref, + head_ref_resolved, + )? { + return Ok(()); + } + let fetch_timers = service.fetch_timers.clone(); let heads_map = service.heads_map.clone(); let br_path = service.repo_path.join("mirror"); @@ -220,7 +256,9 @@ async fn fetch_upstream( std::mem::drop(permit); if fetch_result.is_ok() { - fetch_timers.write()?.insert(key, std::time::Instant::now()); + fetch_timers + .write()? + .insert(remote_url.clone(), std::time::Instant::now()); } match (fetch_result, remote_auth) {