// branch: main
// git.rs — 41936 bytes (raw)
//! Git smart HTTP protocol handlers for receive-pack and upload-pack.
use crate::{pack, store, KEYFRAME_INTERVAL};
use std::collections::{HashMap, HashSet, VecDeque};
use worker::*;
// ---------------------------------------------------------------------------
// git-receive-pack (handles `git push`)
// ---------------------------------------------------------------------------
/// Process a git-receive-pack POST request.
///
/// Uses a streaming approach: builds a lightweight index of the pack entries,
/// then processes objects by type (commits → trees → blobs), decompressing
/// each entry on-demand from the pack bytes. Only one resolved object is held
/// in memory at a time, keeping peak memory to pack_size + O(1 object).
///
/// 1. Parse pkt-line ref update commands
/// 2. Build pack index (decompress-to-sink, no data held)
/// 3. Pre-compute types by following OFS_DELTA chains
/// 4. Process commits (decompress, store, drop)
/// 5. Process trees (decompress, store, drop)
/// 6. Resolve blob paths (all trees now in DB)
/// 7. Process blobs (decompress, store with xpatch compression, drop)
/// 8. Update refs
/// 9. Return report-status
pub fn handle_receive_pack(sql: &SqlStorage, body: &[u8]) -> Result<Response> {
    // --- 1. Parse ref update commands from pkt-lines ---
    let request = parse_receive_pack_request(body);
    // Keep the client's preferred mode around so that, if the capability list
    // itself is contradictory, the error still goes out in a framing the
    // client can display.
    let preferred_sideband_mode = preferred_sideband_mode(&request.capabilities);
    let sideband_mode = match requested_sideband_mode(&request.capabilities) {
        Ok(mode) => mode,
        Err(e) => {
            return protocol_fatal_response(
                "git-receive-pack",
                &format!("receive-pack capabilities: {}", e),
                preferred_sideband_mode,
            )
        }
    };
    let send_progress = should_send_receive_progress(&request, sideband_mode);
    // Git splits large pushes (>http.postBuffer) into two POSTs:
    // 1st: a 4-byte flush "0000" (no commands, no pack)
    // 2nd: the full payload (commands + flush + pack)
    // Return 200 for the probe so git proceeds with the real request.
    if request.commands.is_empty() {
        let mut resp = Response::from_bytes(Vec::new())?;
        resp.headers_mut()
            .set("Content-Type", "application/x-git-receive-pack-result")?;
        return Ok(resp);
    }
    // --- Check bulk mode (skip_fts=1 skips commit graph + FTS indexing) ---
    // All fallible work runs inside this closure; any Err is converted to a
    // protocol-level fatal response at the bottom of the function.
    let result: Result<Response> = (|| {
        let bulk_mode = store::get_config(sql, "skip_fts")?
            .map(|v| v == "1")
            .unwrap_or(false);
        // --- 2-7. Process pack data (streaming) ---
        // Note: Cloudflare DO SQLite does not support BEGIN/COMMIT via sql.exec().
        // transactionSync() is not available in workers-rs 0.7.5.
        // Each sql.exec() auto-commits individually. If the DO times out mid-push,
        // partial state may result. Use the admin/set-ref endpoint to recover.
        // TODO: look into getting transaction support in workers-rs
        let pack_data = &body[request.pack_offset..];
        // Reject oversized packs before any object is parsed. A clean ng response
        // is far better than an OOM panic mid-push. The push script already splits
        // at 30 MB; this is a server-side safety net.
        if pack_data.len() > pack::MAX_PACK_BYTES {
            let reason = format!(
                "pack too large ({} MB, limit {} MB)",
                pack_data.len() / 1_000_000,
                pack::MAX_PACK_BYTES / 1_000_000,
            );
            let status_body = build_unpack_error_status(&request.commands, &reason);
            let progress = if send_progress {
                receive_pack_progress_messages(
                    &request,
                    pack_data.len(),
                    0,
                    request.commands.len(),
                    false,
                )
            } else {
                Vec::new()
            };
            return protocol_result_response(
                "git-receive-pack",
                maybe_sideband_wrap_with_progress(status_body, &progress, sideband_mode),
            );
        }
        // Only run object processing when a pack stream is actually present
        // (a delete-only push typically carries none).
        if pack_data.len() > 4 && &pack_data[..4] == b"PACK" {
            process_pack_streaming(sql, pack_data, bulk_mode)?;
        }
        // --- 8. Update refs ---
        let mut results: Vec<(String, std::result::Result<(), String>)> = Vec::new();
        for cmd in &request.commands {
            let result = store::update_ref(sql, &cmd.ref_name, &cmd.old_hash, &cmd.new_hash)
                .map_err(|e| format!("{}", e));
            results.push((cmd.ref_name.clone(), result));
        }
        // --- Set default branch + rebuild FTS index ---
        // First successfully-updated branch becomes the default when none is set.
        for (ref_name, result) in &results {
            if result.is_ok() && ref_name.starts_with("refs/heads/") {
                if store::get_config(sql, "default_branch")?.is_none() {
                    let _ = store::set_config(sql, "default_branch", ref_name);
                }
            }
        }
        let mut rebuilt_default_branch_fts = false;
        if !bulk_mode {
            // Rebuild full-text search only when this push successfully
            // updated the default branch itself.
            if let Some(default_ref) = store::get_config(sql, "default_branch")? {
                for cmd in &request.commands {
                    if cmd.ref_name == default_ref {
                        if let Some((_, Ok(()))) = results.iter().find(|(r, _)| r == &cmd.ref_name)
                        {
                            let _ = store::rebuild_fts_index(sql, &cmd.new_hash);
                            rebuilt_default_branch_fts = true;
                        }
                    }
                }
            }
        }
        // --- 9. Return report-status ---
        let status_body = build_report_status(&results);
        let ok_count = results.iter().filter(|(_, result)| result.is_ok()).count();
        let progress = if send_progress {
            receive_pack_progress_messages(
                &request,
                pack_data.len(),
                ok_count,
                results.len().saturating_sub(ok_count),
                rebuilt_default_branch_fts,
            )
        } else {
            Vec::new()
        };
        protocol_result_response(
            "git-receive-pack",
            maybe_sideband_wrap_with_progress(status_body, &progress, sideband_mode),
        )
    })();
    // Any internal error becomes a fatal protocol message the client can show.
    result.or_else(|err| {
        protocol_fatal_response(
            "git-receive-pack",
            &protocol_error_message(&err),
            sideband_mode,
        )
    })
}
/// Process pack data using the streaming two-pass approach.
///
/// Pass 1: `build_index` walks the pack byte stream, recording metadata for
/// each entry (offsets, type, delta base references). Zlib data is decompressed
/// to a sink — no object data is held in memory.
///
/// Pass 2: entries are processed by type. Each is decompressed on-demand from
/// the pack bytes (which stay in memory as the request body), delta chains are
/// resolved iteratively, and the result is stored in permanent tables then
/// dropped. Only one resolved object exists in memory at a time.
fn process_pack_streaming(sql: &SqlStorage, pack_data: &[u8], bulk_mode: bool) -> Result<()> {
    // --- Build lightweight index ---
    let (index, offset_to_idx) = pack::build_index(pack_data).map_err(|e| Error::RustError(e.0))?;
    // --- Pre-compute types by following OFS_DELTA chains ---
    // Returns Some(type) for entries resolvable via OFS_DELTA, None for REF_DELTA.
    let types: Vec<Option<pack::ObjectType>> = (0..index.len())
        .map(|i| pack::resolve_type(&index, &offset_to_idx, i))
        .collect();
    // Hashes of objects resolved so far, keyed to their pack index; passed to
    // `resolve_entry` when resolving delta bases.
    let mut hash_to_idx: HashMap<String, usize> = HashMap::new();
    // Resolve cache: avoids re-decompressing shared delta chain bases.
    // 1024 entries ≈ 20-30 MB, well within DO's 128 MB memory limit.
    let mut cache = pack::ResolveCache::new(1024, pack::CACHE_BUDGET_BYTES);
    // --- Load external bases for thin pack resolution ---
    // Thin packs use REF_DELTAs referencing objects from previous pushes.
    // Collect base hashes not in this pack and load from raw_objects
    // (commits/trees) or reconstruct from the blobs table.
    let mut external: pack::ExternalObjects = HashMap::new();
    for entry in &index {
        if let Some(ref base_hash) = entry.base_hash {
            if !external.contains_key(base_hash.as_str()) {
                // Try raw_objects first (commits and trees)
                if let Ok(Some(raw)) = store::load_raw_object_pub(sql, base_hash) {
                    let obj_type = store::detect_object_type(sql, base_hash);
                    external.insert(base_hash.clone(), (obj_type, raw.into()));
                }
                // Try blobs table (reconstructed from delta chain)
                else if let Ok(Some(blob_data)) = store::reconstruct_blob_by_hash(sql, base_hash)
                {
                    external.insert(
                        base_hash.clone(),
                        (pack::ObjectType::Blob, blob_data.into()),
                    );
                }
            }
        }
    }
    // --- Process commits ---
    // Root tree hashes are collected so blob paths can be resolved later.
    let mut root_tree_hashes: Vec<String> = Vec::new();
    for i in 0..index.len() {
        if types[i] != Some(pack::ObjectType::Commit) {
            continue;
        }
        let (_, data) = pack::resolve_entry(
            pack_data,
            &index,
            &offset_to_idx,
            i,
            &hash_to_idx,
            &mut pack::ResolveCtx {
                cache: &mut cache,
                external: &external,
            },
        )
        .map_err(|e| Error::RustError(e.0))?;
        let hash = pack::hash_object(&pack::ObjectType::Commit, &*data);
        hash_to_idx.insert(hash.clone(), i);
        let parsed = store::parse_commit(&*data)
            .map_err(|e| Error::RustError(format!("commit {}: {}", hash, e)))?;
        root_tree_hashes.push(parsed.tree_hash.clone());
        store::store_commit(sql, &hash, &parsed, &*data, bulk_mode)?;
    }
    // --- Process trees ---
    for i in 0..index.len() {
        if types[i] != Some(pack::ObjectType::Tree) {
            continue;
        }
        let (_, data) = pack::resolve_entry(
            pack_data,
            &index,
            &offset_to_idx,
            i,
            &hash_to_idx,
            &mut pack::ResolveCtx {
                cache: &mut cache,
                external: &external,
            },
        )
        .map_err(|e| Error::RustError(e.0))?;
        let hash = pack::hash_object(&pack::ObjectType::Tree, &*data);
        hash_to_idx.insert(hash.clone(), i);
        store::store_tree(sql, &hash, &*data)?;
    }
    // --- Resolve blob paths (all trees now in permanent storage) ---
    let empty_pack_trees: HashMap<String, Vec<store::TreeEntry>> = HashMap::new();
    let blob_paths = store::resolve_blob_paths(sql, &empty_pack_trees, &root_tree_hashes)?;
    // Free memory: commit/tree entries in the resolve cache are no longer needed.
    // This reclaims ~20-30 MB before blob processing, which needs headroom for
    // keyframe decompression and xpatch delta computation.
    cache.clear();
    // --- Process blobs ---
    for i in 0..index.len() {
        if types[i] != Some(pack::ObjectType::Blob) {
            continue;
        }
        let (_, data) = pack::resolve_entry(
            pack_data,
            &index,
            &offset_to_idx,
            i,
            &hash_to_idx,
            &mut pack::ResolveCtx {
                cache: &mut cache,
                external: &external,
            },
        )
        .map_err(|e| Error::RustError(e.0))?;
        let hash = pack::hash_object(&pack::ObjectType::Blob, &*data);
        hash_to_idx.insert(hash.clone(), i);
        // Blobs without a resolved path fall back to "unknown".
        let path = blob_paths
            .get(&hash)
            .map(|s| s.as_str())
            .unwrap_or("unknown");
        store::store_blob(sql, &hash, &*data, path, KEYFRAME_INTERVAL)?;
    }
    // --- Handle REF_DELTA entries with unknown types ---
    // Their types could not be pre-computed (base outside the OFS chain), so
    // the type comes from the resolution itself.
    for i in 0..index.len() {
        if types[i].is_some() {
            continue;
        }
        let resolved = pack::resolve_entry(
            pack_data,
            &index,
            &offset_to_idx,
            i,
            &hash_to_idx,
            &mut pack::ResolveCtx {
                cache: &mut cache,
                external: &external,
            },
        );
        match resolved {
            Ok((obj_type, data)) => {
                let hash = pack::hash_object(&obj_type, &*data);
                hash_to_idx.insert(hash.clone(), i);
                match obj_type {
                    pack::ObjectType::Commit => {
                        let parsed = store::parse_commit(&*data)
                            .map_err(|e| Error::RustError(format!("commit {}: {}", hash, e)))?;
                        store::store_commit(sql, &hash, &parsed, &*data, bulk_mode)?;
                    }
                    pack::ObjectType::Tree => {
                        store::store_tree(sql, &hash, &*data)?;
                    }
                    pack::ObjectType::Blob => {
                        let path = blob_paths
                            .get(&hash)
                            .map(|s| s.as_str())
                            .unwrap_or("unknown");
                        store::store_blob(sql, &hash, &*data, path, KEYFRAME_INTERVAL)?;
                    }
                    // Annotated tags are not stored — NOTE(review): confirm
                    // this is intentional rather than an omission.
                    pack::ObjectType::Tag => {}
                }
            }
            // Best-effort: entries whose base could not be found anywhere
            // are skipped rather than failing the whole push.
            Err(_) => {}
        }
    }
    Ok(())
}
// ---------------------------------------------------------------------------
// git-upload-pack (handles `git clone` / `git fetch`)
// ---------------------------------------------------------------------------
/// Process a git-upload-pack POST request.
///
/// 1. Parse want/have lines from the client
/// 2. Walk the commit graph to collect all needed objects
/// 3. Reconstruct blob content from xpatch delta chains
/// 4. Generate and return a pack file
pub fn handle_upload_pack(sql: &SqlStorage, body: &[u8]) -> Result<Response> {
    // --- 1. Parse want/have negotiation ---
    let request = parse_upload_request(body);
    // If the capability list is contradictory, answer in the client's
    // preferred sideband framing so the error is displayable.
    let preferred_sideband_mode = preferred_sideband_mode(&request.capabilities);
    let sideband_mode = match requested_sideband_mode(&request.capabilities) {
        Ok(mode) => mode,
        Err(e) => {
            return protocol_fatal_response(
                "git-upload-pack",
                &format!("upload-pack capabilities: {}", e),
                preferred_sideband_mode,
            )
        }
    };
    let send_progress = should_send_upload_progress(&request, sideband_mode);
    let result: Result<Response> = (|| {
        if request.wants.is_empty() {
            return Err(Error::RustError("no want lines received".into()));
        }
        let have_set: HashSet<String> = request.haves.iter().cloned().collect();
        let common_haves = find_common_haves(sql, &request.wants, &request.haves)?;
        // multi_ack_detailed + no-done lets the server skip the final `done`
        // round-trip and send the pack once common commits are known.
        let can_send_pack_without_done = !request.done
            && request.capabilities.contains("multi_ack_detailed")
            && request.capabilities.contains("no-done")
            && !common_haves.is_empty();
        // Mid-negotiation round: answer with ACK/NAK only, no pack yet.
        if !request.done && !request.haves.is_empty() && !can_send_pack_without_done {
            return protocol_result_response(
                "git-upload-pack",
                build_negotiation_response(&request, &common_haves),
            );
        }
        // --- 2-3. Collect all needed objects (commits, trees, blobs) ---
        // NOTE(review): have_set carries *all* client haves, not only the
        // verified-common subset — assumes collect_objects tolerates hashes
        // it does not know; confirm.
        let objects = store::collect_objects(sql, &request.wants, &have_set)?;
        // Build response: pkt-line negotiation prefix, then pack data.
        let mut resp_body = build_pack_response_prefix(&request, &common_haves);
        match sideband_mode {
            Some(mode) => {
                // Progress on channel 2, pack bytes on channel 1, then flush.
                if send_progress {
                    for message in upload_pack_progress_messages(objects.len()) {
                        append_sideband_data(&mut resp_body, 2, message.as_bytes(), mode);
                    }
                }
                pack::generate_into(&objects, |chunk| {
                    append_sideband_data(&mut resp_body, 1, chunk, mode)
                });
                resp_body.extend_from_slice(b"0000");
            }
            None => pack::generate_into(&objects, |chunk| resp_body.extend_from_slice(chunk)),
        }
        protocol_result_response("git-upload-pack", resp_body)
    })();
    // Any internal error becomes a fatal protocol message the client can show.
    result.or_else(|err| {
        protocol_fatal_response(
            "git-upload-pack",
            &protocol_error_message(&err),
            sideband_mode,
        )
    })
}
/// Parsed git-upload-pack negotiation request.
#[derive(Debug, Default)]
struct UploadRequest {
    // Commit hashes the client asked for (`want` lines).
    wants: Vec<String>,
    // Commit hashes the client claims to already have (`have` lines).
    haves: Vec<String>,
    // Capabilities advertised on the first `want` line.
    capabilities: HashSet<String>,
    // Whether the client sent a terminating `done` line.
    done: bool,
}
/// Parse want/have lines from a git-upload-pack request body.
///
/// Format (pkt-line encoded):
///   want <hash>[ capabilities]\n
///   ...
///   [have <hash>\n]
///   ...
///   done\n
///
/// Lines that are not valid UTF-8 or carry a hash that is not 40 characters
/// long are skipped. Capabilities are taken from the first `want` line only.
fn parse_upload_request(data: &[u8]) -> UploadRequest {
    let mut request = UploadRequest::default();
    let mut cursor = 0;
    let mut capabilities_taken = false;
    while let Some((payload, next)) = read_pkt_line(data, cursor) {
        cursor = next;
        // A flush packet merely separates the want and have sections.
        let Some(raw) = payload else { continue };
        let Ok(decoded) = std::str::from_utf8(raw) else {
            continue;
        };
        let text = decoded.trim_end_matches('\n');
        if text == "done" {
            request.done = true;
            break;
        }
        if let Some(rest) = text.strip_prefix("want ") {
            let mut words = rest.split_whitespace();
            if let Some(hash) = words.next() {
                if hash.len() == 40 {
                    request.wants.push(hash.to_string());
                }
            }
            // Capabilities ride on the first want line after the hash.
            if !capabilities_taken {
                capabilities_taken = true;
                request.capabilities.extend(words.map(str::to_string));
            }
        } else if let Some(rest) = text.strip_prefix("have ") {
            if let Some(hash) = rest.split_whitespace().next() {
                if hash.len() == 40 {
                    request.haves.push(hash.to_string());
                }
            }
        }
    }
    request
}
/// Breadth-first walk of commit ancestry starting from `wants`, returning the
/// subset of `haves` reachable from them (commits both sides share), in the
/// order the client sent them.
fn find_common_haves(sql: &SqlStorage, wants: &[String], haves: &[String]) -> Result<Vec<String>> {
    if wants.is_empty() || haves.is_empty() {
        return Ok(Vec::new());
    }
    // Set view of the client's haves for O(1) membership tests during the walk.
    let want_haves: HashSet<String> = haves.iter().cloned().collect();
    let mut found: HashSet<String> = HashSet::new();
    let mut visited: HashSet<String> = HashSet::new();
    let mut queue: VecDeque<String> = wants.iter().cloned().collect();
    #[derive(serde::Deserialize)]
    struct ParentRow {
        parent_hash: String,
    }
    while let Some(commit_hash) = queue.pop_front() {
        // Skip commits already expanded (merge graphs revisit nodes).
        if !visited.insert(commit_hash.clone()) {
            continue;
        }
        if want_haves.contains(&commit_hash) {
            found.insert(commit_hash.clone());
        }
        let parents: Vec<ParentRow> = sql
            .exec(
                "SELECT parent_hash FROM commit_parents
                WHERE commit_hash = ? ORDER BY ordinal ASC",
                vec![SqlStorageValue::from(commit_hash)],
            )?
            .to_array()?;
        for parent in parents {
            if !visited.contains(&parent.parent_hash) {
                queue.push_back(parent.parent_hash);
            }
        }
    }
    // Preserve the client's ordering of haves in the result.
    Ok(haves
        .iter()
        .filter(|have| found.contains(*have))
        .cloned()
        .collect())
}
/// Build the ACK/NAK pkt-line reply for one negotiation round (client has
/// sent haves but not yet `done`). The exact shape depends on which
/// multi_ack flavour the client advertised.
fn build_negotiation_response(request: &UploadRequest, common_haves: &[String]) -> Vec<u8> {
    let mut body = Vec::new();
    let last_common = common_haves.last().map(String::as_str);
    if last_common.is_some() && request.capabilities.contains("multi_ack_detailed") {
        // Detailed mode: acknowledge every common commit, then NAK.
        for common in common_haves {
            pkt_line_bytes(&mut body, format!("ACK {} common\n", common).as_bytes());
        }
        pkt_line_bytes(&mut body, b"NAK\n");
    } else if let Some(common) = last_common {
        if request.capabilities.contains("multi_ack") {
            // Basic multi_ack: acknowledge the latest common commit only.
            pkt_line_bytes(&mut body, format!("ACK {} continue\n", common).as_bytes());
            pkt_line_bytes(&mut body, b"NAK\n");
        } else {
            // Plain protocol: a single ACK with no trailing NAK.
            pkt_line_bytes(&mut body, format!("ACK {}\n", common).as_bytes());
        }
    } else {
        // Nothing in common yet.
        pkt_line_bytes(&mut body, b"NAK\n");
    }
    body
}
/// Build the pkt-line ACK/NAK prefix that precedes the pack data.
///
/// Three cases:
/// - no-done shortcut (multi_ack_detailed + no-done, common haves exist):
///   emit the full detailed sequence — per-common ACKs, `ready`, NAK, and the
///   final ACK — since the client will not send `done`.
/// - after `done`: a single "ACK <last common>" when a common base exists,
///   otherwise "NAK". (The previous version emitted nothing for a plain
///   no-multi_ack client with common haves, which violates the protocol:
///   after `done` the server must answer with ACK or NAK before the pack.)
/// - initial request with no haves: a plain "NAK".
fn build_pack_response_prefix(request: &UploadRequest, common_haves: &[String]) -> Vec<u8> {
    let mut body = Vec::new();
    if !request.done
        && request.capabilities.contains("multi_ack_detailed")
        && request.capabilities.contains("no-done")
    {
        if let Some(common) = common_haves.last().map(|s| s.as_str()) {
            for common_have in common_haves {
                pkt_line_bytes(
                    &mut body,
                    format!("ACK {} common\n", common_have).as_bytes(),
                );
            }
            pkt_line_bytes(&mut body, format!("ACK {} ready\n", common).as_bytes());
            pkt_line_bytes(&mut body, b"NAK\n");
            pkt_line_bytes(&mut body, format!("ACK {}\n", common).as_bytes());
            return body;
        }
    }
    if request.done {
        match common_haves.last() {
            Some(common) => pkt_line_bytes(&mut body, format!("ACK {}\n", common).as_bytes()),
            None => pkt_line_bytes(&mut body, b"NAK\n"),
        }
        return body;
    }
    pkt_line_bytes(&mut body, b"NAK\n");
    body
}
// ---------------------------------------------------------------------------
// Pkt-line parsing for ref commands
// ---------------------------------------------------------------------------
/// One ref update command from a receive-pack request:
/// "<old-hash> <new-hash> <ref-name>".
struct RefCommand {
    // Expected current value of the ref (zero hash for a create).
    old_hash: String,
    // Desired new value of the ref.
    new_hash: String,
    // Fully-qualified ref name, e.g. "refs/heads/main".
    ref_name: String,
}
/// Parsed git-receive-pack request: ref update commands plus the offset at
/// which the trailing pack stream begins.
#[derive(Default)]
struct ReceivePackRequest {
    // Parsed ref update command lines.
    commands: Vec<RefCommand>,
    // Byte offset into the request body where pack data starts.
    pack_offset: usize,
    // Capabilities advertised on the first command line.
    capabilities: HashSet<String>,
}
/// Sideband framing flavour negotiated by the client.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SidebandMode {
    // `side-band`: pkt-lines capped at 1000 bytes total.
    Small,
    // `side-band-64k`: pkt-lines capped at 65520 bytes total.
    Large,
}
impl SidebandMode {
    /// Maximum payload bytes per sideband pkt-line: the total cap minus the
    /// 4-byte length header and the 1-byte channel code.
    fn max_data_len(self) -> usize {
        let total_cap = if self == Self::Small { 1_000 } else { 65_520 };
        total_cap - 5
    }
}
/// Parse pkt-line encoded ref update commands from the start of the body.
/// Returns the commands and the byte offset where the pack data begins.
fn parse_receive_pack_request(data: &[u8]) -> ReceivePackRequest {
    let mut request = ReceivePackRequest::default();
    let mut cursor = 0;
    while let Some((payload, next)) = read_pkt_line(data, cursor) {
        cursor = next;
        // A flush packet terminates the command list; pack data follows.
        let Some(line) = payload else { break };
        if let Some((cmd, capabilities)) = parse_single_command(line) {
            // The capability list rides only on the first command line.
            if request.commands.is_empty() {
                request.capabilities = capabilities;
            }
            request.commands.push(cmd);
        }
    }
    request.pack_offset = cursor;
    request
}
/// Read one pkt-line from `data` starting at `pos`.
///
/// Returns `Some((None, new_pos))` for a flush packet ("0000"),
/// `Some((Some(payload), new_pos))` for a data packet, or `None` at end of
/// input or on a malformed length header.
fn read_pkt_line(data: &[u8], pos: usize) -> Option<(Option<&[u8]>, usize)> {
    let header = data.get(pos..pos + 4)?;
    let len = usize::from_str_radix(std::str::from_utf8(header).ok()?, 16).ok()?;
    match len {
        // Flush packet
        0 => Some((None, pos + 4)),
        // Lengths 1-3 are impossible: the length field counts itself.
        1..=3 => None,
        // Data packet; `get` returns None when the payload is truncated.
        _ => {
            let payload = data.get(pos + 4..pos + len)?;
            Some((Some(payload), pos + len))
        }
    }
}
/// Parse a single command line: "<old-hex> <new-hex> <refname>[\0capabilities]\n"
///
/// Returns `None` when the line is not UTF-8 or does not have three
/// space-separated fields.
fn parse_single_command(line: &[u8]) -> Option<(RefCommand, HashSet<String>)> {
    // Drop a trailing newline if present.
    let line = line.strip_suffix(b"\n").unwrap_or(line);
    // A NUL byte separates the command proper from the capability list.
    let (command, capabilities) = match line.iter().position(|&b| b == 0) {
        Some(nul) => (&line[..nul], parse_capabilities(&line[nul + 1..])),
        None => (line, HashSet::new()),
    };
    let text = std::str::from_utf8(command).ok()?;
    let mut parts = text.splitn(3, ' ');
    let old_hash = parts.next()?.to_string();
    let new_hash = parts.next()?.to_string();
    let ref_name = parts.next()?.to_string();
    Some((
        RefCommand {
            old_hash,
            new_hash,
            ref_name,
        },
        capabilities,
    ))
}
/// Split a raw capability byte string on whitespace into a set.
/// Non-UTF-8 input yields an empty set.
fn parse_capabilities(raw: &[u8]) -> HashSet<String> {
    match std::str::from_utf8(raw) {
        Ok(text) => text.split_whitespace().map(str::to_string).collect(),
        Err(_) => HashSet::new(),
    }
}
/// Pick the richest sideband flavour the client advertised, if any.
/// `side-band-64k` wins over `side-band` when both appear.
fn preferred_sideband_mode(capabilities: &HashSet<String>) -> Option<SidebandMode> {
    [
        ("side-band-64k", SidebandMode::Large),
        ("side-band", SidebandMode::Small),
    ]
    .into_iter()
    .find(|(name, _)| capabilities.contains(*name))
    .map(|(_, mode)| mode)
}
/// Validate the client's sideband request.
///
/// Asking for both `side-band` and `side-band-64k` at once is a protocol
/// error; otherwise return the preferred mode (or `None` when neither was
/// requested).
fn requested_sideband_mode(
    capabilities: &HashSet<String>,
) -> std::result::Result<Option<SidebandMode>, String> {
    match (
        capabilities.contains("side-band"),
        capabilities.contains("side-band-64k"),
    ) {
        (true, true) => Err("client requested both side-band and side-band-64k".into()),
        _ => Ok(preferred_sideband_mode(capabilities)),
    }
}
/// Progress lines go out only when a sideband channel is active and the
/// client did not request `no-progress`.
fn should_send_upload_progress(
    request: &UploadRequest,
    sideband_mode: Option<SidebandMode>,
) -> bool {
    if sideband_mode.is_none() {
        return false;
    }
    !request.capabilities.contains("no-progress")
}
/// Progress lines go out only when a sideband channel is active and the
/// client did not request `quiet`.
fn should_send_receive_progress(
    request: &ReceivePackRequest,
    sideband_mode: Option<SidebandMode>,
) -> bool {
    if sideband_mode.is_none() {
        return false;
    }
    !request.capabilities.contains("quiet")
}
/// Human-readable progress lines mimicking the stages `git upload-pack`
/// normally reports (enumerate, count, compress, total).
fn upload_pack_progress_messages(object_count: usize) -> Vec<String> {
    let n = object_count;
    vec![
        format!("Enumerating objects: {}, done.\n", n),
        format!("Counting objects: 100% ({}/{}), done.\n", n, n),
        format!("Compressing objects: 100% ({}/{}), done.\n", n, n),
        format!("Total {} (delta 0), reused 0 (delta 0), pack-reused 0\n", n),
    ]
}
/// Summary progress lines for a completed (or rejected) receive-pack
/// request: command count, pack size, ref update tallies, and an optional
/// note when the search index was rebuilt.
fn receive_pack_progress_messages(
    request: &ReceivePackRequest,
    pack_bytes: usize,
    ok_count: usize,
    failed_count: usize,
    rebuilt_default_branch_fts: bool,
) -> Vec<String> {
    let mut messages = Vec::with_capacity(4);
    messages.push(format!(
        "Processing {} ref update(s).\n",
        request.commands.len()
    ));
    messages.push(format!("Received pack: {} bytes.\n", pack_bytes));
    messages.push(format!(
        "Updated refs: {} succeeded, {} failed.\n",
        ok_count, failed_count
    ));
    if rebuilt_default_branch_fts {
        messages.push("Rebuilt search index for the default branch.\n".to_string());
    }
    messages
}
// ---------------------------------------------------------------------------
// Report status
// ---------------------------------------------------------------------------
/// Build a report-status response in pkt-line format.
/// Delegates with "ok" as the unpack result — by the time this is called the
/// pack itself was processed successfully; only per-ref results may vary.
fn build_report_status(results: &[(String, std::result::Result<(), String>)]) -> Vec<u8> {
    build_report_status_with_unpack_result("ok", results)
}
/// Build a report-status body for a failed unpack: the unpack line carries
/// `reason`, and every ref command is reported as failed with that same
/// reason.
fn build_unpack_error_status(commands: &[RefCommand], reason: &str) -> Vec<u8> {
    let mut results: Vec<(String, std::result::Result<(), String>)> =
        Vec::with_capacity(commands.len());
    for cmd in commands {
        results.push((cmd.ref_name.clone(), Err(reason.to_string())));
    }
    build_report_status_with_unpack_result(reason, &results)
}
/// Encode the report-status body: one "unpack …" line, one "ok/ng …" line
/// per ref result, and a terminating flush packet.
fn build_report_status_with_unpack_result(
    unpack_result: &str,
    results: &[(String, std::result::Result<(), String>)],
) -> Vec<u8> {
    let mut buf = Vec::new();
    pkt_line_bytes(&mut buf, format!("unpack {}\n", unpack_result).as_bytes());
    for (ref_name, result) in results {
        let line = match result {
            Ok(()) => format!("ok {}\n", ref_name),
            Err(reason) => format!("ng {} {}\n", ref_name, reason),
        };
        pkt_line_bytes(&mut buf, line.as_bytes());
    }
    // Terminating flush packet.
    buf.extend_from_slice(b"0000");
    buf
}
/// Wrap `body` for sideband transport when a mode is active.
///
/// Progress messages go out first on channel 2, then the status body on
/// channel 1, followed by a flush. Without sideband, `body` is returned
/// untouched and progress messages are dropped.
fn maybe_sideband_wrap_with_progress(
    body: Vec<u8>,
    progress_messages: &[String],
    sideband_mode: Option<SidebandMode>,
) -> Vec<u8> {
    let Some(mode) = sideband_mode else {
        return body;
    };
    let mut wrapped = Vec::new();
    for message in progress_messages {
        append_sideband_data(&mut wrapped, 2, message.as_bytes(), mode);
    }
    append_sideband_data(&mut wrapped, 1, &body, mode);
    wrapped.extend_from_slice(b"0000");
    wrapped
}
/// Wrap `body` in a 200 response with the smart-HTTP result content type for
/// the given service, e.g. "application/x-git-upload-pack-result".
fn protocol_result_response(service: &str, body: Vec<u8>) -> Result<Response> {
    let mut resp = Response::from_bytes(body)?;
    resp.headers_mut()
        .set("Content-Type", &format!("application/x-{}-result", service))?;
    Ok(resp)
}
/// Build a 200 response whose body carries a protocol-level fatal error.
/// Git clients expect errors inside the protocol stream (sideband channel 3
/// or an ERR pkt-line), not as HTTP error statuses.
fn protocol_fatal_response(
    service: &str,
    message: &str,
    sideband_mode: Option<SidebandMode>,
) -> Result<Response> {
    protocol_result_response(service, protocol_fatal_body(message, sideband_mode))
}
/// Encode a fatal error for the client: "fatal: …" on sideband channel 3
/// when a sideband mode is active, otherwise a bare "ERR …" pkt-line.
/// Both variants end with a flush packet.
fn protocol_fatal_body(message: &str, sideband_mode: Option<SidebandMode>) -> Vec<u8> {
    let mut body = Vec::new();
    if let Some(mode) = sideband_mode {
        let line = format!("fatal: {}\n", message);
        append_sideband_data(&mut body, 3, line.as_bytes(), mode);
    } else {
        let line = format!("ERR {}\n", message);
        pkt_line_bytes(&mut body, line.as_bytes());
    }
    body.extend_from_slice(b"0000");
    body
}
fn protocol_error_message(err: &Error) -> String {
match err {
Error::RustError(message) => message.clone(),
_ => err.to_string(),
}
}
/// Append `data` on the given sideband channel, split into pkt-lines no
/// larger than the negotiated mode allows. Empty input appends nothing.
fn append_sideband_data(buf: &mut Vec<u8>, channel: u8, data: &[u8], mode: SidebandMode) {
    let limit = mode.max_data_len();
    for chunk in data.chunks(limit) {
        // One channel byte followed by up to `limit` payload bytes.
        let mut payload = vec![channel];
        payload.extend_from_slice(chunk);
        pkt_line_bytes(buf, &payload);
    }
}
/// Append one pkt-line: a 4-hex-digit length (which counts itself) followed
/// by the payload.
///
/// The wire format cannot represent a length above 0xffff — `{:04x}` would
/// emit five digits and silently corrupt the stream — so guard it in debug
/// builds. All current callers chunk payloads well below this limit.
fn pkt_line_bytes(buf: &mut Vec<u8>, data: &[u8]) {
    let len = 4 + data.len();
    debug_assert!(
        len <= 0xffff,
        "pkt-line payload too large: {} bytes",
        data.len()
    );
    buf.extend_from_slice(format!("{:04x}", len).as_bytes());
    buf.extend_from_slice(data);
}
#[cfg(test)]
mod tests {
    use super::*;

