Support push + force push over SSH #1323

Merged (1 commit) on Apr 5, 2024
josh-proxy/src/bin/josh-proxy.rs: 243 changes (144 additions, 99 deletions)
@@ -22,9 +22,10 @@ use josh::{josh_error, JoshError, JoshResult};
use josh_rpc::calls::RequestedCommand;
use serde::Serialize;
use std::collections::HashMap;
use std::ffi::OsStr;
use std::io;
use std::net::IpAddr;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::str::FromStr;
use std::sync::{Arc, RwLock};
@@ -729,14 +730,15 @@ async fn serve_namespace(
params: &josh_rpc::calls::ServeNamespace,
repo_path: std::path::PathBuf,
namespace: &str,
repo_update: RepoUpdate,
) -> josh::JoshResult<()> {
const SERVE_TIMEOUT: u64 = 60;

tracing::trace!(
"serve_namespace: command: {:?}, query: {}, namespace: {}",
params.command,
params.query,
namespace
command = ?params.command,
query = %params.query,
namespace = %namespace,
"serve_namespace",
);

enum ServeError {
@@ -746,25 +748,24 @@
SubprocessExited(i32),
}

if params.command == RequestedCommand::GitReceivePack {
return Err(josh_error("Push over SSH is not supported"));
}

let command = match params.command {
RequestedCommand::GitUploadPack => "git-upload-pack",
RequestedCommand::GitUploadArchive => "git-upload-archive",
RequestedCommand::GitReceivePack => "git-receive-pack",
};

let overlay_path = repo_path.join("overlay");

let mut process = tokio::process::Command::new(command)
.arg(repo_path.join("overlay"))
.current_dir(repo_path.join("overlay"))
.arg(&overlay_path)
.current_dir(&overlay_path)
.env("GIT_DIR", &repo_path)
.env("GIT_NAMESPACE", namespace)
.env(
"GIT_ALTERNATE_OBJECT_DIRECTORIES",
repo_path.join("mirror").join("objects"),
)
.env("JOSH_REPO_UPDATE", serde_json::to_string(&repo_update)?)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.spawn()?;
@@ -912,6 +913,31 @@ fn head_ref_or_default(head_ref: &str) -> HeadRef {
}
}

fn make_repo_update(
remote_url: &str,
serv: Arc<JoshProxyService>,
filter: josh::filter::Filter,
remote_auth: RemoteAuth,
meta: &MetaConfig,
repo_path: &Path,
ns: Arc<josh_proxy::TmpGitNamespace>,
) -> RepoUpdate {
let context_propagator = josh_proxy::trace::make_context_propagator();

RepoUpdate {
refs: HashMap::new(),
remote_url: remote_url.to_string(),
remote_auth,
port: serv.port.clone(),
filter_spec: josh::filter::spec(filter),
base_ns: josh::to_ns(&meta.config.repo),
git_ns: ns.name().to_string(),
git_dir: repo_path.display().to_string(),
mirror_git_dir: serv.repo_path.join("mirror").display().to_string(),
context_propagator,
}
}
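(Not part of the diff.) The RepoUpdate built by make_repo_update is handed to the spawned git process through the JOSH_REPO_UPDATE environment variable and read back by the hooks via repo_update_from_env. A minimal sketch of that JSON round trip, using a simplified stand-in struct with made-up field values; the real josh_proxy::RepoUpdate carries more fields:

```rust
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

// Simplified stand-in for josh_proxy::RepoUpdate (illustration only).
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct RepoUpdate {
    refs: HashMap<String, (String, String)>,
    remote_url: String,
    git_ns: String,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let update = RepoUpdate {
        refs: HashMap::new(),
        remote_url: "ssh://git@example.com/repo.git".to_string(),
        git_ns: "example-namespace".to_string(),
    };

    // Proxy side: serialize; this string travels to git-receive-pack
    // in the JOSH_REPO_UPDATE environment variable.
    let env_value = serde_json::to_string(&update)?;

    // Hook side: deserialize it again, as repo_update_from_env does.
    let read_back: RepoUpdate = serde_json::from_str(&env_value)?;
    assert_eq!(update, read_back);
    Ok(())
}
```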

async fn handle_serve_namespace_request(
serv: Arc<JoshProxyService>,
req: Request<hyper::Body>,
Expand Down Expand Up @@ -958,6 +984,9 @@ async fn handle_serve_namespace_request(
));
};

eprintln!("params: {:?}", params);
eprintln!("parsed_url.upstream_repo: {:?}", parsed_url.upstream_repo);

let auth_socket = params.ssh_socket.clone();
let remote_auth = RemoteAuth::Ssh {
auth_socket: auth_socket.clone(),
@@ -1000,24 +1029,34 @@
let remote_url = upstream + meta_config.config.repo.as_str();
let head_ref = head_ref_or_default(&parsed_url.headref);

let remote_refs = [head_ref.get()];
let remote_refs = match ssh_list_refs(&remote_url, auth_socket, Some(&remote_refs)).await {
Ok(remote_refs) => remote_refs,
Err(e) => {
return Ok(make_response(
hyper::Body::from(e.to_string()),
hyper::StatusCode::FORBIDDEN,
))
}
};
let resolved_ref = match params.command {
// When pushing over SSH, we need to fetch to get new references
// for searching for unapply base, so we don't bother with additional cache checks
RequestedCommand::GitReceivePack => None,
// Otherwise, list refs - it doesn't need locking and is faster -
// and use results to potentially skip fetching
_ => {
let remote_refs = [head_ref.get()];
let remote_refs =
match ssh_list_refs(&remote_url, auth_socket, Some(&remote_refs)).await {
Ok(remote_refs) => remote_refs,
Err(e) => {
return Ok(make_response(
hyper::Body::from(e.to_string()),
hyper::StatusCode::FORBIDDEN,
))
}
};

let resolved_ref = match remote_refs.get(head_ref.get()) {
Some(resolved_ref) => resolved_ref,
None => {
return Ok(make_response(
hyper::Body::from("Could not resolve remote ref"),
hyper::StatusCode::INTERNAL_SERVER_ERROR,
))
match remote_refs.get(head_ref.get()) {
Some(resolved_ref) => Some(resolved_ref.clone()),
None => {
return Ok(make_response(
hyper::Body::from("Could not resolve remote ref"),
hyper::StatusCode::INTERNAL_SERVER_ERROR,
))
}
}
}
};

@@ -1027,7 +1066,7 @@ async fn handle_serve_namespace_request(
&remote_auth,
remote_url.to_owned(),
Some(head_ref.get()),
Some(resolved_ref),
resolved_ref.as_deref(),
false,
)
.await
@@ -1095,7 +1134,19 @@ async fn handle_serve_namespace_request(
}
};

let serve_result = serve_namespace(&params, serv.repo_path.clone(), temp_ns.name()).await;
let overlay_path = serv.repo_path.join("overlay");
let repo_update = make_repo_update(
&remote_url,
serv.clone(),
filter,
remote_auth,
&meta_config,
&overlay_path,
temp_ns.clone(),
);

let serve_result =
serve_namespace(&params, serv.repo_path.clone(), temp_ns.name(), repo_update).await;
std::mem::drop(temp_ns);

match serve_result {
@@ -1295,68 +1346,39 @@ async fn call_service(
}

let temp_ns = prepare_namespace(serv.clone(), &meta, filter, &headref).await?;
let overlay_path = serv.repo_path.join("overlay");

let repo_path = serv
.repo_path
.join("overlay")
.to_str()
.ok_or(josh::josh_error("repo_path.to_str"))?
.to_string();

let mirror_repo_path = serv
.repo_path
.join("mirror")
.to_str()
.ok_or(josh::josh_error("repo_path.to_str"))?
.to_string();

let context_propagator = {
let span = tracing::Span::current();

let mut context_propagator = HashMap::<String, String>::default();
let context = span.context();
global::get_text_map_propagator(|propagator| {
propagator.inject_context(&context, &mut context_propagator);
});

tracing::debug!("context propagator: {:?}", context_propagator);
context_propagator
};

let repo_update = josh_proxy::RepoUpdate {
refs: HashMap::new(),
remote_url: remote_url.clone(),
let repo_update = make_repo_update(
&remote_url,
serv.clone(),
filter,
remote_auth,
port: serv.port.clone(),
filter_spec: josh::filter::spec(filter),
base_ns: josh::to_ns(&meta.config.repo),
git_ns: temp_ns.name().to_string(),
git_dir: repo_path.clone(),
mirror_git_dir: mirror_repo_path.clone(),
context_propagator,
};
&meta,
&overlay_path,
temp_ns.clone(),
);

let cgi_response = async {
let mut cmd = Command::new("git");
cmd.arg("http-backend");
cmd.current_dir(&serv.repo_path.join("overlay"));
cmd.env("GIT_DIR", &repo_path);
cmd.current_dir(&overlay_path);
cmd.env("GIT_DIR", &overlay_path);
cmd.env("GIT_HTTP_EXPORT_ALL", "");
cmd.env(
"GIT_ALTERNATE_OBJECT_DIRECTORIES",
serv.repo_path
.join("mirror")
.join("objects")
.to_str()
.ok_or(josh::josh_error("repo_path.to_str"))?,
.display()
.to_string(),
);
cmd.env("GIT_NAMESPACE", temp_ns.name());
cmd.env("GIT_PROJECT_ROOT", repo_path);
cmd.env("GIT_PROJECT_ROOT", &overlay_path);
cmd.env("JOSH_REPO_UPDATE", serde_json::to_string(&repo_update)?);
cmd.env("PATH_INFO", parsed_url.pathinfo.clone());

let (response, stderr) = hyper_cgi::do_cgi(req, cmd).await;
tracing::debug!("Git stderr: {}", String::from_utf8_lossy(&stderr));
tracing::debug!(stderr = %String::from_utf8_lossy(&stderr), "http-backend exited");

Ok::<_, JoshError>(response)
}
Expand Down Expand Up @@ -1655,35 +1677,41 @@ async fn run_housekeeping(local: std::path::PathBuf) -> josh::JoshResult<()> {
}
}

fn repo_update_from_env() -> josh::JoshResult<josh_proxy::RepoUpdate> {
let repo_update =
std::env::var("JOSH_REPO_UPDATE").map_err(|_| josh_error("JOSH_REPO_UPDATE not set"))?;

serde_json::from_str(&repo_update)
.map_err(|e| josh_error(&format!("Failed to parse JOSH_REPO_UPDATE: {}", e)))
}

fn pre_receive_hook() -> josh::JoshResult<i32> {
let repo_update: josh_proxy::RepoUpdate =
serde_json::from_str(&std::env::var("JOSH_REPO_UPDATE")?)?;
let repo_update = repo_update_from_env()?;

let p = std::path::PathBuf::from(repo_update.git_dir)
let push_options_path = std::path::PathBuf::from(repo_update.git_dir)
.join("refs/namespaces")
.join(repo_update.git_ns)
.join("push_options");

let n: usize = std::env::var("GIT_PUSH_OPTION_COUNT")?.parse()?;
let push_option_count: usize = std::env::var("GIT_PUSH_OPTION_COUNT")?.parse()?;

let mut push_options = std::collections::HashMap::<String, String>::new();
for i in 0..n {
let s = std::env::var(format!("GIT_PUSH_OPTION_{}", i))?;
if let [key, value] = s.as_str().split('=').collect::<Vec<_>>().as_slice() {
push_options.insert(key.to_string(), value.to_string());
let mut push_options = HashMap::<String, serde_json::Value>::new();
for i in 0..push_option_count {
let push_option = std::env::var(format!("GIT_PUSH_OPTION_{}", i))?;
if let Some((key, value)) = push_option.split_once("=") {
push_options.insert(key.into(), value.into());
} else {
push_options.insert(s, "".to_string());
push_options.insert(push_option, true.into());
}
}

std::fs::write(p, serde_json::to_string(&push_options)?)?;
std::fs::write(push_options_path, serde_json::to_string(&push_options)?)?;

Ok(0)
}
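(Not part of the diff.) A standalone sketch of how the option parsing in pre_receive_hook above maps a hypothetical `git push -o merge=auto -o force` onto the JSON written to the push_options file: key=value options keep their value as a string, bare options become boolean true. The option names are made up for illustration, and serde_json is assumed as a dependency:

```rust
use std::collections::HashMap;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // What GIT_PUSH_OPTION_0 / GIT_PUSH_OPTION_1 would contain for
    // `git push -o merge=auto -o force` (hypothetical values).
    let raw_options = ["merge=auto", "force"];

    let mut push_options = HashMap::<String, serde_json::Value>::new();
    for push_option in raw_options {
        if let Some((key, value)) = push_option.split_once('=') {
            // "key=value" options keep their value as a JSON string.
            push_options.insert(key.into(), value.into());
        } else {
            // Bare options are recorded as JSON `true`.
            push_options.insert(push_option.into(), true.into());
        }
    }

    // Prints roughly: {"force":true,"merge":"auto"} (key order may vary).
    println!("{}", serde_json::to_string(&push_options)?);
    Ok(())
}
```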

fn update_hook(refname: &str, old: &str, new: &str) -> josh::JoshResult<i32> {
let mut repo_update: josh_proxy::RepoUpdate =
serde_json::from_str(&std::env::var("JOSH_REPO_UPDATE")?)?;
let mut repo_update = repo_update_from_env()?;

repo_update
.refs
@@ -1696,24 +1724,33 @@ fn update_hook(refname: &str, old: &str, new: &str) -> josh::JoshResult<i32> {
.send();

match resp {
Ok(r) => {
let success = r.status().is_success();
if let Ok(body) = r.text() {
println!("response from upstream:\n{}\n\n", body);
} else {
println!("no upstream response");
Ok(resp) => {
let success = resp.status().is_success();
println!("upstream: response status: {}", resp.status());

match resp.text() {
Ok(text) if text.trim().is_empty() => {
println!("upstream: no response body");
}
Ok(text) => {
println!("upstream: response body:\n\n{}", text);
}
Err(err) => {
println!("upstream: warn: failed to read response body: {:?}", err);
}
}

if success {
return Ok(0);
Ok(0)
} else {
return Ok(1);
Ok(1)
}
}
Err(err) => {
tracing::warn!("/repo_update request failed {:?}", err);
Ok(1)
}
};
Ok(1)
}
}

async fn serve_graphql(
@@ -1949,8 +1986,16 @@ fn main() {

if let [a0, ..] = &std::env::args().collect::<Vec<_>>().as_slice() {
if a0.ends_with("/pre-receive") {
println!("josh-proxy");
std::process::exit(pre_receive_hook().unwrap_or(1));
eprintln!("josh-proxy: pre-receive hook");
let code = match pre_receive_hook() {
Ok(code) => code,
Err(e) => {
eprintln!("josh-proxy: pre-receive hook failed: {}", e);
std::process::exit(1);
}
};

std::process::exit(code);
}
}
