use serde::Deserialize; use worker::{ durable_object, wasm_bindgen, DurableObject, Env, Request, Response, Result, SqlStorage, SqlStorageValue, State, }; /// A Durable Object that demonstrates SQL cursor iterator methods. /// /// This example creates a table with sample data and provides endpoints /// to test both the next() and raw() iterator methods. #[durable_object] pub struct SqlIterator { sql: SqlStorage, } #[derive(Deserialize)] struct Product { id: i32, name: String, price: f64, in_stock: i32, } #[derive(Deserialize)] struct BadProduct { id: String, // This will cause deserialization to fail since id is actually an integer name: String, price: f64, in_stock: i32, } #[derive(Debug)] struct BlobData { id: i32, name: String, data: Vec, } impl BlobData { fn from_raw_row(row: &[SqlStorageValue]) -> Option { if row.len() != 3 { return None; } let id = match &row[0] { SqlStorageValue::Integer(i) => *i as i32, _ => return None, }; let name = match &row[1] { SqlStorageValue::String(s) => s.clone(), _ => return None, }; let data = match &row[2] { SqlStorageValue::Blob(bytes) => bytes.clone(), _ => return None, }; Some(BlobData { id, name, data }) } } impl DurableObject for SqlIterator { fn new(state: State, _env: Env) -> Self { let sql = state.storage().sql(); // Create table and seed with test data sql.exec( "CREATE TABLE IF NOT EXISTS products(id INTEGER PRIMARY KEY, name TEXT, price REAL, in_stock INTEGER);", None, ).expect("create table"); sql.exec( "CREATE TABLE IF NOT EXISTS blob_data(id INTEGER PRIMARY KEY, name TEXT, data BLOB);", None, ) .expect("create blob table"); // Check if we need to seed data let count: Vec = sql .exec("SELECT COUNT(*) as count FROM products;", None) .expect("count query") .to_array() .expect("count result"); if count .first() .and_then(|v| v.get("count")) .and_then(serde_json::Value::as_i64) .unwrap_or(0) == 0 { // Seed with test data let products = vec![ ("Laptop", 999.99, true), ("Mouse", 29.99, true), ("Keyboard", 79.99, false), ("Monitor", 299.99, true), ("Headphones", 149.99, false), ]; for (name, price, in_stock) in products { sql.exec( "INSERT INTO products(name, price, in_stock) VALUES (?, ?, ?);", vec![name.into(), price.into(), i32::from(in_stock).into()], ) .expect("insert product"); } } let blob_count: Vec = sql .exec("SELECT COUNT(*) as count FROM blob_data;", None) .expect("blob count query") .to_array() .expect("blob count result"); if blob_count .first() .and_then(|v| v.get("count")) .and_then(serde_json::Value::as_i64) .unwrap_or(0) == 0 { let blob_test_data = vec![ ("binary_data", vec![0x00, 0x01, 0x02, 0x03, 0xFF, 0xFE]), ("empty_blob", vec![]), ("text_as_blob", "Hello, World!".as_bytes().to_vec()), ( "large_blob", (0u8..=255).cycle().take(1000).collect::>(), ), ]; for (name, data) in blob_test_data { sql.exec( "INSERT INTO blob_data(name, data) VALUES (?, ?);", vec![name.into(), SqlStorageValue::Blob(data)], ) .expect("insert blob data"); } } Self { sql } } async fn fetch(&self, req: Request) -> Result { let url = req.url()?; let path = url.path(); match path { "/next" => self.handle_next(), "/raw" => self.handle_raw(), "/next-invalid" => self.handle_next_invalid(), "/blob-next" => self.handle_blob_next(), "/blob-raw" => self.handle_blob_raw(), "/blob-roundtrip" => self.handle_blob_roundtrip(), _ => Response::ok("SQL Iterator Test - try /next, /raw, /next-invalid, /blob-next, /blob-raw, or /blob-roundtrip endpoints"), } } } impl SqlIterator { fn handle_next(&self) -> Result { let cursor = self.sql.exec("SELECT * FROM products ORDER BY id;", None)?; let mut results = Vec::new(); let iterator = cursor.next::(); for result in iterator { match result { Ok(product) => { results.push(format!( "Product {}: {} - ${:.2} (in stock: {})", product.id, product.name, product.price, product.in_stock != 0 )); } Err(e) => { results.push(format!("Error deserializing row: {e}")); } } } let response_body = format!("next() iterator results:\n{}", results.join("\n")); Response::ok(response_body) } fn handle_raw(&self) -> Result { let cursor = self.sql.exec("SELECT * FROM products ORDER BY id;", None)?; let mut results = Vec::new(); let column_names = cursor.column_names(); results.push(format!("Columns: {}", column_names.join(", "))); let iterator = cursor.raw(); for result in iterator { match result { Ok(row) => { let row_str: Vec = row.iter().map(|v| format!("{v:?}")).collect(); results.push(format!("Row: [{}]", row_str.join(", "))); } Err(e) => { results.push(format!("Error reading row: {e}")); } } } let response_body = format!("raw() iterator results:\n{}", results.join("\n")); Response::ok(response_body) } fn handle_next_invalid(&self) -> Result { let cursor = self.sql.exec("SELECT * FROM products ORDER BY id;", None)?; let mut results = Vec::new(); let iterator = cursor.next::(); for result in iterator { match result { Ok(product) => { results.push(format!( "BadProduct {}: {} - ${:.2} (in stock: {})", product.id, product.name, product.price, product.in_stock != 0 )); } Err(e) => { results.push(format!("Error deserializing row: {e}")); } } } let response_body = format!("next-invalid() iterator results:\n{}", results.join("\n")); Response::ok(response_body) } fn handle_blob_next(&self) -> Result { let cursor = self .sql .exec("SELECT * FROM blob_data ORDER BY id;", None)?; let mut results = Vec::new(); let iterator = cursor.raw(); for result in iterator { match result { Ok(row) => { if let Some(blob_data) = BlobData::from_raw_row(&row) { let data_preview = if blob_data.data.len() <= 10 { format!("{:?}", blob_data.data) } else { format!( "{:?}...[{} bytes total]", &blob_data.data[..10], blob_data.data.len() ) }; results.push(format!( "BlobData {}: {} - data: {}", blob_data.id, blob_data.name, data_preview )); } else { results.push("Error: Failed to parse blob row".to_string()); } } Err(e) => { results.push(format!("Error reading blob row: {e}")); } } } let response_body = format!("blob-next() iterator results:\n{}", results.join("\n")); Response::ok(response_body) } fn handle_blob_raw(&self) -> Result { let cursor = self .sql .exec("SELECT * FROM blob_data ORDER BY id;", None)?; let mut results = Vec::new(); let column_names = cursor.column_names(); results.push(format!("Columns: {}", column_names.join(", "))); let iterator = cursor.raw(); for result in iterator { match result { Ok(row) => { let mut row_str = Vec::new(); for (i, value) in row.iter().enumerate() { if i == 2 { // 'data' column is index 2 (id=0, name=1, data=2) match value { SqlStorageValue::Blob(bytes) => { if bytes.len() <= 10 { row_str.push(format!("Blob({bytes:?})")); } else { row_str.push(format!( "Blob({:?}...[{} bytes])", &bytes[..10], bytes.len() )); } } _ => row_str.push(format!("{value:?}")), } } else { row_str.push(format!("{value:?}")); } } results.push(format!("Row: [{}]", row_str.join(", "))); } Err(e) => { results.push(format!("Error reading blob row: {e}")); } } } let response_body = format!("blob-raw() iterator results:\n{}", results.join("\n")); Response::ok(response_body) } fn handle_blob_roundtrip(&self) -> Result { // Test data roundtrip: insert a BLOB and immediately read it back let test_data = vec![0xDE, 0xAD, 0xBE, 0xEF, 0x00, 0xFF]; let test_name = "roundtrip_test"; // Insert test data self.sql.exec( "INSERT INTO blob_data(name, data) VALUES (?, ?);", vec![test_name.into(), SqlStorageValue::Blob(test_data.clone())], )?; // Read it back using both methods (raw iterator approach for both) let cursor_next = self.sql.exec( "SELECT * FROM blob_data WHERE name = ? ORDER BY id DESC LIMIT 1;", vec![test_name.into()], )?; let cursor_raw = self.sql.exec( "SELECT * FROM blob_data WHERE name = ? ORDER BY id DESC LIMIT 1;", vec![test_name.into()], )?; let mut results = Vec::new(); results.push(format!("Original data: {test_data:?}")); // Test "next()" style result by converting raw data to BlobData struct let next_raw_iterator = cursor_next.raw(); for result in next_raw_iterator { match result { Ok(row) => { if let Some(blob_data) = BlobData::from_raw_row(&row) { let matches = blob_data.data == test_data; results.push(format!( "next() result: {:?}, matches_original: {}", blob_data.data, matches )); } else { results.push("next() error: Failed to parse blob row".to_string()); } } Err(e) => { results.push(format!("next() error: {e}")); } } } // Test raw iterator let raw_iterator = cursor_raw.raw(); for result in raw_iterator { match result { Ok(row) => { if let Some(SqlStorageValue::Blob(data)) = row.get(2) { let matches = data == &test_data; results.push(format!( "raw() result: {data:?}, matches_original: {matches}" )); } else { results.push("raw() error: data column is not a blob".to_string()); } } Err(e) => { results.push(format!("raw() error: {e}")); } } } // Clean up test data self.sql.exec( "DELETE FROM blob_data WHERE name = ?;", vec![test_name.into()], )?; let response_body = format!("blob-roundtrip test results:\n{}", results.join("\n")); Response::ok(response_body) } } #[worker::send] /// Route handler for the SQL iterator test Durable Object. pub async fn handle_sql_iterator( req: Request, env: Env, _data: super::SomeSharedData, ) -> Result { let uri = req.url()?; let mut segments = uri.path_segments().unwrap(); // skip "sql-iterator" let _ = segments.next(); // Get name and remaining path let name = segments.next().unwrap_or("default"); let remaining_path: Vec<&str> = segments.collect(); let path = if remaining_path.is_empty() { "/" } else { &format!("/{}", remaining_path.join("/")) }; let namespace = env.durable_object("SQL_ITERATOR")?; let stub = namespace.id_from_name(name)?.get_stub()?; // Forward the request path to the DO let new_url = format!("https://fake-host{path}"); stub.fetch_with_str(&new_url).await }