    // These tests build their fixtures with pkt_line_bytes, so they double
    // as round-trip checks of the pkt-line encoder against the parsers.

    #[test]
    fn parse_receive_pack_request_reads_capabilities_and_pack_offset() {
        let mut body = Vec::new();
        pkt_line_bytes(
            &mut body,
            b"0000000000000000000000000000000000000000 0123456789012345678901234567890123456789 refs/heads/main\0report-status side-band-64k ofs-delta\n",
        );
        body.extend_from_slice(b"0000PACK");
        let request = parse_receive_pack_request(&body);
        assert_eq!(request.commands.len(), 1);
        assert_eq!(request.commands[0].ref_name, "refs/heads/main");
        // Pack data begins right after the "0000" flush, i.e. 4 bytes before
        // the "PACK" signature appended above.
        assert_eq!(request.pack_offset, body.len() - 4);
        assert!(request.capabilities.contains("report-status"));
        assert!(request.capabilities.contains("side-band-64k"));
        assert!(request.capabilities.contains("ofs-delta"));
    }

    #[test]
    fn requested_sideband_mode_rejects_conflicting_requests() {
        let capabilities = HashSet::from(["side-band".to_string(), "side-band-64k".to_string()]);
        let err = requested_sideband_mode(&capabilities).expect_err("conflicting sideband");
        assert_eq!(err, "client requested both side-band and side-band-64k");
    }

