diff --git a/Cargo.lock b/Cargo.lock index 5c5ee4c..bd23658 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2963,7 +2963,7 @@ dependencies = [ [[package]] name = "rezervator" -version = "1.0.0" +version = "1.1.0" dependencies = [ "actix-files", "actix-multipart", diff --git a/Cargo.toml b/Cargo.toml index e257cea..b2538ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rezervator" -version = "1.0.0" +version = "1.1.0" edition = "2021" [lib] diff --git a/src/backend/mail.rs b/src/backend/mail.rs index d72fdd3..1872b02 100644 --- a/src/backend/mail.rs +++ b/src/backend/mail.rs @@ -10,15 +10,18 @@ cfg_if! { if #[cfg(feature = "ssr")] { use crate::backend::get_pool; use crate::error::AppError; use log::info; + use log::error; use crate::config::Mailing; use crate::config::MailTransport; use crate::backend::data::ResSumWithItems; use lettre::message::Message as LettreMessage; - use lettre::{AsyncSmtpTransport, AsyncFileTransport, AsyncTransport, Tokio1Executor}; + use lettre::{FileTransport, SmtpTransport, Transport}; use lettre::transport::smtp::client::{Tls, TlsParameters}; use lettre::transport::smtp::authentication::Credentials; use lettre::transport::smtp::extension::ClientId; use std::ops::Add; + use std::sync::mpsc::Sender; + use std::sync::mpsc; pub async fn message_for_type(msg_type: &MessageType, pool: &PgPool) -> Result { Ok(query_as::<_, Message>("SELECT * FROM message WHERE msg_type = $1") @@ -100,13 +103,26 @@ cfg_if! { if #[cfg(feature = "ssr")] { } } - impl Mailing { - pub async fn send_mail(&self, msg: MailMessage) -> Result<(), AppError> { - match self.transport() { + #[derive(Clone)] + enum MailerType { + Smtp(SmtpTransport), + File(String) + } + + #[derive(Clone)] + pub struct Mailer { + transport: MailerType, + from: String, + sender: Option> + } + + impl Mailer { + pub fn new(config: &Mailing) -> Self { + match config.transport() { MailTransport::Smtp => { - let tls = if let Some(t) = self.accept_all_certs() { + let tls = if let Some(t) = config.accept_all_certs() { if t { - let tls = TlsParameters::builder(self.server().clone().unwrap_or_default()) + let tls = TlsParameters::builder(config.server().clone().unwrap_or_default()) .dangerous_accept_invalid_certs(true) .dangerous_accept_invalid_hostnames(true); Some(tls.build().expect("Cannot build TLS params")) @@ -116,8 +132,8 @@ cfg_if! { if #[cfg(feature = "ssr")] { } else { None }; - let transport = if self.tls().unwrap_or(false) { - let transport = AsyncSmtpTransport::::starttls_relay(&self.server().clone().unwrap_or_default()) + let transport = if config.tls().unwrap_or(false) { + let transport = SmtpTransport::starttls_relay(&config.server().clone().unwrap_or_default()) .expect("Cannot create SMTP mail transport"); if let Some(t) = tls { transport.tls(Tls::Required(t)) @@ -125,7 +141,7 @@ cfg_if! { if #[cfg(feature = "ssr")] { transport } } else { - let transport = AsyncSmtpTransport::::relay(&self.server().clone().unwrap_or_default()) + let transport = SmtpTransport::relay(&config.server().clone().unwrap_or_default()) .expect("Cannot create SMTP mail transport"); if let Some(t) = tls { transport.tls(Tls::Wrapper(t)) @@ -133,30 +149,85 @@ cfg_if! { if #[cfg(feature = "ssr")] { transport } }; - let transport = if let Some(p) = self.port() { + let transport = if let Some(p) = config.port() { transport.port(p) } else { transport }; - let transport = if let Some(hello) = self.hello_name() { + let transport = if let Some(hello) = config.hello_name() { transport.hello_name(ClientId::Domain(hello.to_string())) } else { transport }; - if self.user().is_some() && self.password().is_some() { - let cred = Credentials::new(self.user().clone().unwrap(), self.password().clone().unwrap()); - transport.credentials(cred).build().send(msg.build_mail(self.from().to_string())?).await?; + if config.user().is_some() && config.password().is_some() { + let cred = Credentials::new(config.user().clone().unwrap(), config.password().clone().unwrap()); + Self { + transport: MailerType::Smtp(transport.credentials(cred).build()), + from: config.from().to_string(), + sender: None + } } else { - transport.build().send(msg.build_mail(self.from().to_string())?).await?; + Self { + transport: MailerType::Smtp(transport.build()), + from: config.from().to_string(), + sender: None + } } } MailTransport::File => { - AsyncFileTransport::::new(self.path().clone().unwrap_or_default()) - .send(msg.build_mail(self.from().to_string())?).await?; + Self { + transport: MailerType::File(config.path().clone().unwrap_or("".to_string())), + from: config.from().to_string(), + sender: None + } } } + } + + fn send(&self, msg: &MailMessage) -> Result<(), AppError> { + let to_send = msg.build_mail(self.from.clone())?; + match &self.transport { + MailerType::Smtp(s) => {s.send(&to_send)?;}, + MailerType::File(s) => {FileTransport::new(s).send(&to_send)?;} + } Ok(()) } + + pub fn start_sender(&mut self) { + let (tx, rx) = mpsc::channel::(); + self.sender = Some(tx); + let mailer = self.clone(); + + std::thread::spawn(move || { + loop { + let msg = rx.recv(); + if let Err(e) = msg { + error!("Mailer error: {}", e.to_string()); + break; + } + + let msg = msg.unwrap(); + + if let Err(e) = mailer.send(&msg) { + error!("Mail send error: {}", e); + } else { + info!("Mail message for: {} has been successfully sent", msg.to); + } + } + }); + + info!("Mail sender started"); + } + + pub fn send_mail(&self, msg: MailMessage) -> Result<(), AppError> { + if let Err(e) = self.sender.as_ref().expect("Sender not started").send(msg) { + error!("Mail queue error: {}", e); + Err(AppError::MailSendError(e.to_string())) + } else { + info!("Message queued for send"); + Ok(()) + } + } } }} diff --git a/src/backend/mod.rs b/src/backend/mod.rs index 92549f6..1b0f5ff 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -55,16 +55,16 @@ cfg_if!{ use actix_web::web::Data; use leptos_actix::extract; use leptos::ServerFnError; - use crate::config::Mailing; + use crate::backend::mail::Mailer; #[derive(Clone)] pub struct AppData { db_pool: PgPool, - mailer: Mailing + mailer: Mailer } impl AppData { - pub fn new(db_pool: PgPool, mailer: Mailing) -> Self { + pub fn new(db_pool: PgPool, mailer: Mailer) -> Self { Self { db_pool, mailer @@ -75,7 +75,7 @@ cfg_if!{ &self.db_pool } - pub fn mailer(&self) -> &Mailing { + pub fn mailer(&self) -> &Mailer { &self.mailer } } @@ -85,7 +85,7 @@ cfg_if!{ Ok(data.db_pool().clone()) } - pub async fn get_mailing() -> Result { + pub async fn get_mailing() -> Result { let data = extract::>().await?; Ok(data.mailer().clone()) } diff --git a/src/backend/property.rs b/src/backend/property.rs index f602bb1..0f03778 100644 --- a/src/backend/property.rs +++ b/src/backend/property.rs @@ -7,7 +7,7 @@ use crate::components::data_form::ForValidation; cfg_if! { if #[cfg(feature = "ssr")] { use crate::backend::get_pool; - pub async fn get_props(filter: Option) -> Result, ServerFnError> { + pub async fn get_props(filter: Option<&'static str>) -> Result, ServerFnError> { let pool = get_pool().await?; let props = if let Some(f) = filter { sqlx::query_as::<_, ResProperty>(&format!("SELECT * FROM property WHERE {} ORDER BY id", f)).fetch_all(&pool).await? @@ -38,7 +38,7 @@ pub async fn get_properties() -> Result>, ServerFnE #[server] pub async fn get_active_properties() -> Result>, ServerFnError> { - let props = get_props(Some("active = true".to_string())).await?; + let props = get_props(Some("active = true")).await?; Ok(ApiResponse::Data(props)) } diff --git a/src/backend/reservation.rs b/src/backend/reservation.rs index 11cb0a6..fe47407 100644 --- a/src/backend/reservation.rs +++ b/src/backend/reservation.rs @@ -167,7 +167,7 @@ cfg_if! { if #[cfg(feature = "ssr")] { let msg = get_message(MessageType::NewReservation).await?; for m in emails_for_notify().await? { - mailing.send_mail(MailMessage::new(admin_mail.clone(), m, msg.clone(), reservation)).await?; + mailing.send_mail(MailMessage::new(admin_mail.clone(), m, msg.clone(), reservation))?; } Ok(()) @@ -183,7 +183,7 @@ cfg_if! { if #[cfg(feature = "ssr")] { return Err(AppError::MailSendError("No admin mail".to_string())) } - mailing.send_mail(MailMessage::new(admin_mail.clone().unwrap(), reservation.customer.email.clone(), msg, &reservation)).await?; + mailing.send_mail(MailMessage::new(admin_mail.clone().unwrap(), reservation.customer.email.clone(), msg, &reservation))?; notify_new_all(admin_mail.unwrap(), &reservation).await } @@ -196,7 +196,7 @@ cfg_if! { if #[cfg(feature = "ssr")] { return Err(AppError::MailSendError("No admin mail".to_string())) } - mailing.send_mail(MailMessage::new(admin_mail.unwrap(), reservation.customer.email.clone(), msg, &reservation)).await + mailing.send_mail(MailMessage::new(admin_mail.unwrap(), reservation.customer.email.clone(), msg, &reservation)) } async fn notify_approve(uuid: Uuid) -> Result<(), AppError> { @@ -238,7 +238,7 @@ pub async fn get_public_form_data(day: NaiveDate) -> Result std::io::Result<()> { .connect(&srv_conf.database().con_string()).await.unwrap(); migrate!().run(&pool).await.expect("could not run SQLx migrations"); - let mailing = srv_conf.mailing().clone(); + let mut mailer = Mailer::new(srv_conf.mailing()); + mailer.start_sender(); if let Err(e) = create_admin(&pool).await { error!("Error while checking admin user: {:?}", e); @@ -78,7 +79,7 @@ async fn main() -> std::io::Result<()> { let site_root = &leptos_options.site_root; App::new() - .app_data(Data::new(AppData::new(pool.clone(), mailing.clone()))) + .app_data(Data::new(AppData::new(pool.clone(), mailer.clone()))) .wrap(Logger::default()) .wrap(Authentication) .wrap(SessionMiddleware::new( diff --git a/src/validator.rs b/src/validator.rs index deef3c5..91d1924 100644 --- a/src/validator.rs +++ b/src/validator.rs @@ -26,7 +26,7 @@ impl Validator { } } - pub fn check(&self, entity: &(impl Validate + ?Sized), ev: &web_sys::Event) { + pub fn check(&self, entity: &dyn Validate, ev: &web_sys::Event) { if let Err(val_err) = entity.validate() { ev.prevent_default(); //self.set_message.update(|m| *m = Some(val_err.to_string().clone()));