branch: main
r2.rs
10154 bytesRaw
use futures_util::StreamExt;
use std::{
collections::HashMap,
convert::TryFrom,
sync::atomic::{AtomicBool, Ordering},
};
use worker::{
Bucket, Conditional, Data, Date, Env, FixedLengthStream, HttpMetadata, Include, Request,
Response, Result,
};
use crate::SomeSharedData;
static SEEDED: AtomicBool = AtomicBool::new(false);
pub async fn seed_bucket(bucket: &Bucket) -> Result<()> {
if SEEDED.load(Ordering::Acquire) {
return Ok(());
}
SEEDED.store(true, Ordering::Release);
bucket.put("no-props", "text".to_string()).execute().await?;
bucket
.put("no-props-no-body", Data::Empty)
.execute()
.await?;
put_full_properties("with-props", bucket).await?;
Ok(())
}
#[worker::send]
pub async fn list_empty(_req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
let bucket = env.bucket("EMPTY_BUCKET")?;
let objects = bucket.list().execute().await?;
assert_eq!(objects.objects().len(), 0);
assert!(!objects.truncated());
assert_eq!(objects.cursor(), None);
Response::ok("ok")
}
#[worker::send]
pub async fn list(_req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
let bucket = env.bucket("SEEDED_BUCKET")?;
seed_bucket(&bucket).await?;
let objects = bucket.list().execute().await?;
assert_eq!(objects.objects().len(), 3);
assert!(!objects.truncated());
assert_eq!(objects.cursor(), None);
let objects = bucket.list().limit(1).execute().await?;
assert_eq!(objects.objects().len(), 1);
assert!(objects.truncated());
let cursor = objects.cursor().unwrap();
let objects_2 = bucket.list().cursor(cursor).execute().await?;
assert_eq!(objects_2.objects().len(), 2);
assert!(!objects_2.truncated());
let with_prefix = bucket.list().prefix("no-").execute().await?;
assert_eq!(with_prefix.objects().len(), 2);
assert!(!with_prefix.truncated());
let objects = bucket
.list()
.include(vec![Include::CustomMetadata])
.execute()
.await?;
let count = objects
.objects()
.into_iter()
.filter(|obj| {
obj.custom_metadata()
.ok()
.is_some_and(|map| !map.is_empty())
})
.count();
assert_eq!(count, 1);
let objects = bucket
.list()
.include(vec![Include::HttpMetadata])
.execute()
.await?;
let count = objects
.objects()
.into_iter()
.filter(|obj| obj.http_metadata().content_type.is_some())
.count();
assert_eq!(count, 1);
Response::ok("ok")
}
#[worker::send]
pub async fn get_empty(_req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
let bucket = env.bucket("EMPTY_BUCKET")?;
let object = bucket.get("doesnt-exist").execute().await?;
assert!(object.is_none());
// Ensure all properties are being properly read with no errors.
let object = bucket
.get("doesnt-exist-with-properties")
.only_if(Conditional {
etag_does_not_match: Some("a".into()),
etag_matches: Some("b".into()),
uploaded_after: Some(Date::now()),
uploaded_before: Some(Date::now()),
})
.execute()
.await?;
assert!(object.is_none());
Response::ok("ok")
}
#[worker::send]
pub async fn get(_req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
let bucket = env.bucket("SEEDED_BUCKET")?;
seed_bucket(&bucket).await?;
let item = bucket.get("no-props").execute().await?.unwrap();
let item_body = item.body().unwrap();
assert_eq!(item_body.text().await?, "text");
let (http_metadata, custom_metadata) = dummy_properties();
let item = bucket.get("with-props").execute().await?.unwrap();
let item_body = item.body().unwrap();
assert_eq!(item_body.text().await?, "example");
let uploaded_custom_metadata = item.custom_metadata()?;
assert_eq!(uploaded_custom_metadata, custom_metadata);
let uploaded_http_metadata = item.http_metadata();
assert_eq!(uploaded_http_metadata, http_metadata);
Response::ok("ok")
}
#[worker::send]
pub async fn put(_req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
let bucket = env.bucket("PUT_BUCKET")?;
// R2 requires that we use a fixed-length-stream for the body.
let stream = futures_util::stream::repeat_with(|| Ok(vec![0u8; 16])).take(16);
let fixed_stream = FixedLengthStream::wrap(stream, 16 * 16);
bucket.put("text", "text".to_string()).execute().await?;
bucket.put("bytes", vec![0u8; 32]).execute().await?;
bucket.put("empty", Data::Empty).execute().await?;
bucket.put("stream", fixed_stream).execute().await?;
// Now let's get the objects again manually and make sure everything is in-tact.
// Internally `.text()` calls `.bytes()` which calls `.stream()`, so most cases are covered
// by just this check.
let text_obj = bucket.get("text").execute().await?.unwrap();
let text = text_obj.body().unwrap();
assert_eq!(text.text().await?, "text");
// Ensure that the empty object exists, but don't have a body.
let empty_obj = bucket.get("empty").execute().await?.unwrap();
// Miniflare behavior mismatch, in Miniflare an empty body will just return an object without
// a body property. But in workerd it will only return an object without a body property in the
// event that a condition failed
if let Some(body) = empty_obj.body() {
assert_eq!(body.bytes().await?.len(), 0);
}
Response::ok("ok")
}
#[worker::send]
pub async fn put_properties(_req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
let bucket = env.bucket("PUT_BUCKET")?;
let (http_metadata, custom_metadata, object_with_props) =
put_full_properties("with_props", &bucket).await?;
let uploaded_custom_metadata = object_with_props.custom_metadata()?;
assert_eq!(uploaded_custom_metadata, custom_metadata);
let uploaded_http_metadata = object_with_props.http_metadata();
assert_eq!(uploaded_http_metadata, http_metadata);
Response::ok("ok")
}
#[allow(clippy::large_stack_arrays)]
#[worker::send]
pub async fn put_multipart(_req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
const R2_MULTIPART_CHUNK_MIN_SIZE: usize = 5 * 1_024 * 1_024; // 5MiB.
// const TEST_CHUNK_COUNT: usize = 3;
let bucket = env.bucket("PUT_BUCKET")?;
let upload = bucket
.create_multipart_upload("multipart_upload")
.execute()
.await?;
// R2 requires chunks – except for the last one – to be at least 5MiB long.
let chunk_sizes = [
R2_MULTIPART_CHUNK_MIN_SIZE + 100,
R2_MULTIPART_CHUNK_MIN_SIZE + 200,
500,
];
let mut uploaded_parts = vec![];
for (chunk_index, chunk_size) in chunk_sizes.iter().copied().enumerate() {
let chunk = vec![u8::try_from(chunk_index).unwrap(); chunk_size];
uploaded_parts.push(
upload
.upload_part(u16::try_from(chunk_index).unwrap(), chunk)
.await?,
);
}
upload.complete(uploaded_parts).await?;
// Now let's get the object again and ensure it consists of all three parts that were uploaded.
let complete_object = bucket.get("multipart_upload").execute().await?.unwrap();
let complete_object_body = complete_object.body().unwrap();
let complete_object_bytes = complete_object_body.bytes().await?;
assert_eq!(
complete_object_bytes.len(),
R2_MULTIPART_CHUNK_MIN_SIZE + 100 + R2_MULTIPART_CHUNK_MIN_SIZE + 200 + 500
);
assert_eq!(
complete_object_bytes[0..R2_MULTIPART_CHUNK_MIN_SIZE + 100],
[0; R2_MULTIPART_CHUNK_MIN_SIZE + 100]
);
assert_eq!(
complete_object_bytes[R2_MULTIPART_CHUNK_MIN_SIZE + 100..]
[..R2_MULTIPART_CHUNK_MIN_SIZE + 200],
[1; R2_MULTIPART_CHUNK_MIN_SIZE + 200]
);
assert_eq!(
complete_object_bytes
[R2_MULTIPART_CHUNK_MIN_SIZE + 100 + R2_MULTIPART_CHUNK_MIN_SIZE + 200..],
[2; 500]
);
Response::ok("ok")
}
#[worker::send]
pub async fn delete(_req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
let bucket = env.bucket("DELETE_BUCKET")?;
bucket.put("key", Data::Empty).execute().await?;
let objects = bucket.list().execute().await?;
assert_eq!(objects.objects().len(), 1);
bucket.delete("key").await?;
let keys: Vec<String> = (0..1000).map(|i| format!("key_{i}")).collect();
for key in &keys {
bucket.put(key, Data::Empty).execute().await?;
}
let objects = bucket.list().execute().await?;
assert_eq!(objects.objects().len(), keys.len());
bucket.delete_multiple(keys).await?;
let objects = bucket.list().execute().await?;
assert_eq!(objects.objects().len(), 0);
Response::ok("ok")
}
async fn put_full_properties(
name: &str,
bucket: &Bucket,
) -> Result<(HttpMetadata, HashMap<String, String>, worker::Object)> {
let (http_metadata, custom_metadata) = dummy_properties();
let md5_hash: [u8; 16] = md5::compute("example").into();
let object_with_props = bucket
.put(name, "example".to_string())
.http_metadata(http_metadata.clone())
.custom_metadata(custom_metadata.clone())
.md5(md5_hash)
.execute()
.await?;
Ok((http_metadata, custom_metadata, object_with_props))
}
fn dummy_properties() -> (HttpMetadata, HashMap<String, String>) {
let http_metadata = HttpMetadata {
content_type: Some("text/text".into()),
content_language: Some("en-US".into()),
content_disposition: Some("inline".into()),
content_encoding: Some("gzip".into()),
cache_control: Some("immutable".into()),
cache_expiry: Some(Date::now()),
};
let custom_metadata = {
let mut map = HashMap::new();
map.insert("a".into(), "b".into());
map
};
(http_metadata, custom_metadata)
}