    #[test]
    fn maybe_sideband_wrap_encodes_report_status_on_channel_one() {
        let body = build_report_status(&[("refs/heads/main".to_string(), Ok(()))]);
        let wrapped =
            maybe_sideband_wrap_with_progress(body.clone(), &[], Some(SidebandMode::Large));
        // Expected: the status body as one channel-1 pkt-line plus a flush.
        let mut expected = Vec::new();
        let mut payload = Vec::new();
        payload.push(1);
        payload.extend_from_slice(&body);
        pkt_line_bytes(&mut expected, &payload);
        expected.extend_from_slice(b"0000");
        assert_eq!(wrapped, expected);
    }

    #[test]
    fn maybe_sideband_wrap_with_progress_places_channel_two_before_status() {
        let body = build_report_status(&[("refs/heads/main".to_string(), Ok(()))]);
        let progress = vec!["Processing 1 ref update(s).\n".to_string()];
        let wrapped =
            maybe_sideband_wrap_with_progress(body.clone(), &progress, Some(SidebandMode::Large));
        // Progress (channel 2) must precede the status body (channel 1).
        let mut expected = Vec::new();
        let mut progress_payload = Vec::new();
        progress_payload.push(2);
        progress_payload.extend_from_slice(progress[0].as_bytes());
        pkt_line_bytes(&mut expected, &progress_payload);
        let mut status_payload = Vec::new();
        status_payload.push(1);
        status_payload.extend_from_slice(&body);
        pkt_line_bytes(&mut expected, &status_payload);
        expected.extend_from_slice(b"0000");
        assert_eq!(wrapped, expected);
    }

