// store.rs
//! Storage layer: parsing and persisting git objects into SQLite,
//! with xpatch delta compression for blobs.

use crate::pack;
use std::collections::{HashMap, HashSet, VecDeque};
use worker::*;

pub const ZERO_HASH: &str = "0000000000000000000000000000000000000000";

/// Cloudflare DO SQLite limit: 2 MB per row. We stay safely under it.
const MAX_BLOB_ROW_SIZE: usize = 1_900_000;

/// Blobs larger than this skip zlib/xpatch and are stored raw + chunked.
/// Keeps peak memory well within the 128 MB DO limit (pack body + raw blob
/// + working memory must all fit). 50 MB is conservative.
const MAX_COMPRESS_SIZE: usize = 50_000_000;

// ---------------------------------------------------------------------------
// Parsed types
// ---------------------------------------------------------------------------

/// Structured metadata extracted from a raw git commit object by
/// [`parse_commit`]. Strings are lossy UTF-8; the byte-exact original
/// is kept separately in `raw_objects`.
pub struct ParsedCommit {
    /// Hex SHA-1 of the root tree this commit snapshots.
    pub tree_hash: String,
    /// Parent commit hashes in header order (empty for a root commit).
    pub parents: Vec<String>,
    /// Author display name.
    pub author: String,
    /// Author email, without the surrounding angle brackets.
    pub author_email: String,
    /// Author timestamp (unix seconds; timezone offset is discarded).
    pub author_time: i64,
    /// Committer display name.
    pub committer: String,
    /// Committer email, without the surrounding angle brackets.
    pub committer_email: String,
    /// Commit timestamp (unix seconds; timezone offset is discarded).
    pub commit_time: i64,
    /// Full commit message (everything after the first blank line).
    pub message: String,
}

/// One entry of a git tree object: a file, symlink, or subdirectory.
pub struct TreeEntry {
    /// Octal file mode; 0o040000 marks a subdirectory (see `walk_tree`).
    pub mode: u32,
    /// Entry name — a single path component, not a full path.
    pub name: String,
    pub hash: String, // 40-char hex SHA-1
}

// ---------------------------------------------------------------------------
// Object parsing
// ---------------------------------------------------------------------------

/// Parse a git commit object's raw data into structured fields.
///
/// Format:
///   tree <hex>\n
///   parent <hex>\n        (zero or more)
///   author <name> <<email>> <ts> <tz>\n
///   committer <name> <<email>> <ts> <tz>\n
///   \n
///   <message>
///
/// Returns an error if an identity line is malformed or the mandatory
/// `tree` header is missing. Unknown headers are skipped.
pub fn parse_commit(data: &[u8]) -> std::result::Result<ParsedCommit, String> {
    // Lossy conversion: old commits may use Latin-1 or other encodings for
    // author names. The raw bytes are preserved in raw_objects for fetch;
    // we only need displayable strings for the parsed commits table.
    let text = String::from_utf8_lossy(data).into_owned();

    // Headers end at the first blank line; everything after it is the
    // message body. No blank line means the message is empty.
    let (header_block, message) = match text.find("\n\n") {
        Some(split) => (&text[..split], text[split + 2..].to_string()),
        None => (text.as_str(), String::new()),
    };

    // Accumulate directly into the result; absent headers leave the
    // zero/empty defaults in place.
    let mut commit = ParsedCommit {
        tree_hash: String::new(),
        parents: Vec::new(),
        author: String::new(),
        author_email: String::new(),
        author_time: 0,
        committer: String::new(),
        committer_email: String::new(),
        commit_time: 0,
        message,
    };

    for header in header_block.lines() {
        if let Some(rest) = header.strip_prefix("tree ") {
            commit.tree_hash = rest.to_string();
        } else if let Some(rest) = header.strip_prefix("parent ") {
            commit.parents.push(rest.to_string());
        } else if let Some(rest) = header.strip_prefix("author ") {
            let (name, email, ts) = parse_identity(rest)?;
            commit.author = name;
            commit.author_email = email;
            commit.author_time = ts;
        } else if let Some(rest) = header.strip_prefix("committer ") {
            let (name, email, ts) = parse_identity(rest)?;
            commit.committer = name;
            commit.committer_email = email;
            commit.commit_time = ts;
        }
        // gpgsig, mergetag, etc. are silently ignored
    }

    // Every valid commit carries a tree header; its absence means the
    // input was not a commit object.
    if commit.tree_hash.is_empty() {
        return Err("commit missing tree header".into());
    }

    Ok(commit)
}

/// Parse an identity line: "Name <email> timestamp timezone"
///
/// The timezone and timestamp are the last two space-separated tokens;
/// whatever remains on the left is "Name <email>". The timezone offset
/// is discarded. A missing '<' means the whole remainder is the name
/// and the email is empty.
fn parse_identity(s: &str) -> std::result::Result<(String, String, i64), String> {
    let mut fields = s.rsplitn(3, ' ');
    let _timezone = fields.next();
    let ts_token = fields.next();
    let remainder = fields.next();

    // Fewer than three tokens means the line cannot be an identity.
    let (ts_token, name_email) = match (ts_token, remainder) {
        (Some(ts), Some(ne)) => (ts, ne),
        _ => return Err(format!("malformed identity: {}", s)),
    };

    let timestamp: i64 = ts_token
        .parse()
        .map_err(|e| format!("bad timestamp: {}", e))?;

    // Split "Name <email>" at the last '<'.
    let (name, email) = if let Some(open) = name_email.rfind('<') {
        (
            name_email[..open].trim().to_string(),
            name_email[open + 1..]
                .trim_end_matches('>')
                .trim()
                .to_string(),
        )
    } else {
        (name_email.to_string(), String::new())
    };

    Ok((name, email, timestamp))
}

/// Parse a git tree object's binary data into entries.
///
/// Binary format (repeated):
///   <mode-ascii-digits> SP <name> NUL <20-byte-raw-hash>
///
/// Returns an error on a truncated record or a non-octal mode.
pub fn parse_tree_data(data: &[u8]) -> std::result::Result<Vec<TreeEntry>, String> {
    let mut entries = Vec::new();
    let mut rest = data;

    while !rest.is_empty() {
        // Octal mode, terminated by a single space.
        let sp = rest
            .iter()
            .position(|&b| b == b' ')
            .ok_or("tree: missing space after mode")?;
        let mode_text =
            std::str::from_utf8(&rest[..sp]).map_err(|e| format!("tree mode: {}", e))?;
        let mode =
            u32::from_str_radix(mode_text, 8).map_err(|e| format!("tree mode parse: {}", e))?;
        rest = &rest[sp + 1..];

        // Entry name, terminated by NUL. Lossy: names need not be UTF-8.
        let nul = rest
            .iter()
            .position(|&b| b == 0)
            .ok_or("tree: missing NUL after name")?;
        let name = String::from_utf8_lossy(&rest[..nul]).into_owned();
        rest = &rest[nul + 1..];

        // Raw 20-byte SHA-1, hex-encoded for storage.
        if rest.len() < 20 {
            return Err("tree: truncated hash".into());
        }
        let hash = pack::hex_encode(&rest[..20]);
        rest = &rest[20..];

        entries.push(TreeEntry { mode, name, hash });
    }

    Ok(entries)
}

