//! Git smart HTTP protocol handlers for receive-pack and upload-pack. use crate::{pack, store, KEYFRAME_INTERVAL}; use std::collections::{HashMap, HashSet, VecDeque}; use worker::*; // --------------------------------------------------------------------------- // git-receive-pack (handles `git push`) // --------------------------------------------------------------------------- /// Process a git-receive-pack POST request. /// /// Uses a streaming approach: builds a lightweight index of the pack entries, /// then processes objects by type (commits → trees → blobs), decompressing /// each entry on-demand from the pack bytes. Only one resolved object is held /// in memory at a time, keeping peak memory to pack_size + O(1 object). /// /// 1. Parse pkt-line ref update commands /// 2. Build pack index (decompress-to-sink, no data held) /// 3. Pre-compute types by following OFS_DELTA chains /// 4. Process commits (decompress, store, drop) /// 5. Process trees (decompress, store, drop) /// 6. Resolve blob paths (all trees now in DB) /// 7. Process blobs (decompress, store with xpatch compression, drop) /// 8. Update refs /// 9. Return report-status pub fn handle_receive_pack(sql: &SqlStorage, body: &[u8]) -> Result { // --- 1. Parse ref update commands from pkt-lines --- let request = parse_receive_pack_request(body); let preferred_sideband_mode = preferred_sideband_mode(&request.capabilities); let sideband_mode = match requested_sideband_mode(&request.capabilities) { Ok(mode) => mode, Err(e) => { return protocol_fatal_response( "git-receive-pack", &format!("receive-pack capabilities: {}", e), preferred_sideband_mode, ) } }; let send_progress = should_send_receive_progress(&request, sideband_mode); // Git splits large pushes (>http.postBuffer) into two POSTs: // 1st: a 4-byte flush "0000" (no commands, no pack) // 2nd: the full payload (commands + flush + pack) // Return 200 for the probe so git proceeds with the real request. if request.commands.is_empty() { let mut resp = Response::from_bytes(Vec::new())?; resp.headers_mut() .set("Content-Type", "application/x-git-receive-pack-result")?; return Ok(resp); } // --- Check bulk mode (skip_fts=1 skips commit graph + FTS indexing) --- let result: Result = (|| { let bulk_mode = store::get_config(sql, "skip_fts")? .map(|v| v == "1") .unwrap_or(false); // --- 2-7. Process pack data (streaming) --- // Note: Cloudflare DO SQLite does not support BEGIN/COMMIT via sql.exec(). // transactionSync() is not available in workers-rs 0.7.5. // Each sql.exec() auto-commits individually. If the DO times out mid-push, // partial state may result. Use the admin/set-ref endpoint to recover. // TODO: look into getting transaction support in workers-rs let pack_data = &body[request.pack_offset..]; // Reject oversized packs before any object is parsed. A clean ng response // is far better than an OOM panic mid-push. The push script already splits // at 30 MB; this is a server-side safety net. if pack_data.len() > pack::MAX_PACK_BYTES { let reason = format!( "pack too large ({} MB, limit {} MB)", pack_data.len() / 1_000_000, pack::MAX_PACK_BYTES / 1_000_000, ); let status_body = build_unpack_error_status(&request.commands, &reason); let progress = if send_progress { receive_pack_progress_messages( &request, pack_data.len(), 0, request.commands.len(), false, ) } else { Vec::new() }; return protocol_result_response( "git-receive-pack", maybe_sideband_wrap_with_progress(status_body, &progress, sideband_mode), ); } if pack_data.len() > 4 && &pack_data[..4] == b"PACK" { process_pack_streaming(sql, pack_data, bulk_mode)?; } // --- 8. Update refs --- let mut results: Vec<(String, std::result::Result<(), String>)> = Vec::new(); for cmd in &request.commands { let result = store::update_ref(sql, &cmd.ref_name, &cmd.old_hash, &cmd.new_hash) .map_err(|e| format!("{}", e)); results.push((cmd.ref_name.clone(), result)); } // --- Set default branch + rebuild FTS index --- for (ref_name, result) in &results { if result.is_ok() && ref_name.starts_with("refs/heads/") { if store::get_config(sql, "default_branch")?.is_none() { let _ = store::set_config(sql, "default_branch", ref_name); } } } let mut rebuilt_default_branch_fts = false; if !bulk_mode { if let Some(default_ref) = store::get_config(sql, "default_branch")? { for cmd in &request.commands { if cmd.ref_name == default_ref { if let Some((_, Ok(()))) = results.iter().find(|(r, _)| r == &cmd.ref_name) { let _ = store::rebuild_fts_index(sql, &cmd.new_hash); rebuilt_default_branch_fts = true; } } } } } // --- 9. Return report-status --- let status_body = build_report_status(&results); let ok_count = results.iter().filter(|(_, result)| result.is_ok()).count(); let progress = if send_progress { receive_pack_progress_messages( &request, pack_data.len(), ok_count, results.len().saturating_sub(ok_count), rebuilt_default_branch_fts, ) } else { Vec::new() }; protocol_result_response( "git-receive-pack", maybe_sideband_wrap_with_progress(status_body, &progress, sideband_mode), ) })(); result.or_else(|err| { protocol_fatal_response( "git-receive-pack", &protocol_error_message(&err), sideband_mode, ) }) } /// Process pack data using the streaming two-pass approach. /// /// Pass 1: `build_index` walks the pack byte stream, recording metadata for /// each entry (offsets, type, delta base references). Zlib data is decompressed /// to a sink — no object data is held in memory. /// /// Pass 2: entries are processed by type. Each is decompressed on-demand from /// the pack bytes (which stay in memory as the request body), delta chains are /// resolved iteratively, and the result is stored in permanent tables then /// dropped. Only one resolved object exists in memory at a time. fn process_pack_streaming(sql: &SqlStorage, pack_data: &[u8], bulk_mode: bool) -> Result<()> { // --- Build lightweight index --- let (index, offset_to_idx) = pack::build_index(pack_data).map_err(|e| Error::RustError(e.0))?; // --- Pre-compute types by following OFS_DELTA chains --- // Returns Some(type) for entries resolvable via OFS_DELTA, None for REF_DELTA. let types: Vec> = (0..index.len()) .map(|i| pack::resolve_type(&index, &offset_to_idx, i)) .collect(); let mut hash_to_idx: HashMap = HashMap::new(); // Resolve cache: avoids re-decompressing shared delta chain bases. // 1024 entries ≈ 20-30 MB, well within DO's 128 MB memory limit. let mut cache = pack::ResolveCache::new(1024, pack::CACHE_BUDGET_BYTES); // --- Load external bases for thin pack resolution --- // Thin packs use REF_DELTAs referencing objects from previous pushes. // Collect base hashes not in this pack and load from raw_objects // (commits/trees) or reconstruct from the blobs table. let mut external: pack::ExternalObjects = HashMap::new(); for entry in &index { if let Some(ref base_hash) = entry.base_hash { if !external.contains_key(base_hash.as_str()) { // Try raw_objects first (commits and trees) if let Ok(Some(raw)) = store::load_raw_object_pub(sql, base_hash) { let obj_type = store::detect_object_type(sql, base_hash); external.insert(base_hash.clone(), (obj_type, raw.into())); } // Try blobs table (reconstructed from delta chain) else if let Ok(Some(blob_data)) = store::reconstruct_blob_by_hash(sql, base_hash) { external.insert( base_hash.clone(), (pack::ObjectType::Blob, blob_data.into()), ); } } } } // --- Process commits --- let mut root_tree_hashes: Vec = Vec::new(); for i in 0..index.len() { if types[i] != Some(pack::ObjectType::Commit) { continue; } let (_, data) = pack::resolve_entry( pack_data, &index, &offset_to_idx, i, &hash_to_idx, &mut pack::ResolveCtx { cache: &mut cache, external: &external, }, ) .map_err(|e| Error::RustError(e.0))?; let hash = pack::hash_object(&pack::ObjectType::Commit, &*data); hash_to_idx.insert(hash.clone(), i); let parsed = store::parse_commit(&*data) .map_err(|e| Error::RustError(format!("commit {}: {}", hash, e)))?; root_tree_hashes.push(parsed.tree_hash.clone()); store::store_commit(sql, &hash, &parsed, &*data, bulk_mode)?; } // --- Process trees --- for i in 0..index.len() { if types[i] != Some(pack::ObjectType::Tree) { continue; } let (_, data) = pack::resolve_entry( pack_data, &index, &offset_to_idx, i, &hash_to_idx, &mut pack::ResolveCtx { cache: &mut cache, external: &external, }, ) .map_err(|e| Error::RustError(e.0))?; let hash = pack::hash_object(&pack::ObjectType::Tree, &*data); hash_to_idx.insert(hash.clone(), i); store::store_tree(sql, &hash, &*data)?; } // --- Resolve blob paths (all trees now in permanent storage) --- let empty_pack_trees: HashMap> = HashMap::new(); let blob_paths = store::resolve_blob_paths(sql, &empty_pack_trees, &root_tree_hashes)?; // Free memory: commit/tree entries in the resolve cache are no longer needed. // This reclaims ~20-30 MB before blob processing, which needs headroom for // keyframe decompression and xpatch delta computation. cache.clear(); // --- Process blobs --- for i in 0..index.len() { if types[i] != Some(pack::ObjectType::Blob) { continue; } let (_, data) = pack::resolve_entry( pack_data, &index, &offset_to_idx, i, &hash_to_idx, &mut pack::ResolveCtx { cache: &mut cache, external: &external, }, ) .map_err(|e| Error::RustError(e.0))?; let hash = pack::hash_object(&pack::ObjectType::Blob, &*data); hash_to_idx.insert(hash.clone(), i); let path = blob_paths .get(&hash) .map(|s| s.as_str()) .unwrap_or("unknown"); store::store_blob(sql, &hash, &*data, path, KEYFRAME_INTERVAL)?; } // --- Handle REF_DELTA entries with unknown types --- for i in 0..index.len() { if types[i].is_some() { continue; } let resolved = pack::resolve_entry( pack_data, &index, &offset_to_idx, i, &hash_to_idx, &mut pack::ResolveCtx { cache: &mut cache, external: &external, }, ); match resolved { Ok((obj_type, data)) => { let hash = pack::hash_object(&obj_type, &*data); hash_to_idx.insert(hash.clone(), i); match obj_type { pack::ObjectType::Commit => { let parsed = store::parse_commit(&*data) .map_err(|e| Error::RustError(format!("commit {}: {}", hash, e)))?; store::store_commit(sql, &hash, &parsed, &*data, bulk_mode)?; } pack::ObjectType::Tree => { store::store_tree(sql, &hash, &*data)?; } pack::ObjectType::Blob => { let path = blob_paths .get(&hash) .map(|s| s.as_str()) .unwrap_or("unknown"); store::store_blob(sql, &hash, &*data, path, KEYFRAME_INTERVAL)?; } pack::ObjectType::Tag => {} } } Err(_) => {} } } Ok(()) } // --------------------------------------------------------------------------- // git-upload-pack (handles `git clone` / `git fetch`) // --------------------------------------------------------------------------- /// Process a git-upload-pack POST request. /// /// 1. Parse want/have lines from the client /// 2. Walk the commit graph to collect all needed objects /// 3. Reconstruct blob content from xpatch delta chains /// 4. Generate and return a pack file pub fn handle_upload_pack(sql: &SqlStorage, body: &[u8]) -> Result { // --- 1. Parse want/have negotiation --- let request = parse_upload_request(body); let preferred_sideband_mode = preferred_sideband_mode(&request.capabilities); let sideband_mode = match requested_sideband_mode(&request.capabilities) { Ok(mode) => mode, Err(e) => { return protocol_fatal_response( "git-upload-pack", &format!("upload-pack capabilities: {}", e), preferred_sideband_mode, ) } }; let send_progress = should_send_upload_progress(&request, sideband_mode); let result: Result = (|| { if request.wants.is_empty() { return Err(Error::RustError("no want lines received".into())); } let have_set: HashSet = request.haves.iter().cloned().collect(); let common_haves = find_common_haves(sql, &request.wants, &request.haves)?; let can_send_pack_without_done = !request.done && request.capabilities.contains("multi_ack_detailed") && request.capabilities.contains("no-done") && !common_haves.is_empty(); if !request.done && !request.haves.is_empty() && !can_send_pack_without_done { return protocol_result_response( "git-upload-pack", build_negotiation_response(&request, &common_haves), ); } // --- 2-3. Collect all needed objects (commits, trees, blobs) --- let objects = store::collect_objects(sql, &request.wants, &have_set)?; // Build response: pkt-line negotiation prefix, then pack data. let mut resp_body = build_pack_response_prefix(&request, &common_haves); match sideband_mode { Some(mode) => { if send_progress { for message in upload_pack_progress_messages(objects.len()) { append_sideband_data(&mut resp_body, 2, message.as_bytes(), mode); } } pack::generate_into(&objects, |chunk| { append_sideband_data(&mut resp_body, 1, chunk, mode) }); resp_body.extend_from_slice(b"0000"); } None => pack::generate_into(&objects, |chunk| resp_body.extend_from_slice(chunk)), } protocol_result_response("git-upload-pack", resp_body) })(); result.or_else(|err| { protocol_fatal_response( "git-upload-pack", &protocol_error_message(&err), sideband_mode, ) }) } #[derive(Debug, Default)] struct UploadRequest { wants: Vec, haves: Vec, capabilities: HashSet, done: bool, } /// Parse want/have lines from a git-upload-pack request body. /// /// Format (pkt-line encoded): /// want [ capabilities]\n /// ... /// [have \n] /// ... /// done\n fn parse_upload_request(data: &[u8]) -> UploadRequest { let mut request = UploadRequest::default(); let mut pos = 0; let mut saw_first_want = false; loop { match read_pkt_line(data, pos) { Some((None, new_pos)) => { // Flush packet — may separate wants from haves pos = new_pos; } Some((Some(line), new_pos)) => { pos = new_pos; let text = match std::str::from_utf8(line) { Ok(t) => t.trim_end_matches('\n'), Err(_) => continue, }; if text == "done" { request.done = true; break; } else if let Some(rest) = text.strip_prefix("want ") { // First want line may have capabilities after a space let mut fields = rest.split_whitespace(); let hash = fields.next().unwrap_or(""); if hash.len() == 40 { request.wants.push(hash.to_string()); } if !saw_first_want { saw_first_want = true; for capability in fields { request.capabilities.insert(capability.to_string()); } } } else if let Some(rest) = text.strip_prefix("have ") { let hash = rest.split_whitespace().next().unwrap_or(""); if hash.len() == 40 { request.haves.push(hash.to_string()); } } } None => break, } } request } fn find_common_haves(sql: &SqlStorage, wants: &[String], haves: &[String]) -> Result> { if wants.is_empty() || haves.is_empty() { return Ok(Vec::new()); } let want_haves: HashSet = haves.iter().cloned().collect(); let mut found: HashSet = HashSet::new(); let mut visited: HashSet = HashSet::new(); let mut queue: VecDeque = wants.iter().cloned().collect(); #[derive(serde::Deserialize)] struct ParentRow { parent_hash: String, } while let Some(commit_hash) = queue.pop_front() { if !visited.insert(commit_hash.clone()) { continue; } if want_haves.contains(&commit_hash) { found.insert(commit_hash.clone()); } let parents: Vec = sql .exec( "SELECT parent_hash FROM commit_parents WHERE commit_hash = ? ORDER BY ordinal ASC", vec![SqlStorageValue::from(commit_hash)], )? .to_array()?; for parent in parents { if !visited.contains(&parent.parent_hash) { queue.push_back(parent.parent_hash); } } } Ok(haves .iter() .filter(|have| found.contains(*have)) .cloned() .collect()) } fn build_negotiation_response(request: &UploadRequest, common_haves: &[String]) -> Vec { let mut body = Vec::new(); match common_haves.last().map(|s| s.as_str()) { Some(_) if request.capabilities.contains("multi_ack_detailed") => { for common in common_haves { pkt_line_bytes(&mut body, format!("ACK {} common\n", common).as_bytes()); } pkt_line_bytes(&mut body, b"NAK\n"); } Some(common) if request.capabilities.contains("multi_ack") => { pkt_line_bytes(&mut body, format!("ACK {} continue\n", common).as_bytes()); pkt_line_bytes(&mut body, b"NAK\n"); } Some(common) => { pkt_line_bytes(&mut body, format!("ACK {}\n", common).as_bytes()); } None => pkt_line_bytes(&mut body, b"NAK\n"), } body } fn build_pack_response_prefix(request: &UploadRequest, common_haves: &[String]) -> Vec { let mut body = Vec::new(); if !request.done && request.capabilities.contains("multi_ack_detailed") && request.capabilities.contains("no-done") { if let Some(common) = common_haves.last().map(|s| s.as_str()) { for common_have in common_haves { pkt_line_bytes( &mut body, format!("ACK {} common\n", common_have).as_bytes(), ); } pkt_line_bytes(&mut body, format!("ACK {} ready\n", common).as_bytes()); pkt_line_bytes(&mut body, b"NAK\n"); pkt_line_bytes(&mut body, format!("ACK {}\n", common).as_bytes()); return body; } } if request.done { if request.capabilities.contains("multi_ack_detailed") || request.capabilities.contains("multi_ack") { if let Some(common) = common_haves.last().map(|s| s.as_str()) { pkt_line_bytes(&mut body, format!("ACK {}\n", common).as_bytes()); return body; } } if common_haves.is_empty() { pkt_line_bytes(&mut body, b"NAK\n"); } return body; } pkt_line_bytes(&mut body, b"NAK\n"); body } // --------------------------------------------------------------------------- // Pkt-line parsing for ref commands // --------------------------------------------------------------------------- struct RefCommand { old_hash: String, new_hash: String, ref_name: String, } #[derive(Default)] struct ReceivePackRequest { commands: Vec, pack_offset: usize, capabilities: HashSet, } #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum SidebandMode { Small, Large, } impl SidebandMode { fn max_data_len(self) -> usize { match self { // 4-byte pkt-line length + 1 sideband code + 995 bytes data = 1000 total. Self::Small => 995, // 4-byte pkt-line length + 1 sideband code + 65515 bytes data = 65520 total. Self::Large => 65_515, } } } /// Parse pkt-line encoded ref update commands from the start of the body. /// Returns the commands and the byte offset where the pack data begins. fn parse_receive_pack_request(data: &[u8]) -> ReceivePackRequest { let mut request = ReceivePackRequest::default(); let mut pos = 0; let mut first_command = true; loop { match read_pkt_line(data, pos) { Some((None, new_pos)) => { // Flush packet: end of commands pos = new_pos; break; } Some((Some(line), new_pos)) => { pos = new_pos; if let Some((cmd, capabilities)) = parse_single_command(line) { if first_command { request.capabilities = capabilities; first_command = false; } request.commands.push(cmd); } } None => break, // end of data } } request.pack_offset = pos; request } /// Read one pkt-line from data at the given position. /// Returns Some((None, new_pos)) for flush, Some((Some(payload), new_pos)) /// for data, or None if at end of input. fn read_pkt_line(data: &[u8], pos: usize) -> Option<(Option<&[u8]>, usize)> { if pos + 4 > data.len() { return None; } let hex = std::str::from_utf8(&data[pos..pos + 4]).ok()?; let len = usize::from_str_radix(hex, 16).ok()?; if len == 0 { // Flush packet return Some((None, pos + 4)); } if len < 4 || pos + len > data.len() { return None; // malformed } let payload = &data[pos + 4..pos + len]; Some((Some(payload), pos + len)) } /// Parse a single command line: " [\0capabilities]\n" fn parse_single_command(line: &[u8]) -> Option<(RefCommand, HashSet)> { // Strip trailing newline let line = if line.last() == Some(&b'\n') { &line[..line.len() - 1] } else { line }; let (line, capabilities) = match line.iter().position(|&b| b == 0) { Some(pos) => (&line[..pos], parse_capabilities(&line[pos + 1..])), None => (line, HashSet::new()), }; let text = std::str::from_utf8(line).ok()?; let parts: Vec<&str> = text.splitn(3, ' ').collect(); if parts.len() != 3 { return None; } Some(( RefCommand { old_hash: parts[0].to_string(), new_hash: parts[1].to_string(), ref_name: parts[2].to_string(), }, capabilities, )) } fn parse_capabilities(raw: &[u8]) -> HashSet { std::str::from_utf8(raw) .ok() .map(|text| text.split_whitespace().map(str::to_string).collect()) .unwrap_or_default() } fn preferred_sideband_mode(capabilities: &HashSet) -> Option { if capabilities.contains("side-band-64k") { Some(SidebandMode::Large) } else if capabilities.contains("side-band") { Some(SidebandMode::Small) } else { None } } fn requested_sideband_mode( capabilities: &HashSet, ) -> std::result::Result, String> { let wants_small = capabilities.contains("side-band"); let wants_large = capabilities.contains("side-band-64k"); if wants_small && wants_large { return Err("client requested both side-band and side-band-64k".into()); } Ok(preferred_sideband_mode(capabilities)) } fn should_send_upload_progress( request: &UploadRequest, sideband_mode: Option, ) -> bool { sideband_mode.is_some() && !request.capabilities.contains("no-progress") } fn should_send_receive_progress( request: &ReceivePackRequest, sideband_mode: Option, ) -> bool { sideband_mode.is_some() && !request.capabilities.contains("quiet") } fn upload_pack_progress_messages(object_count: usize) -> Vec { vec![ format!("Enumerating objects: {}, done.\n", object_count), format!( "Counting objects: 100% ({}/{}), done.\n", object_count, object_count ), format!( "Compressing objects: 100% ({}/{}), done.\n", object_count, object_count ), format!( "Total {} (delta 0), reused 0 (delta 0), pack-reused 0\n", object_count ), ] } fn receive_pack_progress_messages( request: &ReceivePackRequest, pack_bytes: usize, ok_count: usize, failed_count: usize, rebuilt_default_branch_fts: bool, ) -> Vec { let mut messages = vec![ format!("Processing {} ref update(s).\n", request.commands.len()), format!("Received pack: {} bytes.\n", pack_bytes), format!( "Updated refs: {} succeeded, {} failed.\n", ok_count, failed_count ), ]; if rebuilt_default_branch_fts { messages.push("Rebuilt search index for the default branch.\n".to_string()); } messages } // --------------------------------------------------------------------------- // Report status // --------------------------------------------------------------------------- /// Build a report-status response in pkt-line format. fn build_report_status(results: &[(String, std::result::Result<(), String>)]) -> Vec { build_report_status_with_unpack_result("ok", results) } fn build_unpack_error_status(commands: &[RefCommand], reason: &str) -> Vec { let results: Vec<(String, std::result::Result<(), String>)> = commands .iter() .map(|cmd| (cmd.ref_name.clone(), Err(reason.to_string()))) .collect(); build_report_status_with_unpack_result(reason, &results) } fn build_report_status_with_unpack_result( unpack_result: &str, results: &[(String, std::result::Result<(), String>)], ) -> Vec { let mut buf = Vec::new(); pkt_line_bytes(&mut buf, format!("unpack {}\n", unpack_result).as_bytes()); for (ref_name, result) in results { match result { Ok(()) => { let line = format!("ok {}\n", ref_name); pkt_line_bytes(&mut buf, line.as_bytes()); } Err(reason) => { let line = format!("ng {} {}\n", ref_name, reason); pkt_line_bytes(&mut buf, line.as_bytes()); } } } buf.extend_from_slice(b"0000"); // flush buf } fn maybe_sideband_wrap_with_progress( body: Vec, progress_messages: &[String], sideband_mode: Option, ) -> Vec { match sideband_mode { Some(mode) => { let mut wrapped = Vec::new(); for message in progress_messages { append_sideband_data(&mut wrapped, 2, message.as_bytes(), mode); } append_sideband_data(&mut wrapped, 1, &body, mode); wrapped.extend_from_slice(b"0000"); wrapped } None => body, } } fn protocol_result_response(service: &str, body: Vec) -> Result { let mut resp = Response::from_bytes(body)?; resp.headers_mut() .set("Content-Type", &format!("application/x-{}-result", service))?; Ok(resp) } fn protocol_fatal_response( service: &str, message: &str, sideband_mode: Option, ) -> Result { protocol_result_response(service, protocol_fatal_body(message, sideband_mode)) } fn protocol_fatal_body(message: &str, sideband_mode: Option) -> Vec { match sideband_mode { Some(mode) => { let mut body = Vec::new(); append_sideband_data( &mut body, 3, format!("fatal: {}\n", message).as_bytes(), mode, ); body.extend_from_slice(b"0000"); body } None => { let mut body = Vec::new(); pkt_line_bytes(&mut body, format!("ERR {}\n", message).as_bytes()); body.extend_from_slice(b"0000"); body } } } fn protocol_error_message(err: &Error) -> String { match err { Error::RustError(message) => message.clone(), _ => err.to_string(), } } fn append_sideband_data(buf: &mut Vec, channel: u8, data: &[u8], mode: SidebandMode) { for chunk in data.chunks(mode.max_data_len()) { let mut payload = Vec::with_capacity(1 + chunk.len()); payload.push(channel); payload.extend_from_slice(chunk); pkt_line_bytes(buf, &payload); } } fn pkt_line_bytes(buf: &mut Vec, data: &[u8]) { let len = 4 + data.len(); buf.extend_from_slice(format!("{:04x}", len).as_bytes()); buf.extend_from_slice(data); } #[cfg(test)] mod tests { use super::*; #[test] fn parse_receive_pack_request_reads_capabilities_and_pack_offset() { let mut body = Vec::new(); pkt_line_bytes( &mut body, b"0000000000000000000000000000000000000000 0123456789012345678901234567890123456789 refs/heads/main\0report-status side-band-64k ofs-delta\n", ); body.extend_from_slice(b"0000PACK"); let request = parse_receive_pack_request(&body); assert_eq!(request.commands.len(), 1); assert_eq!(request.commands[0].ref_name, "refs/heads/main"); assert_eq!(request.pack_offset, body.len() - 4); assert!(request.capabilities.contains("report-status")); assert!(request.capabilities.contains("side-band-64k")); assert!(request.capabilities.contains("ofs-delta")); } #[test] fn requested_sideband_mode_rejects_conflicting_requests() { let capabilities = HashSet::from(["side-band".to_string(), "side-band-64k".to_string()]); let err = requested_sideband_mode(&capabilities).expect_err("conflicting sideband"); assert_eq!(err, "client requested both side-band and side-band-64k"); } #[test] fn maybe_sideband_wrap_encodes_report_status_on_channel_one() { let body = build_report_status(&[("refs/heads/main".to_string(), Ok(()))]); let wrapped = maybe_sideband_wrap_with_progress(body.clone(), &[], Some(SidebandMode::Large)); let mut expected = Vec::new(); let mut payload = Vec::new(); payload.push(1); payload.extend_from_slice(&body); pkt_line_bytes(&mut expected, &payload); expected.extend_from_slice(b"0000"); assert_eq!(wrapped, expected); } #[test] fn maybe_sideband_wrap_with_progress_places_channel_two_before_status() { let body = build_report_status(&[("refs/heads/main".to_string(), Ok(()))]); let progress = vec!["Processing 1 ref update(s).\n".to_string()]; let wrapped = maybe_sideband_wrap_with_progress(body.clone(), &progress, Some(SidebandMode::Large)); let mut expected = Vec::new(); let mut progress_payload = Vec::new(); progress_payload.push(2); progress_payload.extend_from_slice(progress[0].as_bytes()); pkt_line_bytes(&mut expected, &progress_payload); let mut status_payload = Vec::new(); status_payload.push(1); status_payload.extend_from_slice(&body); pkt_line_bytes(&mut expected, &status_payload); expected.extend_from_slice(b"0000"); assert_eq!(wrapped, expected); } #[test] fn upload_pack_progress_can_be_suppressed_by_no_progress() { let request = UploadRequest { capabilities: HashSet::from(["no-progress".to_string()]), ..UploadRequest::default() }; assert!(!should_send_upload_progress( &request, Some(SidebandMode::Large) )); assert!(should_send_upload_progress( &UploadRequest::default(), Some(SidebandMode::Large) )); assert!(!should_send_upload_progress( &UploadRequest::default(), None )); } #[test] fn receive_pack_progress_can_be_suppressed_by_quiet() { let request = ReceivePackRequest { capabilities: HashSet::from(["quiet".to_string()]), ..ReceivePackRequest::default() }; assert!(!should_send_receive_progress( &request, Some(SidebandMode::Large) )); assert!(should_send_receive_progress( &ReceivePackRequest::default(), Some(SidebandMode::Large) )); assert!(!should_send_receive_progress( &ReceivePackRequest::default(), None )); } #[test] fn upload_pack_progress_messages_include_expected_stages() { let messages = upload_pack_progress_messages(7); assert_eq!( messages, vec![ "Enumerating objects: 7, done.\n", "Counting objects: 100% (7/7), done.\n", "Compressing objects: 100% (7/7), done.\n", "Total 7 (delta 0), reused 0 (delta 0), pack-reused 0\n", ] ); } #[test] fn receive_pack_progress_messages_include_summary_and_rebuild_notice() { let request = ReceivePackRequest { commands: vec![RefCommand { old_hash: store::ZERO_HASH.to_string(), new_hash: "0123456789012345678901234567890123456789".to_string(), ref_name: "refs/heads/main".to_string(), }], ..ReceivePackRequest::default() }; let messages = receive_pack_progress_messages(&request, 1234, 1, 0, true); assert_eq!( messages, vec![ "Processing 1 ref update(s).\n", "Received pack: 1234 bytes.\n", "Updated refs: 1 succeeded, 0 failed.\n", "Rebuilt search index for the default branch.\n", ] ); } #[test] fn append_sideband_data_splits_payloads_at_mode_boundary() { let data = vec![b'x'; 1_100]; let mut wrapped = Vec::new(); append_sideband_data(&mut wrapped, 1, &data, SidebandMode::Small); let mut expected = Vec::new(); let mut first = vec![1]; first.extend(vec![b'x'; 995]); pkt_line_bytes(&mut expected, &first); let mut second = vec![1]; second.extend(vec![b'x'; 105]); pkt_line_bytes(&mut expected, &second); assert_eq!(wrapped, expected); } #[test] fn build_unpack_error_status_uses_unpack_error_without_ng_prefix() { let commands = vec![RefCommand { old_hash: store::ZERO_HASH.to_string(), new_hash: "0123456789012345678901234567890123456789".to_string(), ref_name: "refs/heads/main".to_string(), }]; let status = build_unpack_error_status(&commands, "pack too large"); let mut expected = Vec::new(); pkt_line_bytes(&mut expected, b"unpack pack too large\n"); pkt_line_bytes(&mut expected, b"ng refs/heads/main pack too large\n"); expected.extend_from_slice(b"0000"); assert_eq!(status, expected); } #[test] fn protocol_fatal_body_uses_channel_three_when_sideband_is_active() { let body = protocol_fatal_body("boom", Some(SidebandMode::Large)); let mut expected = Vec::new(); let mut payload = vec![3]; payload.extend_from_slice(b"fatal: boom\n"); pkt_line_bytes(&mut expected, &payload); expected.extend_from_slice(b"0000"); assert_eq!(body, expected); } #[test] fn protocol_fatal_body_uses_err_pkt_line_without_sideband() { let body = protocol_fatal_body("boom", None); let mut expected = Vec::new(); pkt_line_bytes(&mut expected, b"ERR boom\n"); expected.extend_from_slice(b"0000"); assert_eq!(body, expected); } #[test] fn parse_upload_request_reads_caps_haves_and_done() { let mut body = Vec::new(); pkt_line_bytes( &mut body, b"want 0123456789012345678901234567890123456789 multi_ack_detailed no-done ofs-delta\n", ); body.extend_from_slice(b"0000"); pkt_line_bytes( &mut body, b"have abcdefabcdefabcdefabcdefabcdefabcdefabcd\n", ); pkt_line_bytes(&mut body, b"done\n"); let request = parse_upload_request(&body); assert_eq!( request.wants, vec!["0123456789012345678901234567890123456789"] ); assert_eq!( request.haves, vec!["abcdefabcdefabcdefabcdefabcdefabcdefabcd"] ); assert!(request.done); assert!(request.capabilities.contains("multi_ack_detailed")); assert!(request.capabilities.contains("no-done")); assert!(request.capabilities.contains("ofs-delta")); } #[test] fn pack_response_prefix_uses_ack_ready_for_no_done_fetches() { let mut capabilities = HashSet::new(); capabilities.insert("multi_ack_detailed".to_string()); capabilities.insert("no-done".to_string()); let request = UploadRequest { capabilities, ..UploadRequest::default() }; let common_haves = vec!["0123456789012345678901234567890123456789".to_string()]; let prefix = build_pack_response_prefix(&request, &common_haves); let mut expected = Vec::new(); pkt_line_bytes( &mut expected, b"ACK 0123456789012345678901234567890123456789 common\n", ); pkt_line_bytes( &mut expected, b"ACK 0123456789012345678901234567890123456789 ready\n", ); pkt_line_bytes(&mut expected, b"NAK\n"); pkt_line_bytes( &mut expected, b"ACK 0123456789012345678901234567890123456789\n", ); assert_eq!(prefix, expected); } #[test] fn negotiation_response_acks_common_commit_before_done() { let mut capabilities = HashSet::new(); capabilities.insert("multi_ack_detailed".to_string()); let request = UploadRequest { capabilities, ..UploadRequest::default() }; let response = build_negotiation_response( &request, &["0123456789012345678901234567890123456789".to_string()], ); let mut expected = Vec::new(); pkt_line_bytes( &mut expected, b"ACK 0123456789012345678901234567890123456789 common\n", ); pkt_line_bytes(&mut expected, b"NAK\n"); assert_eq!(response, expected); } }