    #[test]
    fn upload_pack_progress_can_be_suppressed_by_no_progress() {
        let request = UploadRequest {
            capabilities: HashSet::from(["no-progress".to_string()]),
            ..UploadRequest::default()
        };
        assert!(!should_send_upload_progress(
            &request,
            Some(SidebandMode::Large)
        ));
        assert!(should_send_upload_progress(
            &UploadRequest::default(),
            Some(SidebandMode::Large)
        ));
        // No sideband channel means no progress regardless of capabilities.
        assert!(!should_send_upload_progress(
            &UploadRequest::default(),
            None
        ));
    }

    #[test]
    fn receive_pack_progress_can_be_suppressed_by_quiet() {
        let request = ReceivePackRequest {
            capabilities: HashSet::from(["quiet".to_string()]),
            ..ReceivePackRequest::default()
        };
        assert!(!should_send_receive_progress(
            &request,
            Some(SidebandMode::Large)
        ));
        assert!(should_send_receive_progress(
            &ReceivePackRequest::default(),
            Some(SidebandMode::Large)
        ));
        // No sideband channel means no progress regardless of capabilities.
        assert!(!should_send_receive_progress(
            &ReceivePackRequest::default(),
            None
        ));
    }

    #[test]
    fn upload_pack_progress_messages_include_expected_stages() {
        let messages = upload_pack_progress_messages(7);
        assert_eq!(
            messages,
            vec![
                "Enumerating objects: 7, done.\n",
                "Counting objects: 100% (7/7), done.\n",
                "Compressing objects: 100% (7/7), done.\n",
                "Total 7 (delta 0), reused 0 (delta 0), pack-reused 0\n",
            ]
        );
    }

