branch: main
lib.rs
4765 bytesRaw
use serde::{Deserialize, Serialize};
use wasm_bindgen::JsValue;
use worker::*;
const MY_MESSAGES_BINDING_NAME: &str = "my_messages";
const MY_MESSAGES_QUEUE_NAME: &str = "mymessages";
const RAW_MESSAGES_BINDING_NAME: &str = "raw_messages";
const RAW_MESSAGES_QUEUE_NAME: &str = "rawmessages";
#[derive(Serialize, Debug, Clone, Deserialize)]
pub struct MyType {
foo: String,
bar: u32,
}
#[event(fetch)]
async fn main(_req: Request, env: Env, _: worker::Context) -> Result<Response> {
let my_messages_queue = env.queue(MY_MESSAGES_BINDING_NAME)?;
let raw_messages_queue = env.queue(RAW_MESSAGES_BINDING_NAME)?;
// Send a message with using a serializable struct
my_messages_queue
.send(MyType {
foo: "Hello world".into(),
bar: 1,
})
.await?;
// Send a batch of messages using some sort of iterator
my_messages_queue
.send_batch([
// Use the MessageBuilder to set additional options
MessageBuilder::new(MyType {
foo: "Hello world".into(),
bar: 2,
})
.delay_seconds(20)
.build(),
// Send a message with using a serializable struct
MyType {
foo: "Hello world".into(),
bar: 4,
}
.into(),
])
.await?;
// Send a batch of messages using the BatchMessageBuilder
my_messages_queue
.send_batch(
BatchMessageBuilder::new()
.message(MyType {
foo: "Hello world".into(),
bar: 4,
})
.messages(vec![
MyType {
foo: "Hello world".into(),
bar: 5,
},
MyType {
foo: "Hello world".into(),
bar: 6,
},
])
.delay_seconds(10)
.build(),
)
.await?;
// Send a raw JSValue
raw_messages_queue
.send_raw(
// RawMessageBuilder has to be used as we should set content type of these raw messages
RawMessageBuilder::new(JsValue::from_str("7"))
.delay_seconds(30)
.build_with_content_type(QueueContentType::Json),
)
.await?;
// Send a batch of raw JSValues using the BatchMessageBuilder
raw_messages_queue
.send_raw_batch(
BatchMessageBuilder::new()
.message(
RawMessageBuilder::new(js_sys::Date::new_0().into())
.build_with_content_type(QueueContentType::V8),
)
.message(
RawMessageBuilder::new(JsValue::from_str("8"))
.build_with_content_type(QueueContentType::Json),
)
.delay_seconds(10)
.build(),
)
.await?;
// Send a batch of raw JsValues using some sort of iterator
raw_messages_queue
.send_raw_batch(vec![RawMessageBuilder::new(JsValue::from_str("9"))
.delay_seconds(20)
.build_with_content_type(QueueContentType::Text)])
.await?;
Response::empty()
}
// Consumes messages from `my_messages` queue and `raw_messages` queue
#[event(queue)]
pub async fn main(message_batch: MessageBatch<MyType>, _: Env, _: Context) -> Result<()> {
match message_batch.queue().as_str() {
MY_MESSAGES_QUEUE_NAME => {
for message in message_batch.messages()? {
console_log!(
"Got message {:?}, with id {} and timestamp: {}",
message.body(),
message.id(),
message.timestamp().to_string(),
);
if message.body().bar == 1 {
message.retry_with_options(
&QueueRetryOptionsBuilder::new()
.with_delay_seconds(10)
.build(),
);
} else {
message.ack();
}
}
}
RAW_MESSAGES_QUEUE_NAME => {
for message in message_batch.raw_iter() {
console_log!(
"Got raw message {:?}, with id {} and timestamp: {}",
message.body(),
message.id(),
message.timestamp().to_string(),
);
}
message_batch.ack_all();
}
_ => {
console_error!("Unknown queue: {}", message_batch.queue());
}
}
Ok(())
}