Small update to Rocket WebSockets

Switched from channels to stream. This is able to use yield, and the
code looks a bit nicer this way.

Also updated all the crates.
This commit is contained in:
BlackDex
2023-04-12 15:59:05 +02:00
parent 07099df41a
commit 48cc31a59f
3 changed files with 37 additions and 39 deletions

View File

@ -74,7 +74,7 @@ async fn websockets_hub<'r>(
ws: rocket_ws::WebSocket,
data: WsAccessToken,
ip: ClientIp,
) -> Result<rocket_ws::Channel<'r>, Error> {
) -> Result<rocket_ws::Stream!['r], Error> {
let addr = ip.ip;
info!("Accepting Rocket WS connection from {addr}");
@ -93,19 +93,19 @@ async fn websockets_hub<'r>(
(rx, WSEntryMapGuard::new(users, claims.sub, entry_uuid, addr))
};
Ok(ws.channel(move |mut stream| {
Box::pin(async move {
// Make sure the guard is moved into the channel future so it's not dropped earlier
Ok({
rocket_ws::Stream! { ws => {
let mut ws = ws;
let _guard = guard;
let mut interval = tokio::time::interval(Duration::from_secs(15));
loop {
tokio::select! {
res = stream.next() => {
res = ws.next() => {
match res {
Some(Ok(message)) => {
match message {
// Respond to any pings
Message::Ping(ping) => stream.send(Message::Pong(ping)).await?,
Message::Ping(ping) => yield Message::Pong(ping),
Message::Pong(_) => {/* Ignored */},
// We should receive an initial message with the protocol and version, and we will reply to it
@ -113,12 +113,12 @@ async fn websockets_hub<'r>(
let msg = message.strip_suffix(RECORD_SEPARATOR as char).unwrap_or(message);
if serde_json::from_str(msg).ok() == Some(INITIAL_MESSAGE) {
stream.send(Message::binary(INITIAL_RESPONSE)).await?;
yield Message::binary(INITIAL_RESPONSE);
continue;
}
}
// Just echo anything else the client sends
_ => stream.send(message).await?,
_ => yield message,
}
}
_ => break,
@ -127,18 +127,16 @@ async fn websockets_hub<'r>(
res = rx.recv() => {
match res {
Some(res) => stream.send(res).await?,
Some(res) => yield res,
None => break,
}
}
_ = interval.tick() => stream.send(Message::Ping(create_ping())).await?
_ = interval.tick() => yield Message::Ping(create_ping())
}
}
Ok(())
})
}))
}}
})
}
//