    #[test]
    fn receive_pack_progress_messages_include_summary_and_rebuild_notice() {
        let request = ReceivePackRequest {
            commands: vec![RefCommand {
                old_hash: store::ZERO_HASH.to_string(),
                new_hash: "0123456789012345678901234567890123456789".to_string(),
                ref_name: "refs/heads/main".to_string(),
            }],
            ..ReceivePackRequest::default()
        };
        let messages = receive_pack_progress_messages(&request, 1234, 1, 0, true);
        assert_eq!(
            messages,
            vec![
                "Processing 1 ref update(s).\n",
                "Received pack: 1234 bytes.\n",
                "Updated refs: 1 succeeded, 0 failed.\n",
                "Rebuilt search index for the default branch.\n",
            ]
        );
    }

    #[test]
    fn append_sideband_data_splits_payloads_at_mode_boundary() {
        // 1100 bytes exceeds the small-mode 995-byte payload cap, so the
        // data must be split into two pkt-lines (995 + 105).
        let data = vec![b'x'; 1_100];
        let mut wrapped = Vec::new();
        append_sideband_data(&mut wrapped, 1, &data, SidebandMode::Small);
        let mut expected = Vec::new();
        let mut first = vec![1];
        first.extend(vec![b'x'; 995]);
        pkt_line_bytes(&mut expected, &first);
        let mut second = vec![1];
        second.extend(vec![b'x'; 105]);
        pkt_line_bytes(&mut expected, &second);
        assert_eq!(wrapped, expected);
    }