// ---------------------------------------------------------------------------
// Object storage
// ---------------------------------------------------------------------------

/// Store a parsed commit, its parent edges, and the raw object bytes.
/// Skips if already stored. When `bulk_mode` is true, skips commit graph
/// building and fts_commits indexing (rebuild via admin endpoints after).
pub fn store_commit(
    sql: &SqlStorage,
    hash: &str,
    c: &ParsedCommit,
    raw_data: &[u8],
    bulk_mode: bool,
) -> Result<()> {
    // Fast existence check: indexed PK lookup, single row
    #[derive(serde::Deserialize)]
    #[allow(dead_code)]
    struct Exists {
        x: i64,
    }
    let exists: Vec<Exists> = sql
        .exec(
            "SELECT 1 AS x FROM commits WHERE hash = ? LIMIT 1",
            vec![SqlStorageValue::from(hash.to_string())],
        )?
        .to_array()?;
    if !exists.is_empty() {
        return Ok(());
    }

    sql.exec(
        "INSERT INTO commits
            (hash, tree_hash, author, author_email, author_time,
             committer, committer_email, commit_time, message)
         VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
        vec![
            SqlStorageValue::from(hash.to_string()),
            SqlStorageValue::from(c.tree_hash.clone()),
            SqlStorageValue::from(c.author.clone()),
            SqlStorageValue::from(c.author_email.clone()),
            SqlStorageValue::from(c.author_time),
            SqlStorageValue::from(c.committer.clone()),
            SqlStorageValue::from(c.committer_email.clone()),
            SqlStorageValue::from(c.commit_time),
            SqlStorageValue::from(c.message.clone()),
        ],
    )?;

    // Batch parent inserts: up to 33 parents per statement (3 params each, 100 max).
    // Fix: the ordinal must be the parent's position in the FULL parent list.
    // Using the within-chunk index restarted ordinals at 0 for each chunk,
    // corrupting parent order for octopus merges with more than 33 parents.
    const PARENTS_PER_STMT: usize = 33;
    for (chunk_idx, chunk) in c.parents.chunks(PARENTS_PER_STMT).enumerate() {
        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?)").collect();
        let sql_str = format!(
            "INSERT INTO commit_parents (commit_hash, parent_hash, ordinal) VALUES {}",
            placeholders.join(", ")
        );
        let mut params = Vec::new();
        for (i, parent) in chunk.iter().enumerate() {
            let ordinal = (chunk_idx * PARENTS_PER_STMT + i) as i64;
            params.push(SqlStorageValue::from(hash.to_string()));
            params.push(SqlStorageValue::from(parent.clone()));
            params.push(SqlStorageValue::from(ordinal));
        }
        sql.exec(&sql_str, params)?;
    }

    // Store raw bytes for byte-identical fetch
    store_raw_object(sql, hash, raw_data)?;

    if !bulk_mode {
        // Build binary lifting table (level 0 = first parent)
        build_commit_graph(sql, hash, c.parents.first().map(|s| s.as_str()))?;

        // Index commit message for FTS search
        sql.exec(
            "INSERT OR IGNORE INTO fts_commits (hash, message, author) VALUES (?, ?, ?)",
            vec![
                SqlStorageValue::from(hash.to_string()),
                SqlStorageValue::from(c.message.clone()),
                SqlStorageValue::from(c.author.clone()),
            ],
        )?;
    }

    Ok(())
}

/// Load raw object bytes for fetch. Returns None when the hash has no
/// row in raw_objects.
fn load_raw_object(sql: &SqlStorage, hash: &str) -> Result<Option<Vec<u8>>> {
    #[derive(serde::Deserialize)]
    struct Row {
        #[serde(with = "serde_bytes")]
        data: Vec<u8>,
    }
    let mut rows: Vec<Row> = sql
        .exec(
            "SELECT data FROM raw_objects WHERE hash = ?",
            vec![SqlStorageValue::from(hash.to_string())],
        )?
        .to_array()?;
    // Take the first row, if any.
    if rows.is_empty() {
        Ok(None)
    } else {
        Ok(Some(rows.swap_remove(0).data))
    }
}

/// Load raw object bytes — public wrapper for thin pack resolution.
/// Exists so code outside this module can reuse the private loader
/// without widening its visibility.
pub fn load_raw_object_pub(sql: &SqlStorage, hash: &str) -> Result<Option<Vec<u8>>> {
    load_raw_object(sql, hash)
}

/// Reconstruct a blob by hash for thin pack resolution. Looks up the blob's
/// group and version, then reconstructs from the delta chain.
/// Returns None if the hash isn't in the blobs table.
pub fn reconstruct_blob_by_hash(sql: &SqlStorage, hash: &str) -> Result<Option<Vec<u8>>> {
    #[derive(serde::Deserialize)]
    struct BlobInfo {
        group_id: i64,
        version_in_group: i64,
    }
    let rows: Vec<BlobInfo> = sql
        .exec(
            "SELECT group_id, version_in_group FROM blobs WHERE blob_hash = ? LIMIT 1",
            vec![SqlStorageValue::from(hash.to_string())],
        )?
        .to_array()?;

    // Unknown hash: not an error, the caller decides what "missing" means.
    if let Some(info) = rows.into_iter().next() {
        reconstruct_blob(sql, info.group_id, info.version_in_group).map(Some)
    } else {
        Ok(None)
    }
}

/// Detect the git object type by checking which table contains the hash.
/// More reliable than parsing raw bytes (which can have edge cases).
pub fn detect_object_type(sql: &SqlStorage, hash: &str) -> crate::pack::ObjectType {
    #[derive(serde::Deserialize)]
    #[allow(dead_code)]
    struct Exists {
        x: i64,
    }

    // Probe one table for the hash; any SQL or decode error counts as
    // "not present" so detection never fails outright.
    let probe = |query: &str| -> bool {
        sql.exec(query, vec![SqlStorageValue::from(hash.to_string())])
            .map(|c| {
                c.to_array::<Exists>()
                    .map(|rows| !rows.is_empty())
                    .unwrap_or(false)
            })
            .unwrap_or(false)
    };

    if probe("SELECT 1 AS x FROM commits WHERE hash = ? LIMIT 1") {
        crate::pack::ObjectType::Commit
    } else if probe("SELECT 1 AS x FROM trees WHERE tree_hash = ? LIMIT 1") {
        crate::pack::ObjectType::Tree
    } else {
        // Blobs aren't stored in raw_objects, but default to Blob as fallback
        crate::pack::ObjectType::Blob
    }
}

