//! Build lock using a `.tmp` staging directory with mtime-based heartbeat. //! //! When multiple `worker-build` processes run concurrently (e.g. wrangler //! triggers a rebuild while the previous one is still running), they can //! stomp on each other's files in `build/`. This module provides a lock //! mechanism: //! //! 1. All build output goes into `build/.tmp/` instead of `build/` directly. //! 2. A background thread bumps `.tmp`'s mtime every second as a heartbeat. //! 3. On startup, if `.tmp` exists with mtime < 5s ago, we know another build //! is active and wait for it to go stale before proceeding. //! 4. On completion, entries are moved from `.tmp/` into `build/`, and `.tmp/` //! is removed. use anyhow::{bail, Context, Result}; use filetime::FileTime; use log::warn; use std::fs; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::time::{Duration, SystemTime}; /// How often the heartbeat thread touches `.tmp` (seconds). const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1); /// If `.tmp` mtime is older than this, it's considered stale / abandoned. const STALE_THRESHOLD: Duration = Duration::from_secs(5); /// Maximum time to wait for a concurrent build before giving up. const MAX_WAIT: Duration = Duration::from_secs(300); pub struct BuildLock { /// The real output directory, e.g. `/build`. out_dir: PathBuf, /// The staging directory: `/build/.tmp`. tmp_dir: PathBuf, /// Signal to stop the heartbeat thread. stop: Arc, /// Join handle for the heartbeat thread. heartbeat_handle: Option>, } impl BuildLock { /// Acquire the build lock. This will: /// - Wait for any active concurrent build to finish (stale `.tmp`) /// - Clean up any stale `.tmp` into `.oldtmp` and remove it /// - Create a fresh `.tmp` directory /// - Start the heartbeat thread /// /// Returns the path to the `.tmp` staging directory for building into. pub fn acquire(out_dir: &Path) -> Result { let tmp_dir = out_dir.join(".tmp"); let oldtmp_dir = out_dir.join(".oldtmp"); // Ensure the parent out_dir exists fs::create_dir_all(out_dir) .with_context(|| format!("Failed to create output directory {}", out_dir.display()))?; // Wait for any active build to finish Self::wait_for_stale(&tmp_dir)?; // Clean up stale .tmp → .oldtmp → delete if tmp_dir.exists() { // Move to .oldtmp so new .tmp can be created immediately if oldtmp_dir.exists() { if let Err(e) = fs::remove_dir_all(&oldtmp_dir) { warn!("Failed to remove old .oldtmp directory: {e}"); } } fs::rename(&tmp_dir, &oldtmp_dir) .or_else(|_| fs::remove_dir_all(&tmp_dir)) .context("Failed to clean up stale .tmp directory")?; if let Err(e) = fs::remove_dir_all(&oldtmp_dir) { warn!("Failed to remove .oldtmp directory: {e}"); } } // Create fresh .tmp fs::create_dir_all(&tmp_dir) .with_context(|| format!("Failed to create staging directory {}", tmp_dir.display()))?; // Start heartbeat let stop = Arc::new(AtomicBool::new(false)); let heartbeat_handle = { let stop = Arc::clone(&stop); let tmp_dir = tmp_dir.clone(); thread::spawn(move || { while !stop.load(Ordering::Relaxed) { let now = FileTime::now(); if let Err(e) = filetime::set_file_mtime(&tmp_dir, now) { warn!("Failed to update heartbeat mtime: {e}"); } thread::sleep(HEARTBEAT_INTERVAL); } }) }; Ok(Self { out_dir: out_dir.to_path_buf(), tmp_dir, stop, heartbeat_handle: Some(heartbeat_handle), }) } /// Returns the staging directory path to build into. pub fn staging_dir(&self) -> &Path { &self.tmp_dir } /// Finish the build: stop heartbeat, move entries from `.tmp/` into the /// parent `build/` directory, and remove `.tmp/`. pub fn finish(mut self) -> Result<()> { // Stop heartbeat self.stop_heartbeat(); // Move each entry from .tmp/ into out_dir/ for entry in fs::read_dir(&self.tmp_dir).with_context(|| { format!( "Failed to read staging directory {}", self.tmp_dir.display() ) })? { let entry = entry?; let name = entry.file_name(); let dest = self.out_dir.join(&name); // Remove existing entry at destination if dest.is_dir() { if let Err(e) = fs::remove_dir_all(&dest) { warn!( "Failed to remove existing directory {}: {e}", dest.display() ); } } else if dest.exists() { if let Err(e) = fs::remove_file(&dest) { warn!("Failed to remove existing file {}: {e}", dest.display()); } } fs::rename(entry.path(), &dest).with_context(|| { format!( "Failed to move {} from staging to output", name.to_string_lossy() ) })?; } // Remove the now-empty .tmp directory if let Err(e) = fs::remove_dir(&self.tmp_dir) { warn!("Failed to remove .tmp staging directory: {e}"); } Ok(()) } /// Wait until `.tmp` either doesn't exist or is stale (mtime > threshold). fn wait_for_stale(tmp_dir: &Path) -> Result<()> { let deadline = SystemTime::now() + MAX_WAIT; loop { if !tmp_dir.exists() { return Ok(()); } let metadata = match fs::metadata(tmp_dir) { Ok(m) => m, Err(_) => return Ok(()), // disappeared }; let mtime = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH); let age = SystemTime::now() .duration_since(mtime) .unwrap_or(Duration::ZERO); if age >= STALE_THRESHOLD { return Ok(()); } if SystemTime::now() >= deadline { bail!( "Timed out waiting for concurrent worker-build to finish \ (build/.tmp is still being updated). If this is stale, \ remove the build/.tmp directory manually." ); } thread::sleep(Duration::from_millis(500)); } } fn stop_heartbeat(&mut self) { self.stop.store(true, Ordering::Relaxed); if let Some(handle) = self.heartbeat_handle.take() { let _ = handle.join(); } } } impl Drop for BuildLock { fn drop(&mut self) { // Ensure heartbeat is stopped even on error/panic self.stop_heartbeat(); } }