    #[test]
    fn build_unpack_error_status_uses_unpack_error_without_ng_prefix() {
        let commands = vec![RefCommand {
            old_hash: store::ZERO_HASH.to_string(),
            new_hash: "0123456789012345678901234567890123456789".to_string(),
            ref_name: "refs/heads/main".to_string(),
        }];
        let status = build_unpack_error_status(&commands, "pack too large");
        let mut expected = Vec::new();
        pkt_line_bytes(&mut expected, b"unpack pack too large\n");
        pkt_line_bytes(&mut expected, b"ng refs/heads/main pack too large\n");
        expected.extend_from_slice(b"0000");
        assert_eq!(status, expected);
    }

    #[test]
    fn protocol_fatal_body_uses_channel_three_when_sideband_is_active() {
        let body = protocol_fatal_body("boom", Some(SidebandMode::Large));
        let mut expected = Vec::new();
        let mut payload = vec![3];
        payload.extend_from_slice(b"fatal: boom\n");
        pkt_line_bytes(&mut expected, &payload);
        expected.extend_from_slice(b"0000");
        assert_eq!(body, expected);
    }

    #[test]
    fn protocol_fatal_body_uses_err_pkt_line_without_sideband() {
        let body = protocol_fatal_body("boom", None);
        let mut expected = Vec::new();
        pkt_line_bytes(&mut expected, b"ERR boom\n");
        expected.extend_from_slice(b"0000");
        assert_eq!(body, expected);
    }