/// Store raw object bytes (for commits and trees) so we can return them
/// byte-for-byte identical during fetch.
fn store_raw_object(sql: &SqlStorage, hash: &str, data: &[u8]) -> Result<()> {
    let params = vec![
        SqlStorageValue::from(hash.to_string()),
        SqlStorageValue::Blob(data.to_vec()),
    ];
    // INSERT OR IGNORE: re-pushing an already-stored object is a no-op.
    sql.exec(
        "INSERT OR IGNORE INTO raw_objects (hash, data) VALUES (?, ?)",
        params,
    )?;
    Ok(())
}

/// Populate the binary lifting table for O(log N) ancestor queries.
///
/// Level 0 = first parent.
/// Level k = the level k-1 ancestor of the level k-1 ancestor.
fn build_commit_graph(
    sql: &SqlStorage,
    commit_hash: &str,
    first_parent: Option<&str>,
) -> Result<()> {
    // Root commits have no ancestors and therefore no graph rows.
    let parent = if let Some(p) = first_parent {
        p
    } else {
        return Ok(());
    };

    // Level 0: direct parent
    sql.exec(
        "INSERT OR IGNORE INTO commit_graph (commit_hash, level, ancestor_hash)
         VALUES (?, 0, ?)",
        vec![
            SqlStorageValue::from(commit_hash.to_string()),
            SqlStorageValue::from(parent.to_string()),
        ],
    )?;

    #[derive(serde::Deserialize)]
    struct Ancestor {
        ancestor_hash: String,
    }

    // Higher levels: look up the level k-1 ancestor of our current
    // ancestor; stop as soon as the chain runs out.
    let mut prev_ancestor = parent.to_string();
    for level in 1.. {
        let rows: Vec<Ancestor> = sql
            .exec(
                "SELECT ancestor_hash FROM commit_graph
                 WHERE commit_hash = ? AND level = ?",
                vec![
                    SqlStorageValue::from(prev_ancestor.clone()),
                    SqlStorageValue::from(level - 1),
                ],
            )?
            .to_array()?;

        let row = match rows.into_iter().next() {
            Some(r) => r,
            None => break, // no more ancestors at this depth
        };

        sql.exec(
            "INSERT OR IGNORE INTO commit_graph (commit_hash, level, ancestor_hash)
                     VALUES (?, ?, ?)",
            vec![
                SqlStorageValue::from(commit_hash.to_string()),
                SqlStorageValue::from(level),
                SqlStorageValue::from(row.ancestor_hash.clone()),
            ],
        )?;
        prev_ancestor = row.ancestor_hash;
    }

    Ok(())
}

/// Store tree entries. Skips if tree already exists.
pub fn store_tree(sql: &SqlStorage, tree_hash: &str, data: &[u8]) -> Result<()> {
    // Fast existence check: indexed PK lookup, single row
    #[derive(serde::Deserialize)]
    #[allow(dead_code)]
    struct Exists {
        x: i64,
    }
    let exists: Vec<Exists> = sql
        .exec(
            "SELECT 1 AS x FROM trees WHERE tree_hash = ? LIMIT 1",
            vec![SqlStorageValue::from(tree_hash.to_string())],
        )?
        .to_array()?;
    if !exists.is_empty() {
        return Ok(());
    }

    let entries = parse_tree_data(data).map_err(|e| Error::RustError(e))?;

    // Batch INSERTs: 25 entries per statement (4 params each, 100 max)
    for chunk in entries.chunks(25) {
        let placeholders: Vec<&str> = chunk.iter().map(|_| "(?, ?, ?, ?)").collect();
        let sql_str = format!(
            "INSERT OR IGNORE INTO trees (tree_hash, name, mode, entry_hash) VALUES {}",
            placeholders.join(", ")
        );
        let mut params = Vec::new();
        for entry in chunk {
            params.push(SqlStorageValue::from(tree_hash.to_string()));
            params.push(SqlStorageValue::from(entry.name.clone()));
            params.push(SqlStorageValue::from(entry.mode as i64));
            params.push(SqlStorageValue::from(entry.hash.clone()));
        }
        sql.exec(&sql_str, params)?;
    }

    // Store raw bytes for byte-identical fetch
    store_raw_object(sql, tree_hash, data)?;

    Ok(())
}

