branch: main
queue.rs
2670 bytesRaw
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use super::{SomeSharedData, GLOBAL_QUEUE_STATE};
use worker::{
console_log, event, Context, Env, MessageBatch, MessageExt, Request, Response, Result,
};
#[derive(Serialize, Debug, Clone, Deserialize)]
pub struct QueueBody {
pub id: Uuid,
pub id_string: String,
}
#[event(queue)]
pub async fn queue(message_batch: MessageBatch<QueueBody>, _env: Env, _ctx: Context) -> Result<()> {
let mut guard = GLOBAL_QUEUE_STATE.lock().unwrap();
for message in message_batch.messages()? {
console_log!(
"Received queue message {:?}, with id {} and timestamp: {}",
message.body(),
message.id(),
message.timestamp().to_string()
);
guard.push(message.into_body());
}
Ok(())
}
#[worker::send]
pub async fn handle_queue_send(req: Request, env: Env, _data: SomeSharedData) -> Result<Response> {
let uri = req.url()?;
let mut segments = uri.path_segments().unwrap();
let Some(id) = segments
.nth(2)
.map(|id| Uuid::try_parse(id).ok())
.and_then(|u| u)
else {
return Response::error("Failed to parse id, expected a UUID", 400);
};
let my_queue = match env.queue("my_queue") {
Ok(queue) => queue,
Err(err) => return Response::error(format!("Failed to get queue: {err:?}"), 500),
};
match my_queue
.send(&QueueBody {
id,
id_string: id.to_string(),
})
.await
{
Ok(()) => Response::ok("Message sent"),
Err(err) => Response::error(format!("Failed to send message to queue: {err:?}"), 500),
}
}
#[worker::send]
pub async fn handle_batch_send(mut req: Request, env: Env, _: SomeSharedData) -> Result<Response> {
let messages: Vec<QueueBody> = match req.json().await {
Ok(messages) => messages,
Err(err) => {
return Response::error(format!("Failed to parse request body: {err:?}"), 400);
}
};
let my_queue = match env.queue("my_queue") {
Ok(queue) => queue,
Err(err) => return Response::error(format!("Failed to get queue: {err:?}"), 500),
};
match my_queue.send_batch(messages).await {
Ok(()) => Response::ok("Message sent"),
Err(err) => Response::error(
format!("Failed to batch send message to queue: {err:?}"),
500,
),
}
}
pub async fn handle_queue(_req: Request, _env: Env, _data: SomeSharedData) -> Result<Response> {
let guard = GLOBAL_QUEUE_STATE.lock().unwrap();
let messages: Vec<QueueBody> = guard.clone();
Response::from_json(&messages)
}