Update to rocket 0.5 and made code async, missing updating all db calls, that are currently blocking

This commit is contained in:
Daniel García
2021-11-07 18:53:39 +01:00
parent 08f0de7b46
commit 2d5f172e77
30 changed files with 1314 additions and 1028 deletions

View File

@ -20,8 +20,15 @@ extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
use job_scheduler::{Job, JobScheduler};
use std::{fs::create_dir_all, panic, path::Path, process::exit, str::FromStr, thread, time::Duration};
use std::{
fs::{canonicalize, create_dir_all},
panic,
path::Path,
process::exit,
str::FromStr,
thread,
time::Duration,
};
#[macro_use]
mod error;
@ -37,9 +44,11 @@ mod util;
pub use config::CONFIG;
pub use error::{Error, MapResult};
use rocket::data::{Limits, ToByteUnit};
pub use util::is_running_in_docker;
fn main() {
#[rocket::main]
async fn main() -> Result<(), Error> {
parse_args();
launch_info();
@ -56,13 +65,16 @@ fn main() {
});
check_web_vault();
create_icon_cache_folder();
create_dir(&CONFIG.icon_cache_folder(), "icon cache");
create_dir(&CONFIG.tmp_folder(), "tmp folder");
create_dir(&CONFIG.sends_folder(), "sends folder");
create_dir(&CONFIG.attachments_folder(), "attachments folder");
let pool = create_db_pool();
schedule_jobs(pool.clone());
crate::db::models::TwoFactor::migrate_u2f_to_webauthn(&pool.get().unwrap()).unwrap();
schedule_jobs(pool.clone()).await;
crate::db::models::TwoFactor::migrate_u2f_to_webauthn(&pool.get().await.unwrap()).unwrap();
launch_rocket(pool, extra_debug); // Blocks until program termination.
launch_rocket(pool, extra_debug).await // Blocks until program termination.
}
const HELP: &str = "\
@ -127,10 +139,12 @@ fn init_logging(level: log::LevelFilter) -> Result<(), fern::InitError> {
.level_for("hyper::server", log::LevelFilter::Warn)
// Silence rocket logs
.level_for("_", log::LevelFilter::Off)
.level_for("launch", log::LevelFilter::Off)
.level_for("launch_", log::LevelFilter::Off)
.level_for("rocket::rocket", log::LevelFilter::Off)
.level_for("rocket::fairing", log::LevelFilter::Off)
.level_for("rocket::launch", log::LevelFilter::Error)
.level_for("rocket::launch_", log::LevelFilter::Error)
.level_for("rocket::rocket", log::LevelFilter::Warn)
.level_for("rocket::server", log::LevelFilter::Warn)
.level_for("rocket::fairing::fairings", log::LevelFilter::Warn)
.level_for("rocket::shield::shield", log::LevelFilter::Warn)
// Never show html5ever and hyper::proto logs, too noisy
.level_for("html5ever", log::LevelFilter::Off)
.level_for("hyper::proto", log::LevelFilter::Off)
@ -243,10 +257,6 @@ fn create_dir(path: &str, description: &str) {
create_dir_all(path).expect(&err_msg);
}
fn create_icon_cache_folder() {
create_dir(&CONFIG.icon_cache_folder(), "icon cache");
}
fn check_data_folder() {
let data_folder = &CONFIG.data_folder();
let path = Path::new(data_folder);
@ -314,51 +324,73 @@ fn create_db_pool() -> db::DbPool {
}
}
fn launch_rocket(pool: db::DbPool, extra_debug: bool) {
async fn launch_rocket(pool: db::DbPool, extra_debug: bool) -> Result<(), Error> {
let basepath = &CONFIG.domain_path();
let mut config = rocket::Config::from(rocket::Config::figment());
config.address = std::net::IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED); // TODO: Allow this to be changed, keep ROCKET_ADDRESS for compat
config.temp_dir = canonicalize(CONFIG.tmp_folder()).unwrap().into();
config.limits = Limits::new() //
.limit("json", 10.megabytes())
.limit("data-form", 150.megabytes())
.limit("file", 150.megabytes());
// If adding more paths here, consider also adding them to
// crate::utils::LOGGED_ROUTES to make sure they appear in the log
let result = rocket::ignite()
.mount(&[basepath, "/"].concat(), api::web_routes())
.mount(&[basepath, "/api"].concat(), api::core_routes())
.mount(&[basepath, "/admin"].concat(), api::admin_routes())
.mount(&[basepath, "/identity"].concat(), api::identity_routes())
.mount(&[basepath, "/icons"].concat(), api::icons_routes())
.mount(&[basepath, "/notifications"].concat(), api::notifications_routes())
let instance = rocket::custom(config)
.mount([basepath, "/"].concat(), api::web_routes())
.mount([basepath, "/api"].concat(), api::core_routes())
.mount([basepath, "/admin"].concat(), api::admin_routes())
.mount([basepath, "/identity"].concat(), api::identity_routes())
.mount([basepath, "/icons"].concat(), api::icons_routes())
.mount([basepath, "/notifications"].concat(), api::notifications_routes())
.manage(pool)
.manage(api::start_notification_server())
.attach(util::AppHeaders())
.attach(util::Cors())
.attach(util::BetterLogging(extra_debug))
.launch();
.ignite()
.await?;
// Launch and print error if there is one
// The launch will restore the original logging level
error!("Launch error {:#?}", result);
CONFIG.set_rocket_shutdown_handle(instance.shutdown());
ctrlc::set_handler(move || {
info!("Exiting vaultwarden!");
CONFIG.shutdown();
})
.expect("Error setting Ctrl-C handler");
instance.launch().await?;
info!("Vaultwarden process exited!");
Ok(())
}
fn schedule_jobs(pool: db::DbPool) {
async fn schedule_jobs(pool: db::DbPool) {
if CONFIG.job_poll_interval_ms() == 0 {
info!("Job scheduler disabled.");
return;
}
let runtime = tokio::runtime::Handle::current();
thread::Builder::new()
.name("job-scheduler".to_string())
.spawn(move || {
use job_scheduler::{Job, JobScheduler};
let mut sched = JobScheduler::new();
// Purge sends that are past their deletion date.
if !CONFIG.send_purge_schedule().is_empty() {
sched.add(Job::new(CONFIG.send_purge_schedule().parse().unwrap(), || {
api::purge_sends(pool.clone());
runtime.spawn(api::purge_sends(pool.clone()));
}));
}
// Purge trashed items that are old enough to be auto-deleted.
if !CONFIG.trash_purge_schedule().is_empty() {
sched.add(Job::new(CONFIG.trash_purge_schedule().parse().unwrap(), || {
api::purge_trashed_ciphers(pool.clone());
runtime.spawn(api::purge_trashed_ciphers(pool.clone()));
}));
}
@ -366,7 +398,7 @@ fn schedule_jobs(pool: db::DbPool) {
// indicates that a user's master password has been compromised.
if !CONFIG.incomplete_2fa_schedule().is_empty() {
sched.add(Job::new(CONFIG.incomplete_2fa_schedule().parse().unwrap(), || {
api::send_incomplete_2fa_notifications(pool.clone());
runtime.spawn(api::send_incomplete_2fa_notifications(pool.clone()));
}));
}
@ -375,7 +407,7 @@ fn schedule_jobs(pool: db::DbPool) {
// sending reminders for requests that are about to be granted anyway.
if !CONFIG.emergency_request_timeout_schedule().is_empty() {
sched.add(Job::new(CONFIG.emergency_request_timeout_schedule().parse().unwrap(), || {
api::emergency_request_timeout_job(pool.clone());
runtime.spawn(api::emergency_request_timeout_job(pool.clone()));
}));
}
@ -383,7 +415,7 @@ fn schedule_jobs(pool: db::DbPool) {
// emergency access requests.
if !CONFIG.emergency_notification_reminder_schedule().is_empty() {
sched.add(Job::new(CONFIG.emergency_notification_reminder_schedule().parse().unwrap(), || {
api::emergency_notification_reminder_job(pool.clone());
runtime.spawn(api::emergency_notification_reminder_job(pool.clone()));
}));
}