/// Store a blob with xpatch delta compression. Groups blobs by path for
/// good delta ratios (same file across commits shares a group).
///
/// Layout per group: the first version is a zlib keyframe, then a fresh
/// keyframe every `keyframe_interval` versions; versions in between are
/// xpatch deltas against the previous version. Payloads too big for one
/// row spill into `blob_chunks` behind an empty sentinel in `blobs`.
pub fn store_blob(
    sql: &SqlStorage,
    hash: &str,
    raw_data: &[u8],
    path: &str,
    keyframe_interval: i64,
) -> Result<()> {
    // Fast existence check: indexed PK lookup
    #[derive(serde::Deserialize)]
    #[allow(dead_code)]
    struct Exists {
        x: i64,
    }
    let exists: Vec<Exists> = sql
        .exec(
            "SELECT 1 AS x FROM blobs WHERE blob_hash = ? LIMIT 1",
            vec![SqlStorageValue::from(hash.to_string())],
        )?
        .to_array()?;
    if !exists.is_empty() {
        return Ok(());
    }

    // Find or create a blob group for this path
    #[derive(serde::Deserialize)]
    struct GroupRow {
        group_id: i64,
        latest_version: i64,
    }

    let groups: Vec<GroupRow> = sql
        .exec(
            "SELECT group_id, latest_version FROM blob_groups
             WHERE path_hint = ? LIMIT 1",
            vec![SqlStorageValue::from(path.to_string())],
        )?
        .to_array()?;

    let (group_id, latest_version) = if let Some(g) = groups.first() {
        (g.group_id, g.latest_version)
    } else {
        sql.exec(
            "INSERT INTO blob_groups (path_hint, latest_version) VALUES (?, 0)",
            vec![SqlStorageValue::from(path.to_string())],
        )?;
        // Get the auto-incremented id
        #[derive(serde::Deserialize)]
        struct LastId {
            id: i64,
        }
        let row: LastId = sql.exec("SELECT last_insert_rowid() AS id", None)?.one()?;
        (row.id, 0i64)
    };

    let new_version = latest_version + 1;

    // Large blobs: store raw + chunked, no compression. Avoids OOM from
    // zlib/xpatch needing 2x blob size in memory. is_keyframe=2 marks raw.
    if raw_data.len() > MAX_COMPRESS_SIZE {
        let raw_len = raw_data.len() as i64;
        sql.exec(
            "INSERT INTO blobs
                (blob_hash, group_id, version_in_group, is_keyframe, data, raw_size, stored_size)
             VALUES (?, ?, ?, 2, ?, ?, ?)",
            vec![
                SqlStorageValue::from(hash.to_string()),
                SqlStorageValue::from(group_id),
                SqlStorageValue::from(new_version),
                SqlStorageValue::Blob(Vec::new()), // empty sentinel — data in chunks
                SqlStorageValue::from(raw_len),
                SqlStorageValue::from(raw_len),
            ],
        )?;
        for (i, chunk) in raw_data.chunks(MAX_BLOB_ROW_SIZE).enumerate() {
            sql.exec(
                "INSERT INTO blob_chunks
                    (group_id, version_in_group, chunk_index, data)
                 VALUES (?, ?, ?, ?)",
                vec![
                    SqlStorageValue::from(group_id),
                    SqlStorageValue::from(new_version),
                    SqlStorageValue::from(i as i64),
                    SqlStorageValue::Blob(chunk.to_vec()),
                ],
            )?;
        }
        sql.exec(
            "UPDATE blob_groups SET latest_version = ? WHERE group_id = ?",
            vec![
                SqlStorageValue::from(new_version),
                SqlStorageValue::from(group_id),
            ],
        )?;
        return Ok(());
    }

    // Keyframe cadence: version 1 always, then versions interval+1,
    // 2*interval+1, … Fix: an interval <= 1 now forces every version to
    // be a keyframe — previously interval 1 produced NO keyframes after
    // v1 (v % 1 == 0, never 1) and interval 0 panicked with a
    // divide-by-zero on the modulo.
    let is_keyframe =
        new_version == 1 || keyframe_interval <= 1 || new_version % keyframe_interval == 1;

    let stored_data = if is_keyframe {
        // Compress keyframes with zlib to reduce storage and stay under
        // the 2 MB per-row DO SQLite limit for most files.
        blob_zlib_compress(raw_data)
    } else {
        // Reconstruct the latest version and delta-encode against it.
        // Deltas are already compact (xpatch uses zstd internally).
        let prev = reconstruct_blob(sql, group_id, latest_version)?;
        xpatch::delta::encode(0, &prev, raw_data, true)
    };

    let stored_len = stored_data.len() as i64;

    if stored_data.len() <= MAX_BLOB_ROW_SIZE {
        // Fits in a single row
        sql.exec(
            "INSERT INTO blobs
                (blob_hash, group_id, version_in_group, is_keyframe, data, raw_size, stored_size)
             VALUES (?, ?, ?, ?, ?, ?, ?)",
            vec![
                SqlStorageValue::from(hash.to_string()),
                SqlStorageValue::from(group_id),
                SqlStorageValue::from(new_version),
                SqlStorageValue::from(if is_keyframe { 1i64 } else { 0i64 }),
                SqlStorageValue::Blob(stored_data),
                SqlStorageValue::from(raw_data.len() as i64),
                SqlStorageValue::from(stored_len),
            ],
        )?;
    } else {
        // Too large for one row even after compression — store empty
        // sentinel in blobs and actual data in blob_chunks.
        sql.exec(
            "INSERT INTO blobs
                (blob_hash, group_id, version_in_group, is_keyframe, data, raw_size, stored_size)
             VALUES (?, ?, ?, ?, ?, ?, ?)",
            vec![
                SqlStorageValue::from(hash.to_string()),
                SqlStorageValue::from(group_id),
                SqlStorageValue::from(new_version),
                SqlStorageValue::from(if is_keyframe { 1i64 } else { 0i64 }),
                SqlStorageValue::Blob(Vec::new()), // empty sentinel
                SqlStorageValue::from(raw_data.len() as i64),
                SqlStorageValue::from(stored_len),
            ],
        )?;
        for (i, chunk) in stored_data.chunks(MAX_BLOB_ROW_SIZE).enumerate() {
            sql.exec(
                "INSERT INTO blob_chunks
                    (group_id, version_in_group, chunk_index, data)
                 VALUES (?, ?, ?, ?)",
                vec![
                    SqlStorageValue::from(group_id),
                    SqlStorageValue::from(new_version),
                    SqlStorageValue::from(i as i64),
                    SqlStorageValue::Blob(chunk.to_vec()),
                ],
            )?;
        }
    }

    sql.exec(
        "UPDATE blob_groups SET latest_version = ? WHERE group_id = ?",
        vec![
            SqlStorageValue::from(new_version),
            SqlStorageValue::from(group_id),
        ],
    )?;

    Ok(())
}

/// Reconstruct a blob from its delta chain within a group.
///
/// Strategy: find the nearest keyframe at or below `target_version`,
/// materialize it (raw or zlib, possibly chunked), then apply every
/// stored delta between the keyframe and the target in ascending
/// version order. A row with an empty `data` column is a sentinel
/// whose actual payload lives in `blob_chunks`.
pub fn reconstruct_blob(sql: &SqlStorage, group_id: i64, target_version: i64) -> Result<Vec<u8>> {
    // Row shape for the delta rows streamed during the forward walk below.
    #[derive(serde::Deserialize)]
    struct BlobRow {
        version_in_group: i64,
        #[serde(with = "serde_bytes")]
        data: Vec<u8>,
    }

    // Check if the target version is a raw keyframe (is_keyframe=2).
    // These are large blobs stored without compression — return directly.
    #[derive(serde::Deserialize)]
    struct KeyframeInfo {
        version_in_group: i64,
        is_keyframe: i64,
        #[serde(with = "serde_bytes")]
        data: Vec<u8>,
    }

    // Find nearest keyframe (is_keyframe IN (1,2))
    let mut keyframes: Vec<KeyframeInfo> = sql
        .exec(
            "SELECT version_in_group, is_keyframe, data FROM blobs
             WHERE group_id = ? AND version_in_group <= ? AND is_keyframe >= 1
             ORDER BY version_in_group DESC LIMIT 1",
            vec![
                SqlStorageValue::from(group_id),
                SqlStorageValue::from(target_version),
            ],
        )?
        .to_array()?;

    // LIMIT 1 above means pop() yields the single nearest-keyframe row.
    // Its absence indicates a corrupt group (v1 is always a keyframe).
    let keyframe = keyframes
        .pop()
        .ok_or_else(|| Error::RustError("no keyframe found in blob group".into()))?;

    let keyframe_version = keyframe.version_in_group;

    // Decompress/reassemble the keyframe based on type.
    let mut content = if keyframe.is_keyframe == 2 {
        // Raw keyframe (is_keyframe=2): stored uncompressed in blob_chunks.
        if keyframe.data.is_empty() {
            reassemble_chunks(sql, group_id, keyframe_version)?
        } else {
            keyframe.data
        }
    } else {
        // Normal keyframe (is_keyframe=1): zlib-compressed.
        let compressed = if keyframe.data.is_empty() {
            reassemble_chunks(sql, group_id, keyframe_version)?
        } else {
            keyframe.data
        };
        blob_zlib_decompress(&compressed)?
    };

    // Apply deltas forward from the keyframe up to the target, if the
    // target itself is not the keyframe.
    if keyframe_version < target_version {
        let cursor = sql.exec(
            "SELECT version_in_group, data FROM blobs
             WHERE group_id = ? AND version_in_group > ? AND version_in_group <= ?
             ORDER BY version_in_group ASC",
            vec![
                SqlStorageValue::from(group_id),
                SqlStorageValue::from(keyframe_version),
                SqlStorageValue::from(target_version),
            ],
        )?;

        // Stream rows via the cursor instead of to_array() to avoid
        // materializing every delta at once.
        for row in cursor.next::<BlobRow>() {
            let row = row?;
            // Reassemble chunked deltas (empty sentinel means data is in blob_chunks)
            let delta_data = if row.data.is_empty() {
                reassemble_chunks(sql, group_id, row.version_in_group)?
            } else {
                row.data
            };
            content = xpatch::delta::decode(&content, &delta_data).map_err(|e| {
                Error::RustError(format!(
                    "delta decode group {} v{}: {}",
                    group_id, row.version_in_group, e
                ))
            })?;
        }
    }

    Ok(content)
}