    #[test]
    fn parse_upload_request_reads_caps_haves_and_done() {
        let mut body = Vec::new();
        pkt_line_bytes(
            &mut body,
            b"want 0123456789012345678901234567890123456789 multi_ack_detailed no-done ofs-delta\n",
        );
        body.extend_from_slice(b"0000");
        pkt_line_bytes(
            &mut body,
            b"have abcdefabcdefabcdefabcdefabcdefabcdefabcd\n",
        );
        pkt_line_bytes(&mut body, b"done\n");
        let request = parse_upload_request(&body);
        assert_eq!(
            request.wants,
            vec!["0123456789012345678901234567890123456789"]
        );
        assert_eq!(
            request.haves,
            vec!["abcdefabcdefabcdefabcdefabcdefabcdefabcd"]
        );
        assert!(request.done);
        assert!(request.capabilities.contains("multi_ack_detailed"));
        assert!(request.capabilities.contains("no-done"));
        assert!(request.capabilities.contains("ofs-delta"));
    }

    #[test]
    fn pack_response_prefix_uses_ack_ready_for_no_done_fetches() {
        let mut capabilities = HashSet::new();
        capabilities.insert("multi_ack_detailed".to_string());
        capabilities.insert("no-done".to_string());
        let request = UploadRequest {
            capabilities,
            ..UploadRequest::default()
        };
        let common_haves = vec!["0123456789012345678901234567890123456789".to_string()];
        let prefix = build_pack_response_prefix(&request, &common_haves);
        let mut expected = Vec::new();
        pkt_line_bytes(
            &mut expected,
            b"ACK 0123456789012345678901234567890123456789 common\n",
        );
        pkt_line_bytes(
            &mut expected,
            b"ACK 0123456789012345678901234567890123456789 ready\n",
        );
        pkt_line_bytes(&mut expected, b"NAK\n");
        pkt_line_bytes(
            &mut expected,
            b"ACK 0123456789012345678901234567890123456789\n",
        );
        assert_eq!(prefix, expected);
    }

    #[test]
    fn negotiation_response_acks_common_commit_before_done() {
        let mut capabilities = HashSet::new();
        capabilities.insert("multi_ack_detailed".to_string());
        let request = UploadRequest {
            capabilities,
            ..UploadRequest::default()
        };
        let response = build_negotiation_response(
            &request,
            &["0123456789012345678901234567890123456789".to_string()],
        );
        let mut expected = Vec::new();
        pkt_line_bytes(
            &mut expected,
            b"ACK 0123456789012345678901234567890123456789 common\n",
        );
        pkt_line_bytes(&mut expected, b"NAK\n");
        assert_eq!(response, expected);
    }
}