branch: main
build_lock.rs
7308 bytesRaw
//! Build lock using a `.tmp` staging directory with mtime-based heartbeat.
//!
//! When multiple `worker-build` processes run concurrently (e.g. wrangler
//! triggers a rebuild while the previous one is still running), they can
//! stomp on each other's files in `build/`. This module provides a lock
//! mechanism:
//!
//! 1. All build output goes into `build/.tmp/` instead of `build/` directly.
//! 2. A background thread bumps `.tmp`'s mtime every second as a heartbeat.
//! 3. On startup, if `.tmp` exists with mtime < 5s ago, we know another build
//! is active and wait for it to go stale before proceeding.
//! 4. On completion, entries are moved from `.tmp/` into `build/`, and `.tmp/`
//! is removed.
use anyhow::{bail, Context, Result};
use filetime::FileTime;
use log::warn;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, SystemTime};
/// How often the heartbeat thread touches `.tmp` (seconds).
const HEARTBEAT_INTERVAL: Duration = Duration::from_secs(1);
/// If `.tmp` mtime is older than this, it's considered stale / abandoned.
const STALE_THRESHOLD: Duration = Duration::from_secs(5);
/// Maximum time to wait for a concurrent build before giving up.
const MAX_WAIT: Duration = Duration::from_secs(300);
pub struct BuildLock {
/// The real output directory, e.g. `<crate>/build`.
out_dir: PathBuf,
/// The staging directory: `<crate>/build/.tmp`.
tmp_dir: PathBuf,
/// Signal to stop the heartbeat thread.
stop: Arc<AtomicBool>,
/// Join handle for the heartbeat thread.
heartbeat_handle: Option<thread::JoinHandle<()>>,
}
impl BuildLock {
/// Acquire the build lock. This will:
/// - Wait for any active concurrent build to finish (stale `.tmp`)
/// - Clean up any stale `.tmp` into `.oldtmp` and remove it
/// - Create a fresh `.tmp` directory
/// - Start the heartbeat thread
///
/// Returns the path to the `.tmp` staging directory for building into.
pub fn acquire(out_dir: &Path) -> Result<Self> {
let tmp_dir = out_dir.join(".tmp");
let oldtmp_dir = out_dir.join(".oldtmp");
// Ensure the parent out_dir exists
fs::create_dir_all(out_dir)
.with_context(|| format!("Failed to create output directory {}", out_dir.display()))?;
// Wait for any active build to finish
Self::wait_for_stale(&tmp_dir)?;
// Clean up stale .tmp → .oldtmp → delete
if tmp_dir.exists() {
// Move to .oldtmp so new .tmp can be created immediately
if oldtmp_dir.exists() {
if let Err(e) = fs::remove_dir_all(&oldtmp_dir) {
warn!("Failed to remove old .oldtmp directory: {e}");
}
}
fs::rename(&tmp_dir, &oldtmp_dir)
.or_else(|_| fs::remove_dir_all(&tmp_dir))
.context("Failed to clean up stale .tmp directory")?;
if let Err(e) = fs::remove_dir_all(&oldtmp_dir) {
warn!("Failed to remove .oldtmp directory: {e}");
}
}
// Create fresh .tmp
fs::create_dir_all(&tmp_dir)
.with_context(|| format!("Failed to create staging directory {}", tmp_dir.display()))?;
// Start heartbeat
let stop = Arc::new(AtomicBool::new(false));
let heartbeat_handle = {
let stop = Arc::clone(&stop);
let tmp_dir = tmp_dir.clone();
thread::spawn(move || {
while !stop.load(Ordering::Relaxed) {
let now = FileTime::now();
if let Err(e) = filetime::set_file_mtime(&tmp_dir, now) {
warn!("Failed to update heartbeat mtime: {e}");
}
thread::sleep(HEARTBEAT_INTERVAL);
}
})
};
Ok(Self {
out_dir: out_dir.to_path_buf(),
tmp_dir,
stop,
heartbeat_handle: Some(heartbeat_handle),
})
}
/// Returns the staging directory path to build into.
pub fn staging_dir(&self) -> &Path {
&self.tmp_dir
}
/// Finish the build: stop heartbeat, move entries from `.tmp/` into the
/// parent `build/` directory, and remove `.tmp/`.
pub fn finish(mut self) -> Result<()> {
// Stop heartbeat
self.stop_heartbeat();
// Move each entry from .tmp/ into out_dir/
for entry in fs::read_dir(&self.tmp_dir).with_context(|| {
format!(
"Failed to read staging directory {}",
self.tmp_dir.display()
)
})? {
let entry = entry?;
let name = entry.file_name();
let dest = self.out_dir.join(&name);
// Remove existing entry at destination
if dest.is_dir() {
if let Err(e) = fs::remove_dir_all(&dest) {
warn!(
"Failed to remove existing directory {}: {e}",
dest.display()
);
}
} else if dest.exists() {
if let Err(e) = fs::remove_file(&dest) {
warn!("Failed to remove existing file {}: {e}", dest.display());
}
}
fs::rename(entry.path(), &dest).with_context(|| {
format!(
"Failed to move {} from staging to output",
name.to_string_lossy()
)
})?;
}
// Remove the now-empty .tmp directory
if let Err(e) = fs::remove_dir(&self.tmp_dir) {
warn!("Failed to remove .tmp staging directory: {e}");
}
Ok(())
}
/// Wait until `.tmp` either doesn't exist or is stale (mtime > threshold).
fn wait_for_stale(tmp_dir: &Path) -> Result<()> {
let deadline = SystemTime::now() + MAX_WAIT;
loop {
if !tmp_dir.exists() {
return Ok(());
}
let metadata = match fs::metadata(tmp_dir) {
Ok(m) => m,
Err(_) => return Ok(()), // disappeared
};
let mtime = metadata.modified().unwrap_or(SystemTime::UNIX_EPOCH);
let age = SystemTime::now()
.duration_since(mtime)
.unwrap_or(Duration::ZERO);
if age >= STALE_THRESHOLD {
return Ok(());
}
if SystemTime::now() >= deadline {
bail!(
"Timed out waiting for concurrent worker-build to finish \
(build/.tmp is still being updated). If this is stale, \
remove the build/.tmp directory manually."
);
}
thread::sleep(Duration::from_millis(500));
}
}
fn stop_heartbeat(&mut self) {
self.stop.store(true, Ordering::Relaxed);
if let Some(handle) = self.heartbeat_handle.take() {
let _ = handle.join();
}
}
}
impl Drop for BuildLock {
fn drop(&mut self) {
// Ensure heartbeat is stopped even on error/panic
self.stop_heartbeat();
}
}