/// Update a ref, validating the old hash matches (basic fast-forward check).
pub fn update_ref(sql: &SqlStorage, name: &str, old_hash: &str, new_hash: &str) -> Result<()> {
    // All-zero new hash is git's "delete this ref" sentinel.
    if new_hash == ZERO_HASH {
        sql.exec(
            "DELETE FROM refs WHERE name = ?",
            vec![SqlStorageValue::from(name.to_string())],
        )?;
        return Ok(());
    }

    // All-zero old hash means the client is creating a brand-new ref;
    // a duplicate-name conflict surfaces as an INSERT error.
    if old_hash == ZERO_HASH {
        sql.exec(
            "INSERT INTO refs (name, commit_hash) VALUES (?, ?)",
            vec![
                SqlStorageValue::from(name.to_string()),
                SqlStorageValue::from(new_hash.to_string()),
            ],
        )?;
        return Ok(());
    }

    // Updating an existing ref: the stored hash must equal what the
    // client claims it is, otherwise the push is stale.
    #[derive(serde::Deserialize)]
    struct RefRow {
        commit_hash: String,
    }
    let current: Vec<RefRow> = sql
        .exec(
            "SELECT commit_hash FROM refs WHERE name = ?",
            vec![SqlStorageValue::from(name.to_string())],
        )?
        .to_array()?;

    match current.first() {
        None => {
            return Err(Error::RustError(format!(
                "ref {}: expected {} but ref does not exist",
                name, old_hash
            )));
        }
        Some(r) if r.commit_hash != old_hash => {
            return Err(Error::RustError(format!(
                "ref {}: expected {}, found {}",
                name, old_hash, r.commit_hash
            )));
        }
        Some(_) => {}
    }

    sql.exec(
        "UPDATE refs SET commit_hash = ? WHERE name = ?",
        vec![
            SqlStorageValue::from(new_hash.to_string()),
            SqlStorageValue::from(name.to_string()),
        ],
    )?;

    Ok(())
}

// ---------------------------------------------------------------------------
// Path resolution: walk trees to map blob_hash → file path
// ---------------------------------------------------------------------------

/// Walk commit trees to build a mapping from blob hash to file path.
/// Checks `pack_trees` (from the current push) first, falls back to the DB.
pub fn resolve_blob_paths(
    sql: &SqlStorage,
    pack_trees: &HashMap<String, Vec<TreeEntry>>,
    root_tree_hashes: &[String],
) -> Result<HashMap<String, String>> {
    let mut blob_paths = HashMap::new();
    let mut visited_trees = HashMap::new();

    // The visited set is shared across roots so subtrees common to
    // several commits are only descended once.
    for root in root_tree_hashes {
        walk_tree(sql, pack_trees, root, "", &mut blob_paths, &mut visited_trees)?;
    }

    Ok(blob_paths)
}

/// Recursive helper for `resolve_blob_paths`: records the first path
/// seen for each blob hash, descending into subdirectory entries.
fn walk_tree(
    sql: &SqlStorage,
    pack_trees: &HashMap<String, Vec<TreeEntry>>,
    tree_hash: &str,
    prefix: &str,
    blob_paths: &mut HashMap<String, String>,
    visited: &mut HashMap<String, bool>,
) -> Result<()> {
    // insert() returns the previous value, so Some(_) means we've
    // already walked this tree.
    if visited.insert(tree_hash.to_string(), true).is_some() {
        return Ok(());
    }

    // Prefer trees delivered in the current pack; fall back to the DB.
    let db_entries;
    let entries: &[TreeEntry] = match pack_trees.get(tree_hash) {
        Some(from_pack) => from_pack,
        None => {
            db_entries = load_tree_from_db(sql, tree_hash)?;
            &db_entries
        }
    };

    for entry in entries {
        let child_path = if prefix.is_empty() {
            entry.name.clone()
        } else {
            format!("{}/{}", prefix, entry.name)
        };

        if entry.mode == 0o040000 {
            // Directory entry: descend.
            walk_tree(sql, pack_trees, &entry.hash, &child_path, blob_paths, visited)?;
        } else {
            // Blob or symlink: the first path recorded for a hash wins.
            blob_paths.entry(entry.hash.clone()).or_insert(child_path);
        }
    }

    Ok(())
}

/// Load a tree's entries from the `trees` table. Returns an empty vec
/// when the tree hash has no rows.
pub fn load_tree_from_db(sql: &SqlStorage, tree_hash: &str) -> Result<Vec<TreeEntry>> {
    #[derive(serde::Deserialize)]
    struct Row {
        mode: i64,
        name: String,
        entry_hash: String,
    }

    let rows: Vec<Row> = sql
        .exec(
            "SELECT mode, name, entry_hash FROM trees WHERE tree_hash = ?",
            vec![SqlStorageValue::from(tree_hash.to_string())],
        )?
        .to_array()?;

    let mut entries = Vec::with_capacity(rows.len());
    for row in rows {
        entries.push(TreeEntry {
            mode: row.mode as u32,
            name: row.name,
            hash: row.entry_hash,
        });
    }
    Ok(entries)
}

// ---------------------------------------------------------------------------
// Object collection for git-upload-pack (fetch / clone)
// ---------------------------------------------------------------------------

/// Collect all objects reachable from `wants` that are not reachable from
/// `haves`. Returns PackObjects ready for pack generation.
///
/// Walks the commit graph via BFS, collecting commits, trees, and blobs.
/// Blobs are reconstructed from xpatch delta chains.
pub fn collect_objects(
    sql: &SqlStorage,
    wants: &[String],
    haves: &HashSet<String>,
) -> Result<Vec<pack::PackObject>> {
    #[derive(serde::Deserialize)]
    struct CommitRow {
        tree_hash: String,
    }
    #[derive(serde::Deserialize)]
    struct ParentRow {
        parent_hash: String,
    }

    let mut objects: Vec<pack::PackObject> = Vec::new();
    let mut visited: HashSet<String> = HashSet::new();

    // Seed the BFS with every wanted tip the client doesn't already have.
    let mut queue: VecDeque<String> = wants
        .iter()
        .filter(|w| !haves.contains(*w))
        .cloned()
        .collect();

    while let Some(commit_hash) = queue.pop_front() {
        if visited.contains(&commit_hash) || haves.contains(&commit_hash) {
            continue;
        }
        visited.insert(commit_hash.clone());

        // tree_hash is needed below to collect this commit's snapshot.
        let commits: Vec<CommitRow> = sql
            .exec(
                "SELECT tree_hash FROM commits WHERE hash = ?",
                vec![SqlStorageValue::from(commit_hash.clone())],
            )?
            .to_array()?;
        let commit = match commits.into_iter().next() {
            Some(c) => c,
            None => continue, // unknown commit: skip silently
        };

        // Emit the commit exactly as it was pushed (byte-identical).
        let raw_data = match load_raw_object(sql, &commit_hash)? {
            Some(d) => d,
            None => continue,
        };
        objects.push(pack::PackObject {
            obj_type: pack::ObjectType::Commit,
            data: raw_data,
        });

        // Queue parents for later visits.
        let parents: Vec<ParentRow> = sql
            .exec(
                "SELECT parent_hash FROM commit_parents
                 WHERE commit_hash = ? ORDER BY ordinal ASC",
                vec![SqlStorageValue::from(commit_hash.clone())],
            )?
            .to_array()?;
        for p in parents {
            if !visited.contains(&p.parent_hash) && !haves.contains(&p.parent_hash) {
                queue.push_back(p.parent_hash);
            }
        }

        // Pull in this commit's tree, subtrees, and blobs.
        collect_tree_objects(sql, &commit.tree_hash, &mut objects, &mut visited)?;
    }

    Ok(objects)
}

/// Recursively collect a tree object plus every blob and subtree it
/// references, appending them to `objects` in discovery order.
///
/// `visited` spans the whole fetch and guarantees each object hash is
/// emitted at most once, even when shared across commits.
fn collect_tree_objects(
    sql: &SqlStorage,
    tree_hash: &str,
    objects: &mut Vec<pack::PackObject>,
    visited: &mut HashSet<String>,
) -> Result<()> {
    // `insert` returns false when the hash was already present.
    if !visited.insert(tree_hash.to_string()) {
        return Ok(());
    }

    let entries = load_tree_from_db(sql, tree_hash)?;
    if entries.is_empty() {
        // Tree not present in storage — nothing to emit.
        return Ok(());
    }

    // Emit the tree itself using the exact bytes received at push time.
    if let Some(raw) = load_raw_object(sql, tree_hash)? {
        objects.push(pack::PackObject {
            obj_type: pack::ObjectType::Tree,
            data: raw,
        });
    }

    for entry in &entries {
        if entry.mode == 0o040000 {
            // Directory entry: recurse into the subtree.
            collect_tree_objects(sql, &entry.hash, objects, visited)?;
        } else if visited.insert(entry.hash.clone()) {
            // Regular-file or symlink blob we haven't emitted yet.
            if let Some(data) = load_blob_content(sql, &entry.hash)? {
                objects.push(pack::PackObject {
                    obj_type: pack::ObjectType::Blob,
                    data,
                });
            }
        }
    }

    Ok(())
}

/// Load and reconstruct a blob's full content by hash.
///
/// Returns `Ok(None)` when no blob row exists for `blob_hash`.
fn load_blob_content(sql: &SqlStorage, blob_hash: &str) -> Result<Option<Vec<u8>>> {
    #[derive(serde::Deserialize)]
    struct BlobInfo {
        group_id: i64,
        version_in_group: i64,
    }

    // Look up which delta group / version holds this blob.
    let rows: Vec<BlobInfo> = sql
        .exec(
            "SELECT group_id, version_in_group FROM blobs WHERE blob_hash = ?",
            vec![SqlStorageValue::from(blob_hash.to_string())],
        )?
        .to_array()?;

    match rows.into_iter().next() {
        None => Ok(None),
        // Reconstruct the full bytes from the group's keyframe + deltas.
        Some(info) => reconstruct_blob(sql, info.group_id, info.version_in_group).map(Some),
    }
}

// NOTE: build_raw_commit / build_raw_tree / hex_decode were removed.
// We store raw object bytes in `raw_objects` during push and return them
// verbatim during fetch, so we never need to re-serialize commits or trees.

// ---------------------------------------------------------------------------
// Config helpers
// ---------------------------------------------------------------------------

/// Read a single value from the `config` key/value table.
///
/// Returns `Ok(None)` when the key has never been set.
pub fn get_config(sql: &SqlStorage, key: &str) -> Result<Option<String>> {
    #[derive(serde::Deserialize)]
    struct Row {
        value: String,
    }
    let mut rows: Vec<Row> = sql
        .exec(
            "SELECT value FROM config WHERE key = ?",
            vec![SqlStorageValue::from(key.to_string())],
        )?
        .to_array()?;
    if rows.is_empty() {
        Ok(None)
    } else {
        Ok(Some(rows.swap_remove(0).value))
    }
}

/// Write (insert or overwrite) a value in the `config` key/value table.
pub fn set_config(sql: &SqlStorage, key: &str, value: &str) -> Result<()> {
    let params = vec![
        SqlStorageValue::from(key.to_string()),
        SqlStorageValue::from(value.to_string()),
    ];
    sql.exec(
        "INSERT OR REPLACE INTO config (key, value) VALUES (?, ?)",
        params,
    )?;
    Ok(())
}

// ---------------------------------------------------------------------------
// FTS5 index rebuild (incremental when possible)
// ---------------------------------------------------------------------------

/// Rebuild the FTS5 code search index for `new_commit_hash`.
///
/// If a previously indexed commit is recorded in config (and its tree is
/// still loadable), only files changed between the two commits are touched,
/// using the diff engine. Otherwise the whole tree is indexed from scratch.
/// On success, records `new_commit_hash` as the indexed commit.
pub fn rebuild_fts_index(sql: &SqlStorage, new_commit_hash: &str) -> Result<()> {
    // Without the commit's tree there is nothing to index.
    let new_tree = match commit_tree_hash(sql, new_commit_hash)? {
        Some(h) => h,
        None => return Ok(()),
    };

    // Resolve the previously indexed commit's tree, if any survives.
    let old_tree = match get_config(sql, "fts_indexed_commit")? {
        Some(prev_hash) => commit_tree_hash(sql, &prev_hash)?,
        None => None,
    };

    match old_tree {
        Some(old_tree) => {
            // Incremental rebuild: only touch changed files.
            let changes = crate::diff::diff_trees(sql, Some(&old_tree), Some(&new_tree))?;

            for file in &changes {
                // Always drop any existing row for this path first. This keeps
                // the rebuild idempotent: if a previous run was interrupted
                // before `fts_indexed_commit` was updated, a retry would
                // otherwise insert Added files a second time, leaving
                // duplicate rows in fts_head.
                sql.exec(
                    "DELETE FROM fts_head WHERE path = ?",
                    vec![SqlStorageValue::from(file.path.clone())],
                )?;

                match file.status {
                    crate::diff::DiffStatus::Deleted => {}
                    crate::diff::DiffStatus::Added | crate::diff::DiffStatus::Modified => {
                        if let Some(hash) = &file.new_hash {
                            index_file_for_fts(sql, &file.path, hash)?;
                        }
                    }
                }
            }
        }
        None => {
            // First index, or the previous commit was lost — full rebuild.
            full_fts_rebuild(sql, &new_tree)?;
        }
    }

    set_config(sql, "fts_indexed_commit", new_commit_hash)?;
    Ok(())
}

/// Full rebuild: wipe the index and re-index every file in the tree.
fn full_fts_rebuild(sql: &SqlStorage, tree_hash: &str) -> Result<()> {
    // Drop the entire index before repopulating it.
    sql.exec("DELETE FROM fts_head", None)?;

    // Walk the tree to discover every blob and its repository path.
    // No in-flight pack is involved here, so the pack-trees map is empty.
    let pack_trees = HashMap::new();
    let mut blob_paths = HashMap::new();
    let mut seen = HashMap::new();

    walk_tree(sql, &pack_trees, tree_hash, "", &mut blob_paths, &mut seen)?;

    // Index each discovered blob under its path.
    for (blob_hash, path) in &blob_paths {
        index_file_for_fts(sql, path, blob_hash)?;
    }

    Ok(())
}

/// Index a single file into fts_head.
///
/// Silently skips files that are too large (> 1 MiB), look binary
/// (NUL byte within the first 8 KiB), are not valid UTF-8, or whose
/// blob content cannot be found.
fn index_file_for_fts(sql: &SqlStorage, path: &str, blob_hash: &str) -> Result<()> {
    let content = match load_blob_content(sql, blob_hash)? {
        Some(c) => c,
        None => return Ok(()), // blob missing — nothing to index
    };

    // Cheap size gate first: no point scanning bytes of a file we will
    // reject anyway.
    if content.len() > 1_048_576 {
        return Ok(()); // too large
    }

    // Binary sniff: a NUL byte in the first 8 KiB marks the file binary.
    let check_len = content.len().min(8192);
    if content[..check_len].contains(&0) {
        return Ok(());
    }

    // Consume the buffer in place. The previous `str::from_utf8` +
    // `to_string()` combination copied the entire (up to 1 MiB) file;
    // `String::from_utf8` performs the same validation with no copy.
    let text = match String::from_utf8(content) {
        Ok(t) => t,
        Err(_) => return Ok(()), // not UTF-8
    };

    sql.exec(
        "INSERT INTO fts_head (path, content) VALUES (?, ?)",
        vec![
            SqlStorageValue::from(path.to_string()),
            SqlStorageValue::from(text),
        ],
    )?;
    Ok(())
}

/// Get the tree hash for a commit, or `None` if the commit is unknown.
fn commit_tree_hash(sql: &SqlStorage, commit_hash: &str) -> Result<Option<String>> {
    #[derive(serde::Deserialize)]
    struct Row {
        tree_hash: String,
    }
    let mut rows: Vec<Row> = sql
        .exec(
            "SELECT tree_hash FROM commits WHERE hash = ?",
            vec![SqlStorageValue::from(commit_hash.to_string())],
        )?
        .to_array()?;
    Ok(rows.drain(..).next().map(|row| row.tree_hash))
}

// ---------------------------------------------------------------------------
// Search
// ---------------------------------------------------------------------------

/// A single file that matched a code search, with all of its matching lines.
#[derive(Debug, Clone)]
pub struct CodeSearchResult {
    /// Repository-relative file path.
    pub path: String,
    /// Every matching line found in this file (subject to the global
    /// match cap applied by `search_code`).
    pub matches: Vec<LineMatch>,
}

/// One matching line within a file.
#[derive(Debug, Clone)]
pub struct LineMatch {
    /// 1-based line number within the file.
    pub line_number: usize,
    /// Full text of the matching line.
    pub line_text: String,
}

/// A commit that matched a commit-message search.
#[derive(Debug, Clone)]
pub struct CommitSearchResult {
    pub hash: String,
    pub message: String,
    pub author: String,
    /// Value of `commits.commit_time`; 0 when the commit row is missing.
    pub commit_time: i64,
}

/// Search code in the FTS index. Auto-detects whether to use FTS5 MATCH
/// (for word queries) or INSTR (for literal/symbol queries).
/// Returns all matching lines with line numbers, not just one snippet per file.
pub fn search_code(
    sql: &SqlStorage,
    query: &str,
    path_filter: Option<&str>,
    ext_filter: Option<&str>,
    limit: usize,
) -> Result<Vec<CodeSearchResult>> {
    // An explicit `lit:` prefix forces literal mode; otherwise sniff the
    // query for characters that would confuse the FTS5 parser.
    let (term, literal) = match query.strip_prefix("lit:") {
        Some(rest) => (rest, true),
        None => (query, needs_literal_search(query)),
    };

    // Stage 1: pick candidate files via the appropriate backend.
    let files = if literal {
        search_files_literal(sql, term, path_filter, ext_filter, limit)?
    } else {
        search_files_fts(sql, term, path_filter, ext_filter, limit)?
    };

    // Stage 2: scan each candidate line by line, capping total output.
    const MAX_MATCHES: usize = 500;
    let lowered = term.to_lowercase();
    let mut results = Vec::new();
    let mut emitted = 0usize;

    for (path, content) in files {
        if emitted >= MAX_MATCHES {
            break;
        }

        let mut hits = Vec::new();
        for (idx, line) in content.lines().enumerate() {
            // Literal mode compares case-sensitively; word mode folds case.
            let matched = if literal {
                line.contains(term)
            } else {
                line.to_lowercase().contains(&lowered)
            };

            if matched {
                hits.push(LineMatch {
                    line_number: idx + 1,
                    line_text: line.to_owned(),
                });
                emitted += 1;
                if emitted >= MAX_MATCHES {
                    // Cap reached mid-file: keep what was already collected.
                    break;
                }
            }
        }

        if !hits.is_empty() {
            results.push(CodeSearchResult {
                path,
                matches: hits,
            });
        }
    }

    Ok(results)
}

/// Search commit messages via FTS5.
///
/// Results are ordered by FTS5 relevance rank and capped at `limit`.
/// `commit_time` is taken from the `commits` table; it defaults to 0 when
/// the parsed commit row is missing.
pub fn search_commits(
    sql: &SqlStorage,
    query: &str,
    limit: usize,
) -> Result<Vec<CommitSearchResult>> {
    #[derive(serde::Deserialize)]
    struct Row {
        hash: String,
        message: String,
        author: String,
        // NULL when no matching row exists in `commits` (LEFT JOIN miss).
        commit_time: Option<i64>,
    }

    // Single query. The previous implementation issued one extra SELECT per
    // result row to fetch commit_time (an N+1 pattern); the LEFT JOIN folds
    // that lookup in while preserving the "missing commit row => 0" behavior.
    let rows: Vec<Row> = sql
        .exec(
            "SELECT fts_commits.hash AS hash,
                    fts_commits.message AS message,
                    fts_commits.author AS author,
                    commits.commit_time AS commit_time
             FROM fts_commits
             LEFT JOIN commits ON commits.hash = fts_commits.hash
             WHERE fts_commits MATCH ?
             ORDER BY rank
             LIMIT ?",
            vec![
                SqlStorageValue::from(query.to_string()),
                SqlStorageValue::from(limit as i64),
            ],
        )?
        .to_array()?;

    Ok(rows
        .into_iter()
        .map(|row| CommitSearchResult {
            hash: row.hash,
            message: row.message,
            author: row.author,
            commit_time: row.commit_time.unwrap_or(0),
        })
        .collect())
}

/// Detect if a query needs literal (INSTR) search instead of FTS5 MATCH.
/// Triggers on any character that would break or confuse FTS5 tokenization:
/// dots, underscores, parens, colons, angle brackets, etc.
fn needs_literal_search(query: &str) -> bool {
    // Only ASCII alphanumerics, spaces, double quotes (phrase syntax) and
    // `*` (prefix operator) are safe to hand to the FTS5 query parser.
    fn fts_safe(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b' ' || b == b'"' || b == b'*'
    }
    !query.bytes().all(fts_safe)
}

/// FTS5 MATCH search — returns (path, content) pairs for files that match.
fn search_files_fts(
    sql: &SqlStorage,
    query: &str,
    path_filter: Option<&str>,
    ext_filter: Option<&str>,
    limit: usize,
) -> Result<Vec<(String, String)>> {
    #[derive(serde::Deserialize)]
    struct Row {
        path: String,
        content: String,
    }

    // Restrict matching to the content column unless the caller already
    // used FTS5 column syntax (e.g. `content:foo` or `path:bar`); a bare
    // query would otherwise also match on filenames.
    let fts_match = if query.contains("content:") || query.contains("path:") {
        query.to_string()
    } else {
        format!("{{content}}: {}", query)
    };

    let mut conditions = vec!["fts_head MATCH ?".to_string()];
    let mut params = vec![SqlStorageValue::from(fts_match)];

    if let Some(prefix) = path_filter {
        conditions.push("path LIKE ?".to_string());
        params.push(SqlStorageValue::from(format!("{}%", prefix)));
    }
    if let Some(ext) = ext_filter {
        conditions.push("path LIKE ?".to_string());
        params.push(SqlStorageValue::from(format!("%.{}", ext)));
    }
    params.push(SqlStorageValue::from(limit as i64));

    let stmt = format!(
        "SELECT path, content FROM fts_head WHERE {} ORDER BY rank LIMIT ?",
        conditions.join(" AND "),
    );

    let rows: Vec<Row> = sql.exec(&stmt, params)?.to_array()?;
    Ok(rows.into_iter().map(|r| (r.path, r.content)).collect())
}

/// Literal substring search via INSTR — full table scan, but bounded by repo size.
fn search_files_literal(
    sql: &SqlStorage,
    query: &str,
    path_filter: Option<&str>,
    ext_filter: Option<&str>,
    limit: usize,
) -> Result<Vec<(String, String)>> {
    #[derive(serde::Deserialize)]
    struct Row {
        path: String,
        content: String,
    }

    // INSTR does a byte-wise substring test, consistent with the
    // case-sensitive line scan used for literal mode in search_code.
    let mut conditions = vec!["INSTR(content, ?) > 0".to_string()];
    let mut params = vec![SqlStorageValue::from(query.to_string())];

    if let Some(prefix) = path_filter {
        conditions.push("path LIKE ?".to_string());
        params.push(SqlStorageValue::from(format!("{}%", prefix)));
    }
    if let Some(ext) = ext_filter {
        conditions.push("path LIKE ?".to_string());
        params.push(SqlStorageValue::from(format!("%.{}", ext)));
    }
    params.push(SqlStorageValue::from(limit as i64));

    let stmt = format!(
        "SELECT path, content FROM fts_head WHERE {} LIMIT ?",
        conditions.join(" AND "),
    );

    let rows: Vec<Row> = sql.exec(&stmt, params)?.to_array()?;
    Ok(rows.into_iter().map(|r| (r.path, r.content)).collect())
}

// ---------------------------------------------------------------------------
// Blob compression helpers
// ---------------------------------------------------------------------------

/// Zlib-compress data (used for keyframe storage).
fn blob_zlib_compress(data: &[u8]) -> Vec<u8> {
    use flate2::{write::ZlibEncoder, Compression};
    use std::io::Write;

    // Compressing into an in-memory Vec is not expected to fail; expect()
    // turns any such failure into a loud invariant violation.
    let mut enc = ZlibEncoder::new(Vec::new(), Compression::default());
    enc.write_all(data).expect("zlib compress write");
    enc.finish().expect("zlib compress finish")
}

/// Zlib-decompress data (used when reading keyframes).
///
/// Returns an error if the input is not a valid zlib stream.
fn blob_zlib_decompress(data: &[u8]) -> Result<Vec<u8>> {
    use flate2::read::ZlibDecoder;
    use std::io::Read;

    let mut out = Vec::new();
    ZlibDecoder::new(data)
        .read_to_end(&mut out)
        .map_err(|e| Error::RustError(format!("blob zlib decompress: {}", e)))?;
    Ok(out)
}

/// Reassemble a chunked blob from the blob_chunks table.
///
/// Large blobs are split across multiple rows (each kept under the 2 MB
/// SQLite row limit — see MAX_BLOB_ROW_SIZE); this reads them back in
/// chunk_index order and concatenates them into a single buffer.
fn reassemble_chunks(sql: &SqlStorage, group_id: i64, version: i64) -> Result<Vec<u8>> {
    #[derive(serde::Deserialize)]
    struct ChunkRow {
        // serde_bytes deserializes the BLOB column as raw bytes.
        #[serde(with = "serde_bytes")]
        data: Vec<u8>,
    }

    // ORDER BY chunk_index guarantees the chunks are concatenated in the
    // same order they were written, so the byte stream is reproduced exactly.
    let cursor = sql.exec(
        "SELECT data FROM blob_chunks
         WHERE group_id = ? AND version_in_group = ?
         ORDER BY chunk_index ASC",
        vec![
            SqlStorageValue::from(group_id),
            SqlStorageValue::from(version),
        ],
    )?;

    // NOTE(review): iterating the cursor row-by-row (instead of to_array())
    // presumably avoids materializing every chunk at once — confirm against
    // the worker SqlStorage API before changing this.
    let mut result = Vec::new();
    for row in cursor.next::<ChunkRow>() {
        let row = row?;
        result.extend_from_slice(&row.data);
    }

    Ok(result)
}