Message ingestion (not tested)

This commit is contained in:
Mauro D
2023-05-17 17:35:36 +00:00
parent e0e8347de1
commit 4e4632571c
53 changed files with 1363 additions and 230 deletions

1
Cargo.lock generated
View File

@@ -4376,6 +4376,7 @@ name = "utils"
version = "0.1.0"
dependencies = [
"mail-auth",
"mail-send",
"opentelemetry",
"opentelemetry-otlp",
"opentelemetry-semantic-conventions",

View File

@@ -19,7 +19,7 @@ path = "crates/main/src/main.rs"
store = { path = "crates/store" }
jmap = { path = "crates/jmap" }
jmap_proto = { path = "crates/jmap-proto" }
smtp = { path = "crates/smtp" }
smtp = { path = "crates/smtp", features = ["local_delivery"] }
utils = { path = "crates/utils" }
tokio = { version = "1.23", features = ["full"] }
tracing = "0.1"

View File

@@ -3,17 +3,22 @@ use crate::JMAP;
use super::{AclToken, AuthDatabase, SqlDatabase};
impl JMAP {
pub async fn authenticate(&self, account: &str, secret: &str) -> Option<AclToken> {
pub async fn authenticate(&self, account: &str, secret: &str) -> Option<u32> {
let account_id = self.get_account_id(account).await?;
let account_secret = self.get_account_secret(account_id).await?;
if secret == account_secret {
self.get_acl_token(account_id).await
account_id.into()
} else {
tracing::debug!(context = "auth", event = "failed", account = account);
None
}
}
pub async fn authenticate_with_token(&self, account: &str, secret: &str) -> Option<AclToken> {
self.get_acl_token(self.authenticate(account, secret).await?)
.await
}
pub async fn get_acl_token(&self, account_id: u32) -> Option<AclToken> {
self.update_acl_token(AclToken {
primary_id: account_id,
@@ -30,7 +35,7 @@ impl JMAP {
query_secret_by_uid,
..
} => {
db.fetch_string(query_secret_by_uid, account_id as i64)
db.fetch_uid_to_string(query_secret_by_uid, account_id as i64)
.await
}
AuthDatabase::Ldap => None,
@@ -44,7 +49,7 @@ impl JMAP {
query_uid_by_login,
..
} => db
.fetch_id(query_uid_by_login, account)
.fetch_string_to_id(query_uid_by_login, account)
.await
.map(|id| id as u32),
AuthDatabase::Ldap => None,
@@ -58,7 +63,7 @@ impl JMAP {
query_gids_by_uid,
..
} => db
.fetch_ids(query_gids_by_uid, account_id as i64)
.fetch_uid_to_uids(query_gids_by_uid, account_id as i64)
.await
.into_iter()
.map(|id| id as u32)
@@ -73,14 +78,66 @@ impl JMAP {
db,
query_login_by_uid,
..
} => db.fetch_string(query_login_by_uid, account_id as i64).await,
} => {
db.fetch_uid_to_string(query_login_by_uid, account_id as i64)
.await
}
AuthDatabase::Ldap => None,
}
}
pub async fn get_uids_by_address(&self, address: &str) -> Vec<u32> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_gids_by_uid,
..
} => db
.fetch_string_to_uids(query_gids_by_uid, address)
.await
.into_iter()
.map(|id| id as u32)
.collect(),
AuthDatabase::Ldap => vec![],
}
}
pub async fn get_addresses_by_uid(&self, account_id: u32) -> Vec<String> {
match &self.auth_db {
AuthDatabase::Sql {
db,
query_addresses_by_uid,
..
} => {
db.fetch_uid_to_strings(query_addresses_by_uid, account_id as i64)
.await
}
AuthDatabase::Ldap => vec![],
}
}
pub async fn vrfy_address(&self, address: &str) -> Vec<String> {
match &self.auth_db {
AuthDatabase::Sql { db, query_vrfy, .. } => {
db.fetch_string_to_strings(query_vrfy, address).await
}
AuthDatabase::Ldap => vec![],
}
}
pub async fn expn_address(&self, address: &str) -> Vec<String> {
match &self.auth_db {
AuthDatabase::Sql { db, query_expn, .. } => {
db.fetch_string_to_strings(query_expn, address).await
}
AuthDatabase::Ldap => vec![],
}
}
}
// TODO abstract this
impl SqlDatabase {
pub async fn fetch_string(&self, query: &str, uid: i64) -> Option<String> {
pub async fn fetch_uid_to_string(&self, query: &str, uid: i64) -> Option<String> {
let result = match &self {
SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, String>(query)
@@ -117,7 +174,7 @@ impl SqlDatabase {
}
}
pub async fn fetch_id(&self, query: &str, param: &str) -> Option<i64> {
pub async fn fetch_string_to_id(&self, query: &str, param: &str) -> Option<i64> {
let result = match &self {
SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, i64>(query)
@@ -154,7 +211,7 @@ impl SqlDatabase {
}
}
pub async fn fetch_strings(&self, query: &str, uid: i64) -> Vec<String> {
pub async fn fetch_uid_to_strings(&self, query: &str, uid: i64) -> Vec<String> {
let result = match &self {
SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, String>(query)
@@ -191,7 +248,7 @@ impl SqlDatabase {
}
}
pub async fn fetch_ids(&self, query: &str, uid: i64) -> Vec<i64> {
pub async fn fetch_uid_to_uids(&self, query: &str, uid: i64) -> Vec<i64> {
let result = match &self {
SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, i64>(query)
@@ -228,6 +285,80 @@ impl SqlDatabase {
}
}
pub async fn fetch_string_to_uids(&self, query: &str, param: &str) -> Vec<i64> {
let result = match &self {
SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, i64>(query)
.bind(param)
.fetch_all(pool)
.await
}
SqlDatabase::MySql(pool) => {
sqlx::query_scalar::<_, i64>(query)
.bind(param)
.fetch_all(pool)
.await
}
/*SqlDatabase::MsSql(pool) => {
sqlx::query_scalar::<_, i64>(query)
.bind(param)
.fetch_all(pool)
.await
}*/
SqlDatabase::SqlLite(pool) => {
sqlx::query_scalar::<_, i64>(query)
.bind(param)
.fetch_all(pool)
.await
}
};
match result {
Ok(result) => result,
Err(err) => {
tracing::warn!(context = "sql", event = "error", query = query, reason = ?err);
vec![]
}
}
}
pub async fn fetch_string_to_strings(&self, query: &str, param: &str) -> Vec<String> {
let result = match &self {
SqlDatabase::Postgres(pool) => {
sqlx::query_scalar::<_, String>(query)
.bind(param)
.fetch_all(pool)
.await
}
SqlDatabase::MySql(pool) => {
sqlx::query_scalar::<_, String>(query)
.bind(param)
.fetch_all(pool)
.await
}
/*SqlDatabase::MsSql(pool) => {
sqlx::query_scalar::<_, String>(query)
.bind(param)
.fetch_all(pool)
.await
}*/
SqlDatabase::SqlLite(pool) => {
sqlx::query_scalar::<_, String>(query)
.bind(param)
.fetch_all(pool)
.await
}
};
match result {
Ok(result) => result,
Err(err) => {
tracing::warn!(context = "sql", event = "error", query = query, reason = ?err);
vec![]
}
}
}
pub async fn execute(&self, query: &str, params: impl Iterator<Item = String>) -> bool {
let result = match self {
SqlDatabase::Postgres(pool) => {

View File

@@ -56,7 +56,7 @@ impl JMAP {
})
})
{
self.authenticate(&account, &secret).await
self.authenticate_with_token(&account, &secret).await
} else {
tracing::debug!(
context = "authenticate_headers",

View File

@@ -25,6 +25,10 @@ pub enum AuthDatabase {
query_login_by_uid: String,
query_secret_by_uid: String,
query_gids_by_uid: String,
query_uids_by_address: String,
query_addresses_by_uid: String,
query_vrfy: String,
query_expn: String,
},
Ldap,
}

View File

@@ -148,7 +148,7 @@ impl JMAP {
{
if let (Some(email), Some(password)) = (fields.get("email"), fields.get("password"))
{
if let Some(id) = self.authenticate(email, password).await {
if let Some(id) = self.authenticate_with_token(email, password).await {
oauth
.account_id
.store(id.primary_id(), atomic::Ordering::Relaxed);

View File

@@ -109,7 +109,7 @@ impl JMAP {
// Authenticate user
if let (Some(email), Some(password)) = (params.get("email"), params.get("password")) {
if let Some(acl_token) = self.authenticate(email, password).await {
if let Some(acl_token) = self.authenticate_with_token(email, password).await {
// Generate client code
let client_code = thread_rng()
.sample_iter(Alphanumeric)

View File

@@ -113,6 +113,7 @@ impl JMAP {
mailbox_ids,
email.keywords,
email.received_at.map(|r| r.into()),
false,
)
.await
{

View File

@@ -32,6 +32,7 @@ pub struct IngestedEmail {
}
impl JMAP {
#[allow(clippy::blocks_in_if_conditions)]
pub async fn email_ingest(
&self,
raw_message: &[u8],
@@ -39,6 +40,7 @@ impl JMAP {
mailbox_ids: Vec<u32>,
keywords: Vec<Keyword>,
received_at: Option<u64>,
skip_duplicates: bool,
) -> Result<IngestedEmail, MaybeError> {
// Parse message
let message = Message::parse(raw_message)
@@ -83,6 +85,39 @@ impl JMAP {
_ => (),
}
}
// Check for duplicates
if !skip_duplicates
&& !self
.store
.filter(
account_id,
Collection::Email,
references
.iter()
.map(|id| Filter::eq(Property::MessageId, *id))
.collect(),
)
.await
.map_err(|err| {
tracing::error!(
event = "error",
context = "find_duplicates",
error = ?err,
"Duplicate message search failed.");
MaybeError::Temporary
})?
.results
.is_empty()
{
return Ok(IngestedEmail {
id: Id::default(),
change_id: u64::MAX,
blob_id: BlobId::default(),
size: 0,
});
}
let thread_id = if !references.is_empty() {
self.find_or_merge_thread(account_id, subject, &references)
.await?

View File

@@ -705,10 +705,17 @@ impl JMAP {
// Ingest message
response.created.insert(
id,
self.email_ingest(&raw_message, account_id, mailboxes, keywords, received_at)
.await
.map_err(|_| MethodError::ServerPartialFail)?
.into(),
self.email_ingest(
&raw_message,
account_id,
mailboxes,
keywords,
received_at,
false,
)
.await
.map_err(|_| MethodError::ServerPartialFail)?
.into(),
);
}

View File

@@ -15,7 +15,10 @@ use jmap_proto::{
types::{collection::Collection, property::Property},
};
use mail_send::mail_auth::common::lru::{DnsCache, LruCache};
use services::state::{self, init_state_manager, spawn_state_manager};
use services::{
delivery::spawn_delivery_manager,
state::{self, init_state_manager, spawn_state_manager},
};
use sqlx::{mysql::MySqlPoolOptions, postgres::PgPoolOptions, sqlite::SqlitePoolOptions};
use store::{
fts::Language,
@@ -26,7 +29,7 @@ use store::{
BitmapKey, Deserialize, Serialize, Store, ValueKey,
};
use tokio::sync::mpsc;
use utils::{config::Rate, failed, UnwrapFailure};
use utils::{config::Rate, failed, ipc::DeliveryEvent, UnwrapFailure};
pub mod api;
pub mod auth;
@@ -102,7 +105,10 @@ pub enum MaybeError {
}
impl JMAP {
pub async fn init(config: &utils::config::Config) -> Arc<Self> {
pub async fn init(
config: &utils::config::Config,
delivery_rx: mpsc::Receiver<DeliveryEvent>,
) -> Arc<Self> {
let auth_db = match config
.value_require("jmap.auth.database.type")
.failed("Invalid property")
@@ -182,6 +188,22 @@ impl JMAP {
.value_require("jmap.auth.database.query.gids-by-uid")
.failed("Invalid property")
.to_string(),
query_uids_by_address: config
.value_require("jmap.auth.database.query.uids-by-address")
.failed("Invalid property")
.to_string(),
query_addresses_by_uid: config
.value_require("jmap.auth.database.query.addresses-by-uid")
.failed("Invalid property")
.to_string(),
query_vrfy: config
.value_require("jmap.auth.database.query.vrfy")
.failed("Invalid property")
.to_string(),
query_expn: config
.value_require("jmap.auth.database.query.expn")
.failed("Invalid property")
.to_string(),
}
}
_ => failed("Invalid auth database type"),
@@ -227,6 +249,9 @@ impl JMAP {
state_tx,
});
// Spawn delivery manager
spawn_delivery_manager(jmap_server.clone(), delivery_rx);
// Spawn state manager
spawn_state_manager(jmap_server.clone(), config, state_rx);

View File

@@ -0,0 +1,39 @@
use std::sync::Arc;
use mail_send::Credentials;
use tokio::sync::mpsc;
use utils::ipc::{DeliveryEvent, Item};
use crate::JMAP;
pub fn spawn_delivery_manager(core: Arc<JMAP>, mut delivery_rx: mpsc::Receiver<DeliveryEvent>) {
tokio::spawn(async move {
while let Some(event) = delivery_rx.recv().await {
match event {
DeliveryEvent::Ingest { message, result_tx } => {
result_tx.send(core.deliver_message(message).await).ok();
}
DeliveryEvent::Lookup(lookup) => {
lookup
.result
.send(match lookup.item {
Item::IsAccount(address) => {
(!core.get_uids_by_address(&address).await.is_empty()).into()
}
Item::Authenticate(credentials) => match credentials {
Credentials::Plain { username, secret } => {
core.authenticate(&username, &secret).await.is_some()
}
_ => false,
}
.into(),
Item::Verify(address) => core.vrfy_address(&address).await.into(),
Item::Expand(address) => core.expn_address(&address).await.into(),
})
.ok();
}
DeliveryEvent::Stop => break,
}
}
});
}

View File

@@ -0,0 +1,111 @@
use jmap_proto::types::{state::StateChange, type_state::TypeState};
use store::ahash::AHashMap;
use utils::ipc::{DeliveryResult, IngestMessage};
use crate::{mailbox::INBOX_ID, MaybeError, JMAP};
impl JMAP {
pub async fn deliver_message(&self, message: IngestMessage) -> Vec<DeliveryResult> {
// Read message
let raw_message = match message.read_message().await {
Ok(raw_message) => raw_message,
Err(_) => {
return (0..message.recipients.len())
.map(|_| DeliveryResult::TemporaryFailure {
reason: "Temporary I/O error.".into(),
})
.collect::<Vec<_>>();
}
};
// Obtain the UIDs for each recipient
let mut recipients = Vec::with_capacity(message.recipients.len());
let mut deliver_uids = AHashMap::with_capacity(message.recipients.len());
for rcpt in message.recipients {
let uids = self.get_uids_by_address(&rcpt).await;
for uid in &uids {
deliver_uids.insert(*uid, DeliveryResult::Success);
}
recipients.push(uids);
}
// Deliver to each recipient
for (uid, status) in &mut deliver_uids {
match self
.email_ingest(&raw_message, *uid, vec![INBOX_ID], vec![], None, true)
.await
{
Ok(ingested_message) => {
// Notify state change
if ingested_message.change_id != u64::MAX {
self.broadcast_state_change(
StateChange::new(*uid)
.with_change(TypeState::EmailDelivery, ingested_message.change_id)
.with_change(TypeState::Email, ingested_message.change_id)
.with_change(TypeState::Mailbox, ingested_message.change_id)
.with_change(TypeState::Thread, ingested_message.change_id),
)
.await;
}
}
Err(err) => match err {
MaybeError::Temporary => {
*status = DeliveryResult::TemporaryFailure {
reason: "Transient server failure.".into(),
}
}
MaybeError::Permanent(reason) => {
*status = DeliveryResult::PermanentFailure {
code: [5, 5, 0],
reason: reason.into(),
}
}
},
}
}
// Build result
recipients
.into_iter()
.map(|uids| {
match uids.len() {
1 => {
// Delivery to single recipient
deliver_uids.get(&uids[0]).unwrap().clone()
}
0 => {
// Something went wrong
DeliveryResult::TemporaryFailure {
reason: "Address lookup failed.".into(),
}
}
_ => {
// Delivery to list, count number of successes and failures
let mut success = 0;
let mut temp_failures = 0;
for uid in uids {
match deliver_uids.get(&uid).unwrap() {
DeliveryResult::Success => success += 1,
DeliveryResult::TemporaryFailure { .. } => temp_failures += 1,
DeliveryResult::PermanentFailure { .. } => {}
}
}
if success > temp_failures {
DeliveryResult::Success
} else if temp_failures > 0 {
DeliveryResult::TemporaryFailure {
reason: "Delivery to one or more recipients failed temporarily."
.into(),
}
} else {
DeliveryResult::PermanentFailure {
code: [5, 5, 0],
reason: "Delivery to all recipients failed.".into(),
}
}
}
}
})
.collect()
}
}

View File

@@ -1,3 +1,5 @@
pub mod delivery;
pub mod ingest;
pub mod state;
pub const IPC_CHANNEL_BUFFER: usize = 1024;

View File

@@ -1,7 +1,11 @@
use std::time::Duration;
use jmap::{api::JmapSessionManager, JMAP};
use smtp::core::{SmtpAdminSessionManager, SmtpSessionManager, SMTP};
use jmap::{api::JmapSessionManager, services::IPC_CHANNEL_BUFFER, JMAP};
use smtp::{
core::{SmtpAdminSessionManager, SmtpSessionManager, SMTP},
outbound::delivery,
};
use tokio::sync::mpsc;
use utils::{
config::{Config, ServerProtocol},
enable_tracing, wait_for_shutdown, UnwrapFailure,
@@ -23,8 +27,9 @@ async fn main() -> std::io::Result<()> {
);
// Init servers
let smtp = SMTP::init(&config, &servers).await;
let jmap = JMAP::init(&config).await;
let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let smtp = SMTP::init(&config, &servers, delivery_tx).await;
let jmap = JMAP::init(&config, delivery_rx).await;
// Spawn servers
let shutdown_tx = servers.spawn(|server, shutdown_rx| {

View File

@@ -50,6 +50,7 @@ privdrop = "0.5.3"
[features]
test_mode = []
local_delivery = []
#[[bench]]
#name = "hash"

View File

@@ -70,7 +70,7 @@ impl ConfigList for Config {
entries.insert(value.to_string());
}
}
Ok(Lookup::Local(entries))
Ok(Lookup::List(entries))
}
}
@@ -118,27 +118,27 @@ mod tests {
let mut expected_lists = AHashMap::from_iter([
(
"list/local-domains".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([
Arc::new(Lookup::List(AHashSet::from_iter([
"example.org".to_string(),
"example.net".to_string(),
]))),
),
(
"list/spammer-domains".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([
Arc::new(Lookup::List(AHashSet::from_iter([
"thatdomain.net".to_string()
]))),
),
(
"list/local-users".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([
Arc::new(Lookup::List(AHashSet::from_iter([
"user1@domain.org".to_string(),
"user2@domain.org".to_string(),
]))),
),
(
"list/power-users".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([
Arc::new(Lookup::List(AHashSet::from_iter([
"user1@domain.org".to_string(),
"user2@domain.org".to_string(),
"user3@example.net".to_string(),

View File

@@ -132,7 +132,7 @@ impl Eq for ConditionMatch {}
impl PartialEq for Lookup {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::Local(l0), Self::Local(r0)) => l0 == r0,
(Self::List(l0), Self::List(r0)) => l0 == r0,
(Self::Remote(_), Self::Remote(_)) => true,
_ => false,
}

View File

@@ -39,10 +39,12 @@ use tokio::{
sync::oneshot,
};
use utils::listener::{limiter::InFlight, SessionManager};
use utils::{
ipc::{Item, LookupResult},
listener::{limiter::InFlight, SessionManager},
};
use crate::{
lookup::{Item, LookupResult},
queue::{self, instant_to_timestamp, InstantFromTimestamp, QueueId, Status},
reporting::{
self,

View File

@@ -42,7 +42,10 @@ use tokio::{
};
use tokio_rustls::TlsConnector;
use tracing::Span;
use utils::listener::{limiter::InFlight, ServerInstance};
use utils::{
ipc::DeliveryEvent,
listener::{limiter::InFlight, ServerInstance},
};
use crate::{
config::{
@@ -98,6 +101,8 @@ pub struct SMTP {
pub mail_auth: MailAuthConfig,
pub report: ReportCore,
pub sieve: SieveCore,
#[cfg(feature = "local_delivery")]
pub delivery_tx: mpsc::Sender<DeliveryEvent>,
}
pub struct SieveCore {

View File

@@ -274,7 +274,7 @@ impl SMTP {
Recipient::List(list) => {
if let Some(list) = self.sieve.lookup.get(&list) {
match list.as_ref() {
Lookup::Local(items) => {
Lookup::List(items) => {
for rcpt in items {
handle.block_on(
message.add_recipient(rcpt, &self.queue.config),

View File

@@ -25,8 +25,9 @@ use mail_parser::decoders::base64::base64_decode;
use mail_send::Credentials;
use smtp_proto::{IntoString, AUTH_LOGIN, AUTH_OAUTHBEARER, AUTH_PLAIN, AUTH_XOAUTH2};
use tokio::io::{AsyncRead, AsyncWrite};
use utils::ipc::Item;
use crate::{core::Session, lookup::Item};
use crate::core::Session;
pub struct SaslToken {
mechanism: u64,

View File

@@ -75,6 +75,8 @@ impl SessionManager for SmtpSessionManager {
tokio::spawn(async move {
let _ = core.queue.tx.send(queue::Event::Stop).await;
let _ = core.report.tx.send(reporting::Event::Stop).await;
#[cfg(feature = "local_delivery")]
let _ = core.delivery_tx.send(utils::ipc::DeliveryEvent::Stop).await;
});
}
}

View File

@@ -22,11 +22,9 @@
*/
use tokio::io::{AsyncRead, AsyncWrite};
use utils::ipc::{Item, LookupResult};
use crate::{
core::Session,
lookup::{Item, LookupResult},
};
use crate::core::Session;
use std::fmt::Write;
impl<T: AsyncWrite + AsyncRead + Unpin> Session<T> {

View File

@@ -32,6 +32,7 @@ use config::{
session::ConfigSession, ConfigContext,
};
use dashmap::DashMap;
use lookup::Lookup;
use mail_send::smtp::tls::build_tls_connector;
use queue::manager::SpawnQueue;
use reporting::scheduler::SpawnReport;
@@ -52,9 +53,20 @@ pub mod reporting;
pub static USER_AGENT: &str = concat!("StalwartSMTP/", env!("CARGO_PKG_VERSION"),);
impl SMTP {
pub async fn init(config: &Config, servers: &Servers) -> Arc<Self> {
pub async fn init(
config: &Config,
servers: &Servers,
#[cfg(feature = "local_delivery")] delivery_tx: mpsc::Sender<utils::ipc::DeliveryEvent>,
) -> Arc<Self> {
// Read configuration parameters
let mut config_ctx = ConfigContext::new(&servers.inner);
#[cfg(feature = "local_delivery")]
config_ctx.lookup.insert(
"local".to_string(),
Arc::new(Lookup::Local(delivery_tx.clone())),
);
config
.parse_remote_hosts(&mut config_ctx)
.failed("Configuration error");
@@ -152,6 +164,8 @@ impl SMTP {
},
mail_auth: mail_auth_config,
sieve: sieve_config,
#[cfg(feature = "local_delivery")]
delivery_tx,
});
// Spawn queue manager

View File

@@ -22,23 +22,32 @@
*/
use mail_send::Credentials;
use tokio::sync::{mpsc, oneshot};
use utils::ipc::{DeliveryEvent, Item, LookupItem, LookupResult};
use super::{Item, Lookup, LookupResult};
use super::Lookup;
impl Lookup {
pub async fn contains(&self, entry: &str) -> Option<bool> {
match self {
#[cfg(feature = "local_delivery")]
Lookup::Local(tx) => lookup_local(tx, Item::IsAccount(entry.to_string()))
.await
.map(|r| r.into()),
Lookup::Remote(tx) => tx
.lookup(Item::IsAccount(entry.to_string()))
.await
.map(|r| r.into()),
Lookup::Sql(sql) => sql.exists(entry).await,
Lookup::Local(entries) => Some(entries.contains(entry)),
Lookup::List(entries) => Some(entries.contains(entry)),
}
}
pub async fn lookup(&self, item: Item) -> Option<LookupResult> {
match self {
#[cfg(feature = "local_delivery")]
Lookup::Local(tx) => lookup_local(tx, item).await,
Lookup::Remote(tx) => tx.lookup(item).await,
Lookup::Sql(sql) => match item {
@@ -57,7 +66,7 @@ impl Lookup {
Item::Expand(list) => sql.fetch_many(&list).await.map(LookupResult::from),
},
Lookup::Local(list) => match item {
Lookup::List(list) => match item {
Item::IsAccount(item) => Some(list.contains(&item).into()),
Item::Verify(_item) | Item::Expand(_item) => {
#[cfg(feature = "test_mode")]
@@ -91,3 +100,19 @@ impl Lookup {
}
}
}
async fn lookup_local(
delivery_tx: &mpsc::Sender<DeliveryEvent>,
item: Item,
) -> Option<LookupResult> {
let (tx, rx) = oneshot::channel();
if delivery_tx
.send(DeliveryEvent::Lookup(LookupItem { item, result: tx }))
.await
.is_ok()
{
rx.await.ok()
} else {
None
}
}

View File

@@ -36,10 +36,11 @@ use tokio::{
sync::mpsc,
};
use tokio_rustls::{client::TlsStream, TlsConnector};
use utils::ipc::{Item, LookupItem};
use crate::lookup::spawn::LoggedUnwrap;
use super::{Event, Item, LookupItem, RemoteLookup};
use super::{Event, RemoteLookup};
pub struct ImapAuthClient<T: AsyncRead + AsyncWrite> {
stream: T,

View File

@@ -22,9 +22,9 @@
*/
use ahash::AHashSet;
use mail_send::Credentials;
use parking_lot::Mutex;
use tokio::sync::{mpsc, oneshot};
use utils::ipc::DeliveryEvent;
use self::cache::LookupCache;
@@ -37,9 +37,11 @@ pub mod sql;
#[derive(Debug)]
pub enum Lookup {
Local(AHashSet<String>),
List(AHashSet<String>),
Remote(LookupChannel),
Sql(SqlQuery),
#[cfg(feature = "local_delivery")]
Local(mpsc::Sender<DeliveryEvent>),
}
#[derive(Debug, Clone)]
@@ -59,55 +61,34 @@ pub struct SqlQuery {
impl Default for Lookup {
fn default() -> Self {
Lookup::Local(AHashSet::default())
Lookup::List(AHashSet::default())
}
}
#[derive(Debug)]
pub enum Event {
Lookup(LookupItem),
Lookup(utils::ipc::LookupItem),
WorkerReady {
item: Item,
item: utils::ipc::Item,
result: Option<bool>,
next_lookup: Option<oneshot::Sender<Option<LookupItem>>>,
next_lookup: Option<oneshot::Sender<Option<utils::ipc::LookupItem>>>,
},
WorkerFailed,
Reload,
Stop,
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum Item {
IsAccount(String),
Authenticate(Credentials<String>),
Verify(String),
Expand(String),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LookupResult {
True,
False,
Values(Vec<String>),
}
#[derive(Debug)]
pub struct LookupItem {
pub item: Item,
pub result: oneshot::Sender<LookupResult>,
}
#[derive(Debug, Clone)]
pub struct LookupChannel {
pub tx: mpsc::Sender<Event>,
}
#[derive(Clone)]
struct RemoteHost<T: RemoteLookup> {
struct NextHop<T: RemoteLookup> {
tx: mpsc::Sender<Event>,
host: T,
}
pub trait RemoteLookup: Clone {
fn spawn_lookup(&self, lookup: LookupItem, tx: mpsc::Sender<Event>);
fn spawn_lookup(&self, lookup: utils::ipc::LookupItem, tx: mpsc::Sender<Event>);
}

View File

@@ -26,8 +26,9 @@ use std::sync::Arc;
use mail_send::smtp::AssertReply;
use smtp_proto::Severity;
use tokio::sync::{mpsc, oneshot};
use utils::ipc::{Item, LookupItem, LookupResult};
use super::{spawn::LoggedUnwrap, Event, Item, LookupItem, LookupResult, RemoteLookup};
use super::{spawn::LoggedUnwrap, Event, RemoteLookup};
pub struct SmtpClientBuilder {
pub builder: mail_send::SmtpClientBuilder<String>,

View File

@@ -21,16 +21,19 @@
* for more details.
*/
use std::{collections::VecDeque, fmt::Debug, sync::Arc, time::Duration};
use std::{collections::VecDeque, sync::Arc, time::Duration};
use crate::config::Host;
use mail_send::smtp::tls::build_tls_connector;
use tokio::sync::{mpsc, oneshot};
use utils::config::{Config, ServerProtocol};
use utils::{
config::{Config, ServerProtocol},
ipc::{Item, LookupItem, LookupResult},
};
use super::{
cache::LookupCache, imap::ImapAuthClientBuilder, smtp::SmtpClientBuilder, Event, Item,
LookupChannel, LookupItem, LookupResult, RemoteHost, RemoteLookup,
cache::LookupCache, imap::ImapAuthClientBuilder, smtp::SmtpClientBuilder, Event, LookupChannel,
NextHop, RemoteLookup,
};
impl Host {
@@ -46,7 +49,7 @@ impl Host {
// Prepare builders
match self.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => {
RemoteHost {
NextHop {
tx: self.channel_tx,
host: Arc::new(SmtpClientBuilder {
builder: mail_send::SmtpClientBuilder {
@@ -73,7 +76,7 @@ impl Host {
.await;
}
ServerProtocol::Imap => {
RemoteHost {
NextHop {
tx: self.channel_tx,
host: Arc::new(
ImapAuthClientBuilder::new(
@@ -107,7 +110,7 @@ impl Host {
}
}
impl<T: RemoteLookup> RemoteHost<T> {
impl<T: RemoteLookup> NextHop<T> {
pub async fn run(
&self,
mut rx: mpsc::Receiver<Event>,
@@ -205,32 +208,6 @@ impl From<mpsc::Sender<Event>> for LookupChannel {
}
}
impl From<LookupResult> for bool {
fn from(value: LookupResult) -> Self {
matches!(value, LookupResult::True | LookupResult::Values(_))
}
}
impl From<bool> for LookupResult {
fn from(value: bool) -> Self {
if value {
LookupResult::True
} else {
LookupResult::False
}
}
}
impl From<Vec<String>> for LookupResult {
fn from(value: Vec<String>) -> Self {
if !value.is_empty() {
LookupResult::Values(value)
} else {
LookupResult::False
}
}
}
pub trait LoggedUnwrap {
fn logged_unwrap(self) -> bool;
}
@@ -246,14 +223,3 @@ impl<T, E: std::fmt::Debug> LoggedUnwrap for Result<T, E> {
}
}
}
impl Debug for Item {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::IsAccount(arg0) => f.debug_tuple("Rcpt").field(arg0).finish(),
Self::Authenticate(_) => f.debug_tuple("Auth").finish(),
Self::Expand(arg0) => f.debug_tuple("Expn").field(arg0).finish(),
Self::Verify(arg0) => f.debug_tuple("Vrfy").field(arg0).finish(),
}
}
}

View File

@@ -43,10 +43,10 @@ use crate::{
};
use super::{
lookup::ToRemoteHost,
lookup::ToNextHop,
mta_sts,
session::{read_greeting, say_helo, try_start_tls, SessionParams, StartTlsResult},
RemoteHost,
NextHop,
};
use crate::queue::{
manager::Queue, throttle, DeliveryAttempt, Domain, Error, Event, OnHold, QueueEnvelope,
@@ -160,15 +160,31 @@ impl DeliveryAttempt {
}
// Obtain next hop
let (mut remote_hosts, is_smtp) =
if let Some(next_hop) = queue_config.next_hop.eval(&envelope).await {
(
vec![RemoteHost::Relay(next_hop)],
next_hop.protocol == ServerProtocol::Smtp,
)
} else {
(Vec::with_capacity(0), true)
};
let (mut remote_hosts, is_smtp) = match queue_config.next_hop.eval(&envelope).await
{
#[cfg(feature = "local_delivery")]
Some(next_hop) if next_hop.protocol == ServerProtocol::Jmap => {
// Deliver message locally
let delivery_result = self
.message
.deliver_local(
recipients.iter_mut().filter(|r| r.domain_idx == domain_idx),
&core.delivery_tx,
&span,
)
.await;
// Update status for the current domain and continue with the next one
domain
.set_status(delivery_result, queue_config.retry.eval(&envelope).await);
continue 'next_domain;
}
Some(next_hop) => (
vec![NextHop::Relay(next_hop)],
next_hop.protocol == ServerProtocol::Smtp,
),
None => (Vec::with_capacity(0), true),
};
// Prepare TLS strategy
let mut tls_strategy = TlsStrategy {
@@ -882,7 +898,7 @@ impl DeliveryAttempt {
context = "queue",
event = "requeue",
reason = "concurrency-limited",
"Too many outbound concurrenct connections, message moved to on-hold queue."
"Too many outbound concurrent connections, message moved to on-hold queue."
);
WorkerResult::OnHold(OnHold {

View File

@@ -0,0 +1,148 @@
use smtp_proto::Response;
use tokio::sync::{mpsc, oneshot};
use utils::ipc::{DeliveryEvent, DeliveryResult, IngestMessage};
use crate::queue::{
Error, ErrorDetails, HostResponse, Message, Recipient, Status, RCPT_STATUS_CHANGED,
};
impl Message {
pub async fn deliver_local(
&self,
recipients: impl Iterator<Item = &mut Recipient>,
delivery_tx: &mpsc::Sender<DeliveryEvent>,
span: &tracing::Span,
) -> Status<(), Error> {
// Prepare recipients list
let mut total_rcpt = 0;
let mut total_completed = 0;
let mut pending_recipients = Vec::new();
let mut recipient_addresses = Vec::new();
for rcpt in recipients {
total_rcpt += 1;
if matches!(
&rcpt.status,
Status::Completed(_) | Status::PermanentFailure(_)
) {
total_completed += 1;
continue;
}
recipient_addresses.push(rcpt.address_lcase.clone());
pending_recipients.push(rcpt);
}
// Create oneshot channel
let (result_tx, result_rx) = oneshot::channel();
// Deliver message to JMAP server
let delivery_result = match delivery_tx
.send(DeliveryEvent::Ingest {
message: IngestMessage {
sender_address: self.return_path_lcase.clone(),
recipients: recipient_addresses,
message_path: self.path.clone(),
message_size: self.size,
},
result_tx,
})
.await
{
Ok(_) => {
// Wait for result
match result_rx.await {
Ok(delivery_result) => delivery_result,
Err(_) => {
tracing::warn!(
parent: span,
context = "deliver_local",
event = "error",
reason = "result channel closed",
);
return Status::local_error();
}
}
}
Err(_) => {
tracing::warn!(
parent: span,
context = "deliver_local",
event = "error",
reason = "tx channel closed",
);
return Status::local_error();
}
};
// Process delivery results
for (rcpt, result) in pending_recipients.into_iter().zip(delivery_result) {
rcpt.flags |= RCPT_STATUS_CHANGED;
match result {
DeliveryResult::Success => {
tracing::info!(
parent: span,
context = "deliver_local",
event = "delivered",
rcpt = rcpt.address,
);
rcpt.status = Status::Completed(HostResponse {
hostname: "localhost".to_string(),
response: Response {
code: 250,
esc: [2, 1, 5],
message: "OK".to_string(),
},
});
total_completed += 1;
}
DeliveryResult::TemporaryFailure { reason } => {
tracing::info!(
parent: span,
context = "deliver_local",
event = "deferred",
rcpt = rcpt.address,
reason = reason.as_ref(),
);
rcpt.status = Status::TemporaryFailure(HostResponse {
hostname: ErrorDetails {
entity: "localhost".to_string(),
details: format!("RCPT TO:<{}>", rcpt.address),
},
response: Response {
code: 451,
esc: [4, 3, 0],
message: reason.into_owned(),
},
});
}
DeliveryResult::PermanentFailure { code, reason } => {
tracing::info!(
parent: span,
context = "deliver_local",
event = "rejected",
rcpt = rcpt.address,
reason = reason.as_ref(),
);
total_completed += 1;
rcpt.status = Status::PermanentFailure(HostResponse {
hostname: ErrorDetails {
entity: "localhost".to_string(),
details: format!("RCPT TO:<{}>", rcpt.address),
},
response: Response {
code: 550,
esc: code,
message: reason.into_owned(),
},
});
}
}
}
if total_completed == total_rcpt {
Status::Completed(())
} else {
Status::Scheduled
}
}
}

View File

@@ -31,12 +31,12 @@ use crate::{
queue::{Error, ErrorDetails, Status},
};
use super::RemoteHost;
use super::NextHop;
impl SMTP {
pub(super) async fn resolve_host(
&self,
remote_host: &RemoteHost<'_>,
remote_host: &NextHop<'_>,
envelope: &impl Envelope,
max_multihomed: usize,
) -> Result<(Option<IpAddr>, Vec<IpAddr>), Status<(), Error>> {
@@ -106,20 +106,20 @@ impl SMTP {
}
}
pub(super) trait ToRemoteHost {
pub(super) trait ToNextHop {
fn to_remote_hosts<'x, 'y: 'x>(
&'x self,
domain: &'y str,
max_mx: usize,
) -> Option<Vec<RemoteHost<'_>>>;
) -> Option<Vec<NextHop<'_>>>;
}
impl ToRemoteHost for Vec<MX> {
impl ToNextHop for Vec<MX> {
fn to_remote_hosts<'x, 'y: 'x>(
&'x self,
domain: &'y str,
max_mx: usize,
) -> Option<Vec<RemoteHost<'_>>> {
) -> Option<Vec<NextHop<'_>>> {
if !self.is_empty() {
// Obtain max number of MX hosts to process
let mut remote_hosts = Vec::with_capacity(max_mx);
@@ -129,7 +129,7 @@ impl ToRemoteHost for Vec<MX> {
let mut slice = mx.exchanges.iter().collect::<Vec<_>>();
slice.shuffle(&mut rand::thread_rng());
for remote_host in slice {
remote_hosts.push(RemoteHost::MX(remote_host.as_str()));
remote_hosts.push(NextHop::MX(remote_host.as_str()));
if remote_hosts.len() == max_mx {
break 'outer;
}
@@ -139,7 +139,7 @@ impl ToRemoteHost for Vec<MX> {
if mx.preference == 0 && remote_host == "." {
return None;
}
remote_hosts.push(RemoteHost::MX(remote_host.as_str()));
remote_hosts.push(NextHop::MX(remote_host.as_str()));
if remote_hosts.len() == max_mx {
break;
}
@@ -149,7 +149,7 @@ impl ToRemoteHost for Vec<MX> {
} else {
// If an empty list of MXs is returned, the address is treated as if it was
// associated with an implicit MX RR with a preference of 0, pointing to that host.
vec![RemoteHost::MX(domain)].into()
vec![NextHop::MX(domain)].into()
}
}
}

View File

@@ -34,6 +34,8 @@ use crate::{
pub mod dane;
pub mod delivery;
#[cfg(feature = "local_delivery")]
pub mod local;
pub mod lookup;
pub mod mta_sts;
pub mod session;
@@ -146,6 +148,14 @@ impl Status<(), Error> {
details: format!("Timeout while {stage}"),
}))
}
#[cfg(feature = "local_delivery")]
pub fn local_error() -> Self {
Status::TemporaryFailure(Error::ConnectionError(ErrorDetails {
entity: "localhost".to_string(),
details: "Could not deliver message locally.".to_string(),
}))
}
}
impl From<mail_auth::Error> for Status<(), Error> {
@@ -221,31 +231,31 @@ impl From<Box<Message>> for DeliveryAttempt {
}
}
enum RemoteHost<'x> {
enum NextHop<'x> {
Relay(&'x RelayHost),
MX(&'x str),
}
impl<'x> RemoteHost<'x> {
impl<'x> NextHop<'x> {
#[inline(always)]
fn hostname(&self) -> &str {
match self {
RemoteHost::MX(host) => {
NextHop::MX(host) => {
if let Some(host) = host.strip_suffix('.') {
host
} else {
host
}
}
RemoteHost::Relay(host) => host.address.as_str(),
NextHop::Relay(host) => host.address.as_str(),
}
}
#[inline(always)]
fn fqdn_hostname(&self) -> Cow<'_, str> {
let host = match self {
RemoteHost::MX(host) => host,
RemoteHost::Relay(host) => host.address.as_str(),
NextHop::MX(host) => host,
NextHop::Relay(host) => host.address.as_str(),
};
if !host.ends_with('.') {
format!("{host}.").into()
@@ -258,18 +268,18 @@ impl<'x> RemoteHost<'x> {
fn port(&self) -> u16 {
match self {
#[cfg(feature = "test_mode")]
RemoteHost::MX(_) => 9925,
NextHop::MX(_) => 9925,
#[cfg(not(feature = "test_mode"))]
RemoteHost::MX(_) => 25,
RemoteHost::Relay(host) => host.port,
NextHop::MX(_) => 25,
NextHop::Relay(host) => host.port,
}
}
#[inline(always)]
fn credentials(&self) -> Option<&Credentials<String>> {
match self {
RemoteHost::MX(_) => None,
RemoteHost::Relay(host) => host.auth.as_ref(),
NextHop::MX(_) => None,
NextHop::Relay(host) => host.auth.as_ref(),
}
}
@@ -281,24 +291,24 @@ impl<'x> RemoteHost<'x> {
}
#[cfg(not(feature = "test_mode"))]
match self {
RemoteHost::MX(_) => false,
RemoteHost::Relay(host) => host.tls_allow_invalid_certs,
NextHop::MX(_) => false,
NextHop::Relay(host) => host.tls_allow_invalid_certs,
}
}
#[inline(always)]
fn implicit_tls(&self) -> bool {
match self {
RemoteHost::MX(_) => false,
RemoteHost::Relay(host) => host.tls_implicit,
NextHop::MX(_) => false,
NextHop::Relay(host) => host.tls_implicit,
}
}
#[inline(always)]
fn is_smtp(&self) -> bool {
match self {
RemoteHost::MX(_) => true,
RemoteHost::Relay(host) => host.protocol == ServerProtocol::Smtp,
NextHop::MX(_) => true,
NextHop::Relay(host) => host.protocol == ServerProtocol::Smtp,
}
}
}

View File

@@ -13,6 +13,7 @@ serde = { version = "1.0", features = ["derive"]}
tracing = "0.1"
mail-auth = { git = "https://github.com/stalwartlabs/mail-auth" }
smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" }
mail-send = { git = "https://github.com/stalwartlabs/mail-send", default-features = false, features = ["cram-md5", "skip-ehlo"] }
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-appender = "0.2"
tracing-opentelemetry = "0.18.0"

116
crates/utils/src/ipc.rs Normal file
View File

@@ -0,0 +1,116 @@
use std::{borrow::Cow, path::PathBuf};
use mail_send::Credentials;
use tokio::{fs, io::AsyncReadExt, sync::oneshot};
pub enum DeliveryEvent {
Ingest {
message: IngestMessage,
result_tx: oneshot::Sender<Vec<DeliveryResult>>,
},
Lookup(LookupItem),
Stop,
}
pub struct IngestMessage {
pub sender_address: String,
pub recipients: Vec<String>,
pub message_path: PathBuf,
pub message_size: usize,
}
#[derive(Debug, Clone)]
pub enum DeliveryResult {
Success,
TemporaryFailure {
reason: Cow<'static, str>,
},
PermanentFailure {
code: [u8; 3],
reason: Cow<'static, str>,
},
}
#[derive(Clone, PartialEq, Eq, Hash)]
pub enum Item {
IsAccount(String),
Authenticate(Credentials<String>),
Verify(String),
Expand(String),
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LookupResult {
True,
False,
Values(Vec<String>),
}
#[derive(Debug)]
pub struct LookupItem {
pub item: Item,
pub result: oneshot::Sender<LookupResult>,
}
impl From<LookupResult> for bool {
fn from(value: LookupResult) -> Self {
matches!(value, LookupResult::True | LookupResult::Values(_))
}
}
impl From<bool> for LookupResult {
fn from(value: bool) -> Self {
if value {
LookupResult::True
} else {
LookupResult::False
}
}
}
impl From<Vec<String>> for LookupResult {
fn from(value: Vec<String>) -> Self {
if !value.is_empty() {
LookupResult::Values(value)
} else {
LookupResult::False
}
}
}
impl core::fmt::Debug for Item {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::IsAccount(arg0) => f.debug_tuple("Rcpt").field(arg0).finish(),
Self::Authenticate(_) => f.debug_tuple("Auth").finish(),
Self::Expand(arg0) => f.debug_tuple("Expn").field(arg0).finish(),
Self::Verify(arg0) => f.debug_tuple("Vrfy").field(arg0).finish(),
}
}
}
impl IngestMessage {
pub async fn read_message(&self) -> Result<Vec<u8>, ()> {
let mut raw_message = vec![0u8; self.message_size];
let mut file = fs::File::open(&self.message_path).await.map_err(|err| {
tracing::error!(
context = "read_message",
event = "error",
"Failed to open message file {}: {}",
self.message_path.display(),
err
);
})?;
file.read_exact(&mut raw_message).await.map_err(|err| {
tracing::error!(
context = "read_message",
event = "error",
"Failed to read {} bytes file {} from disk: {}",
self.message_size,
self.message_path.display(),
err
);
})?;
Ok(raw_message)
}
}

View File

@@ -27,6 +27,7 @@ use config::Config;
pub mod codec;
pub mod config;
pub mod ipc;
pub mod listener;
pub mod map;

View File

@@ -8,7 +8,7 @@ resolver = "2"
store = { path = "../crates/store", features = ["test_mode"] }
jmap = { path = "../crates/jmap", features = ["test_mode"] }
jmap_proto = { path = "../crates/jmap-proto" }
smtp = { path = "../crates/smtp", features = ["test_mode"] }
smtp = { path = "../crates/smtp", features = ["test_mode", "local_delivery"] }
smtp-proto = { git = "https://github.com/stalwartlabs/smtp-proto" }
mail-send = { git = "https://github.com/stalwartlabs/mail-send" }
mail-auth = { git = "https://github.com/stalwartlabs/mail-auth", features = ["test"] }

426
tests/src/jmap/delivery.rs Normal file
View File

@@ -0,0 +1,426 @@
use std::{sync::Arc, time::Duration};
use jmap::JMAP;
use jmap_client::client::Client;
use jmap_proto::types::{collection::Collection, id::Id};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, Lines, ReadHalf, WriteHalf},
net::TcpStream,
};
use crate::jmap::{
mailbox::destroy_all_mailboxes, test_account_create, test_alias_create, test_alias_remove,
};
pub async fn test(server: Arc<JMAP>, client: &mut Client) {
println!("Running message delivery tests...");
// Create a domain name and a test account
let account_id_1 = test_account_create(&server, "jdoe@example.com", "12345", "John Doe")
.await
.to_string();
let account_id_2 = test_account_create(&server, "jane@example.com", "abcdef", "Jane Smith")
.await
.to_string();
let account_id_3 = test_account_create(&server, "bill@example.com", "12345", "Bill Foobar")
.await
.to_string();
test_alias_create(&server, "jdoe@example.com", "john.doe@example.com").await;
// Create a mailing list
test_alias_create(&server, "jdoe@example.com", "members@example.com").await;
test_alias_create(&server, "jane@example.com", "members@example.com").await;
test_alias_create(&server, "bill@example.com", "members@example.com").await;
// Delivering to individuals
let mut lmtp = SmtpConnection::connect().await;
lmtp.ingest(
"bill@example.com",
&["jdoe@example.com"],
concat!(
"From: bill@example.com\r\n",
"To: jdoe@example.com\r\n",
"Subject: TPS Report\r\n",
"\r\n",
"I'm going to need those TPS reports ASAP. ",
"So, if you could do that, that'd be great."
),
)
.await;
assert_eq!(
server
.get_document_ids(
Id::from_bytes(account_id_1.as_bytes())
.unwrap()
.document_id(),
Collection::Email
)
.await
.unwrap()
.unwrap()
.len(),
1
);
// Delivering to individuals' aliases
lmtp.ingest(
"bill@example.com",
&["john.doe@example.com"],
concat!(
"From: bill@example.com\r\n",
"To: john.doe@example.com\r\n",
"Subject: Fwd: TPS Report\r\n",
"\r\n",
"--- Forwarded Message ---\r\n\r\n ",
"I'm going to need those TPS reports ASAP. ",
"So, if you could do that, that'd be great."
),
)
.await;
assert_eq!(
server
.get_document_ids(
Id::from_bytes(account_id_1.as_bytes())
.unwrap()
.document_id(),
Collection::Email
)
.await
.unwrap()
.unwrap()
.len(),
2
);
// EXPN and VRFY
lmtp.expn("members@example.com", 2)
.await
.assert_contains("jdoe@example.com")
.assert_contains("jane@example.com")
.assert_contains("bill@example.com");
lmtp.expn("non_existant@example.com", 5).await;
lmtp.expn("jdoe@example.com", 5).await;
lmtp.vrfy("jdoe@example.com", 2).await;
lmtp.vrfy("members@example.com", 2).await;
lmtp.vrfy("non_existant@example.com", 5).await;
// Delivering to a mailing list
lmtp.ingest(
"bill@example.com",
&["members@example.com"],
concat!(
"From: bill@example.com\r\n",
"To: members@example.com\r\n",
"Subject: WFH policy\r\n",
"\r\n",
"We need the entire staff back in the office, ",
"TPS reports cannot be filed properly from home."
),
)
.await;
for (account_id, num_messages) in [(&account_id_1, 3), (&account_id_2, 1), (&account_id_3, 1)] {
assert_eq!(
server
.get_document_ids(
Id::from_bytes(account_id.as_bytes()).unwrap().document_id(),
Collection::Email
)
.await
.unwrap()
.unwrap()
.len(),
num_messages,
"for {}",
account_id
);
}
// Removing members from the mailing list and chunked ingest
test_alias_remove(&server, "jdoe@example.com", "members@example.com").await;
lmtp.ingest_chunked(
"bill@example.com",
&["members@example.com"],
concat!(
"From: bill@example.com\r\n",
"To: members@example.com\r\n",
"Subject: WFH policy (reminder)\r\n",
"\r\n",
"This is a reminder that we need the entire staff back in the office, ",
"TPS reports cannot be filed properly from home."
),
10,
)
.await;
for (account_id, num_messages) in [(&account_id_1, 3), (&account_id_2, 2), (&account_id_3, 2)] {
assert_eq!(
server
.get_document_ids(
Id::from_bytes(account_id.as_bytes()).unwrap().document_id(),
Collection::Email
)
.await
.unwrap()
.unwrap()
.len(),
num_messages,
"for {}",
account_id
);
}
// Deduplication of recipients
lmtp.ingest(
"bill@example.com",
&[
"members@example.com",
"jdoe@example.com",
"john.doe@example.com",
"jane@example.com",
"bill@example.com",
],
concat!(
"From: bill@example.com\r\n",
"Bcc: Undisclosed recipients;\r\n",
"Subject: Holidays\r\n",
"\r\n",
"Remember to file your TPS reports before ",
"going on holidays."
),
)
.await;
for (account_id, num_messages) in [(&account_id_1, 4), (&account_id_2, 3), (&account_id_3, 3)] {
assert_eq!(
server
.get_document_ids(
Id::from_bytes(account_id.as_bytes()).unwrap().document_id(),
Collection::Email
)
.await
.unwrap()
.unwrap()
.len(),
num_messages,
"for {}",
account_id
);
}
// Size checks
lmtp.send("MAIL FROM:<hello@world> SIZE=943718400").await;
lmtp.read(1, 5).await;
lmtp.send("BDAT 943718400").await;
lmtp.read(1, 5).await;
// Remove test data
for account_id in [&account_id_1, &account_id_2, &account_id_3] {
client.set_default_account_id(account_id);
destroy_all_mailboxes(client).await;
}
server.store.assert_is_empty().await;
}
pub struct SmtpConnection {
reader: Lines<BufReader<ReadHalf<TcpStream>>>,
writer: WriteHalf<TcpStream>,
}
impl SmtpConnection {
pub async fn ingest_with_code(
&mut self,
from: &str,
recipients: &[&str],
message: &str,
code: u8,
) -> Vec<String> {
self.mail_from(from, 2).await;
for recipient in recipients {
self.rcpt_to(recipient, 2).await;
}
self.data(3).await;
self.data_bytes(message, recipients.len(), code).await
}
pub async fn ingest(&mut self, from: &str, recipients: &[&str], message: &str) {
self.ingest_with_code(from, recipients, message, 2).await;
}
pub async fn ingest_chunked(
&mut self,
from: &str,
recipients: &[&str],
message: &str,
chunk_size: usize,
) {
self.mail_from(from, 2).await;
for recipient in recipients {
self.rcpt_to(recipient, 2).await;
}
for chunk in message.as_bytes().chunks(chunk_size) {
self.bdat(std::str::from_utf8(chunk).unwrap(), 2).await;
}
self.bdat_last("", recipients.len(), 2).await;
}
pub async fn connect() -> Self {
let (reader, writer) =
tokio::io::split(TcpStream::connect("127.0.0.1:11200").await.unwrap());
let mut conn = SmtpConnection {
reader: BufReader::new(reader).lines(),
writer,
};
conn.read(1, 2).await;
conn
}
pub async fn lhlo(&mut self) -> Vec<String> {
self.send("LHLO localhost").await;
self.read(1, 2).await
}
pub async fn mail_from(&mut self, sender: &str, code: u8) -> Vec<String> {
self.send(&format!("MAIL FROM:<{}>", sender)).await;
self.read(1, code).await
}
pub async fn rcpt_to(&mut self, rcpt: &str, code: u8) -> Vec<String> {
self.send(&format!("RCPT TO:<{}>", rcpt)).await;
self.read(1, code).await
}
pub async fn vrfy(&mut self, rcpt: &str, code: u8) -> Vec<String> {
self.send(&format!("VRFY {}", rcpt)).await;
self.read(1, code).await
}
pub async fn expn(&mut self, rcpt: &str, code: u8) -> Vec<String> {
self.send(&format!("EXPN {}", rcpt)).await;
self.read(1, code).await
}
pub async fn data(&mut self, code: u8) -> Vec<String> {
self.send("DATA").await;
self.read(1, code).await
}
pub async fn data_bytes(
&mut self,
message: &str,
num_responses: usize,
code: u8,
) -> Vec<String> {
self.send_raw(message).await;
self.send_raw("\r\n.\r\n").await;
self.read(num_responses, code).await
}
pub async fn bdat(&mut self, chunk: &str, code: u8) -> Vec<String> {
self.send_raw(&format!("BDAT {}\r\n{}", chunk.len(), chunk))
.await;
self.read(1, code).await
}
pub async fn bdat_last(&mut self, chunk: &str, num_responses: usize, code: u8) -> Vec<String> {
self.send_raw(&format!("BDAT {} LAST\r\n{}", chunk.len(), chunk))
.await;
self.read(num_responses, code).await
}
pub async fn rset(&mut self) -> Vec<String> {
self.send("RSET").await;
self.read(1, 2).await
}
pub async fn noop(&mut self) -> Vec<String> {
self.send("NOOP").await;
self.read(1, 2).await
}
pub async fn quit(&mut self) -> Vec<String> {
self.send("QUIT").await;
self.read(1, 2).await
}
pub async fn read(&mut self, mut num_responses: usize, code: u8) -> Vec<String> {
let mut lines = Vec::new();
loop {
match tokio::time::timeout(Duration::from_millis(1500), self.reader.next_line()).await {
Ok(Ok(Some(line))) => {
let is_done = line.as_bytes()[3] == b' ';
println!("<- {:?}", line);
lines.push(line);
if is_done {
num_responses -= 1;
if num_responses != 0 {
continue;
}
if code != u8::MAX {
for line in &lines {
if line.as_bytes()[0] - b'0' != code {
panic!("Expected completion code {}, got {:?}.", code, lines);
}
}
}
return lines;
}
}
Ok(Ok(None)) => {
panic!("Invalid response: {:?}.", lines);
}
Ok(Err(err)) => {
panic!("Connection broken: {} ({:?})", err, lines);
}
Err(_) => panic!("Timeout while waiting for server response: {:?}", lines),
}
}
}
pub async fn send(&mut self, text: &str) {
println!("-> {:?}", text);
self.writer.write_all(text.as_bytes()).await.unwrap();
self.writer.write_all(b"\r\n").await.unwrap();
}
pub async fn send_raw(&mut self, text: &str) {
println!("-> {:?}", text);
self.writer.write_all(text.as_bytes()).await.unwrap();
}
}
pub trait AssertResult: Sized {
fn assert_contains(self, text: &str) -> Self;
fn assert_count(self, text: &str, occurences: usize) -> Self;
fn assert_equals(self, text: &str) -> Self;
}
impl AssertResult for Vec<String> {
fn assert_contains(self, text: &str) -> Self {
for line in &self {
if line.contains(text) {
return self;
}
}
panic!("Expected response to contain {:?}, got {:?}", text, self);
}
fn assert_count(self, text: &str, occurences: usize) -> Self {
assert_eq!(
self.iter().filter(|l| l.contains(text)).count(),
occurences,
"Expected {} occurrences of {:?}, found {}.",
occurences,
text,
self.iter().filter(|l| l.contains(text)).count()
);
self
}
fn assert_equals(self, text: &str) -> Self {
for line in &self {
if line == text {
return self;
}
}
panic!("Expected response to be {:?}, got {:?}", text, self);
}
}

View File

@@ -1,15 +1,18 @@
use std::{sync::Arc, time::Duration};
use jmap::{api::JmapSessionManager, JMAP};
use jmap::{api::JmapSessionManager, services::IPC_CHANNEL_BUFFER, JMAP};
use jmap_client::client::{Client, Credentials};
use jmap_proto::types::id::Id;
use tokio::sync::watch;
use smtp::core::{SmtpSessionManager, SMTP};
use tokio::sync::{mpsc, watch};
use utils::config::ServerProtocol;
use crate::{add_test_certs, store::TempDir};
pub mod auth_acl;
pub mod auth_limits;
pub mod auth_oauth;
pub mod delivery;
pub mod email_changes;
pub mod email_copy;
pub mod email_get;
@@ -24,31 +27,37 @@ pub mod push_subscription;
pub mod thread_get;
pub mod thread_merge;
const SERVER: &str = "
const SERVER: &str = r#"
[server]
hostname = 'jmap.example.org'
hostname = "jmap.example.org"
[server.listener.jmap]
bind = ['127.0.0.1:8899']
url = 'https://127.0.0.1:8899'
protocol = 'jmap'
bind = ["127.0.0.1:8899"]
url = "https://127.0.0.1:8899"
protocol = "jmap"
max-connections = 512
[server.listener.lmtp-debug]
bind = ['127.0.0.1:11200']
greeting = 'Test LMTP instance'
protocol = 'lmtp'
tls.implicit = true
[server.socket]
reuse-addr = true
[server.tls]
enable = true
implicit = false
certificate = 'default'
certificate = "default"
[store]
db.path = '{TMP}/sqlite.db'
blob.path = '{TMP}'
db.path = "{TMP}/sqlite.db"
blob.path = "{TMP}"
[certificate.default]
cert = 'file://{CERT}'
private-key = 'file://{PK}'
cert = "file://{CERT}"
private-key = "file://{PK}"
[jmap.protocol]
set.max-objects = 100000
@@ -61,40 +70,44 @@ max-size = 5000000
max-concurrent = 4
[jmap.rate-limit]
account.rate = '100/1m'
authentication.rate = '100/1m'
anonymous.rate = '1000/1m'
account.rate = "100/1m"
authentication.rate = "100/1m"
anonymous.rate = "1000/1m"
[jmap.event-source]
throttle = '500ms'
throttle = "500ms"
[jmap.web-sockets]
throttle = '500ms'
throttle = "500ms"
[jmap.push]
throttle = '500ms'
attempts.interval = '500ms'
throttle = "500ms"
attempts.interval = "500ms"
[jmap.auth.database]
type = 'sql'
address = 'sqlite::memory:'
type = "sql"
address = "sqlite::memory:"
[jmap.auth.database.query]
uid-by-login = 'SELECT ROWID - 1 FROM users WHERE login = ?'
login-by-uid = 'SELECT login FROM users WHERE ROWID - 1 = ?'
secret-by-uid = 'SELECT secret FROM users WHERE ROWID - 1 = ?'
gids-by-uid = 'SELECT gid FROM groups WHERE uid = ?'
uid-by-login = "SELECT ROWID - 1 FROM users WHERE login = ?"
login-by-uid = "SELECT login FROM users WHERE ROWID - 1 = ?"
secret-by-uid = "SELECT secret FROM users WHERE ROWID - 1 = ?"
gids-by-uid = "SELECT gid FROM groups WHERE uid = ?"
uids-by-address = "SELECT uid FROM emails WHERE address = ?"
addresses-by-uid = "SELECT address FROM emails WHERE uid = ?"
vrfy = "SELECT address FROM emails WHERE address LIKE '%' || ? || '%' LIMIT 5"
expn = "SELECT address FROM emails WHERE address LIKE '%' || ? || '%' LIMIT 5"
[oauth]
key = 'parerga_und_paralipomena'
key = "parerga_und_paralipomena"
max-auth-attempts = 1
[oauth.expiry]
user-code = '1s'
token = '1s'
refresh-token = '3s'
refresh-token-renew = '2s'
";
user-code = "1s"
token = "1s"
refresh-token = "3s"
refresh-token-renew = "2s"
"#;
#[tokio::test]
pub async fn jmap_tests() {
@@ -118,11 +131,12 @@ pub async fn jmap_tests() {
//thread_get::test(params.server.clone(), &mut params.client).await;
//thread_merge::test(params.server.clone(), &mut params.client).await;
//mailbox::test(params.server.clone(), &mut params.client).await;
delivery::test(params.server.clone(), &mut params.client).await;
//auth_acl::test(params.server.clone(), &mut params.client).await;
//auth_limits::test(params.server.clone(), &mut params.client).await;
//auth_oauth::test(params.server.clone(), &mut params.client).await;
//event_source::test(params.server.clone(), &mut params.client).await;
push_subscription::test(params.server.clone(), &mut params.client).await;
//push_subscription::test(params.server.clone(), &mut params.client).await;
if delete {
params.temp_dir.delete();
@@ -140,17 +154,27 @@ struct JMAPTest {
async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
// Load and parse config
let temp_dir = TempDir::new("jmap_tests", delete_if_exists);
let settings = utils::config::Config::parse(
let config = utils::config::Config::parse(
&add_test_certs(SERVER).replace("{TMP}", &temp_dir.path.display().to_string()),
)
.unwrap();
let servers = settings.parse_servers().unwrap();
let servers = config.parse_servers().unwrap();
// Start JMAP server
servers.bind(&settings);
let manager = JmapSessionManager::new(JMAP::init(&settings).await);
// Start JMAP and SMTP servers
servers.bind(&config);
let (delivery_tx, delivery_rx) = mpsc::channel(IPC_CHANNEL_BUFFER);
let smtp = SMTP::init(&config, &servers, delivery_tx).await;
let jmap = JMAP::init(&config, delivery_rx).await;
let shutdown_tx = servers.spawn(|server, shutdown_rx| {
server.spawn(manager.clone(), shutdown_rx);
match &server.protocol {
ServerProtocol::Smtp | ServerProtocol::Lmtp => {
server.spawn(SmtpSessionManager::new(smtp.clone()), shutdown_rx)
}
ServerProtocol::Jmap => {
server.spawn(JmapSessionManager::new(jmap.clone()), shutdown_rx)
}
_ => unreachable!(),
};
});
// Create tables
@@ -161,9 +185,7 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
"INSERT INTO users (login, secret) VALUES ('admin', 'secret')", // RowID 0 is admin
] {
assert!(
manager
.inner
.auth_db
jmap.auth_db
.execute(query, Vec::<String>::new().into_iter())
.await,
"failed for {query}"
@@ -181,7 +203,7 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
client.set_default_account_id(Id::new(1));
JMAPTest {
server: manager.inner,
server: jmap,
temp_dir,
client,
shutdown_tx,
@@ -253,7 +275,46 @@ pub async fn test_account_create(jmap: &JMAP, login: &str, secret: &str, name: &
)
.await
);
Id::new(jmap.get_account_id(login).await.unwrap() as u64)
let uid = jmap.get_account_id(login).await.unwrap() as u64;
assert!(
jmap.auth_db
.execute(
&format!(
"INSERT OR REPLACE INTO emails (uid, email) VALUES ({}, ?)",
uid
),
vec![login.to_string()].into_iter()
)
.await
);
Id::new(uid)
}
pub async fn test_alias_create(jmap: &JMAP, login: &str, alias: &str) {
let uid = jmap.get_account_id(login).await.unwrap() as u64;
assert!(
jmap.auth_db
.execute(
&format!(
"INSERT OR REPLACE INTO emails (uid, email) VALUES ({}, ?)",
uid
),
vec![alias.to_string()].into_iter()
)
.await
);
}
pub async fn test_alias_remove(jmap: &JMAP, login: &str, alias: &str) {
let uid = jmap.get_account_id(login).await.unwrap() as u64;
assert!(
jmap.auth_db
.execute(
&format!("DELETE FROM emails WHERE uid = {} AND email = ?", uid),
vec![alias.to_string()].into_iter()
)
.await
);
}
pub async fn test_account_login(login: &str, secret: &str) -> Client {

View File

@@ -42,7 +42,7 @@ async fn auth() {
let mut ctx = ConfigContext::new(&[]);
ctx.lookup.insert(
"plain".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([
Arc::new(Lookup::List(AHashSet::from_iter([
"john:secret".to_string(),
"jane:p4ssw0rd".to_string(),
]))),

View File

@@ -44,12 +44,12 @@ async fn data() {
let mut qr = core.init_test_queue("smtp_data_test");
let mut config = &mut core.session.config.rcpt;
config.lookup_domains = IfBlock::new(Some(Arc::new(Lookup::Local(AHashSet::from_iter([
config.lookup_domains = IfBlock::new(Some(Arc::new(Lookup::List(AHashSet::from_iter([
"foobar.org".to_string(),
"domain.net".to_string(),
"test.com".to_string(),
])))));
config.lookup_addresses = IfBlock::new(Some(Arc::new(Lookup::Local(AHashSet::from_iter([
config.lookup_addresses = IfBlock::new(Some(Arc::new(Lookup::List(AHashSet::from_iter([
"bill@foobar.org".to_string(),
"john@foobar.org".to_string(),
"jane@domain.net".to_string(),

View File

@@ -121,10 +121,10 @@ async fn dmarc() {
let mut rr = core.init_test_report();
let mut config = &mut core.session.config.rcpt;
config.lookup_domains = IfBlock::new(Some(Arc::new(Lookup::Local(AHashSet::from_iter([
config.lookup_domains = IfBlock::new(Some(Arc::new(Lookup::List(AHashSet::from_iter([
"example.com".to_string(),
])))));
config.lookup_addresses = IfBlock::new(Some(Arc::new(Lookup::Local(AHashSet::from_iter([
config.lookup_addresses = IfBlock::new(Some(Arc::new(Lookup::List(AHashSet::from_iter([
"jdoe@example.com".to_string(),
])))));

View File

@@ -40,13 +40,13 @@ use smtp::{
async fn rcpt() {
let mut core = SMTP::test();
let list_addresses = Lookup::Local(AHashSet::from_iter([
let list_addresses = Lookup::List(AHashSet::from_iter([
"jane@foobar.org".to_string(),
"bill@foobar.org".to_string(),
"mike@foobar.org".to_string(),
"john@foobar.org".to_string(),
]));
let list_domains = Lookup::Local(AHashSet::from_iter(["foobar.org".to_string()]));
let list_domains = Lookup::List(AHashSet::from_iter(["foobar.org".to_string()]));
let mut config = &mut core.session.config.rcpt;
let mut config_ext = &mut core.session.config.extensions;

View File

@@ -141,10 +141,10 @@ async fn sign_and_seal() {
);
let mut config = &mut core.session.config.rcpt;
config.lookup_domains = IfBlock::new(Some(Arc::new(Lookup::Local(AHashSet::from_iter([
config.lookup_domains = IfBlock::new(Some(Arc::new(Lookup::List(AHashSet::from_iter([
"example.com".to_string(),
])))));
config.lookup_addresses = IfBlock::new(Some(Arc::new(Lookup::Local(AHashSet::from_iter([
config.lookup_addresses = IfBlock::new(Some(Arc::new(Lookup::List(AHashSet::from_iter([
"jdoe@example.com".to_string(),
])))));

View File

@@ -41,14 +41,14 @@ async fn vrfy_expn() {
let mut ctx = ConfigContext::new(&[]);
ctx.lookup.insert(
"vrfy".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([
Arc::new(Lookup::List(AHashSet::from_iter([
"john@foobar.org:john@foobar.org".to_string(),
"john:john@foobar.org".to_string(),
]))),
);
ctx.lookup.insert(
"expn".to_string(),
Arc::new(Lookup::Local(AHashSet::from_iter([
Arc::new(Lookup::List(AHashSet::from_iter([
"sales:john@foobar.org,bill@foobar.org,jane@foobar.org".to_string(),
"support:mike@foobar.org".to_string(),
]))),

View File

@@ -32,12 +32,10 @@ use tokio::{
};
use tokio_rustls::TlsAcceptor;
use smtp::{
config::{remote::ConfigHost, ConfigContext},
lookup::{Item, LookupResult},
};
use smtp::config::{remote::ConfigHost, ConfigContext};
use utils::{
config::Config,
ipc::{Item, LookupResult},
listener::limiter::{ConcurrencyLimiter, InFlight},
};

View File

@@ -27,8 +27,7 @@ use mail_send::Credentials;
use rustls::{Certificate, PrivateKey, ServerConfig};
use rustls_pemfile::{certs, pkcs8_private_keys};
use tokio_rustls::TlsAcceptor;
use ::smtp::lookup::{Item, LookupResult};
use utils::ipc::{Item, LookupResult};
pub mod imap;
pub mod smtp;

View File

@@ -32,12 +32,10 @@ use tokio::{
};
use tokio_rustls::TlsAcceptor;
use smtp::{
config::{remote::ConfigHost, ConfigContext},
lookup::{Item, LookupResult},
};
use smtp::config::{remote::ConfigHost, ConfigContext};
use utils::{
config::Config,
ipc::{Item, LookupResult},
listener::limiter::{ConcurrencyLimiter, InFlight},
};

View File

@@ -2,9 +2,9 @@ use std::time::{Duration, Instant};
use mail_auth::{IpLookupStrategy, MX};
use smtp::{config::IfBlock, core::SMTP, outbound::RemoteHost};
use smtp::{config::IfBlock, core::SMTP, outbound::NextHop};
use super::ToRemoteHost;
use super::ToNextHop;
#[tokio::test]
async fn lookup_ip() {
@@ -40,7 +40,7 @@ async fn lookup_ip() {
// Ipv4 strategy
core.queue.config.ip_strategy = IfBlock::new(IpLookupStrategy::Ipv4thenIpv6);
let (source_ips, remote_ips) = core
.resolve_host(&RemoteHost::MX("mx.foobar.org"), &"envelope", 2)
.resolve_host(&NextHop::MX("mx.foobar.org"), &"envelope", 2)
.await
.unwrap();
assert!(ipv4.contains(&match source_ips.unwrap() {
@@ -52,7 +52,7 @@ async fn lookup_ip() {
// Ipv6 strategy
core.queue.config.ip_strategy = IfBlock::new(IpLookupStrategy::Ipv6thenIpv4);
let (source_ips, remote_ips) = core
.resolve_host(&RemoteHost::MX("mx.foobar.org"), &"envelope", 2)
.resolve_host(&NextHop::MX("mx.foobar.org"), &"envelope", 2)
.await
.unwrap();
assert!(ipv6.contains(&match source_ips.unwrap() {
@@ -90,7 +90,7 @@ fn to_remote_hosts() {
let hosts = mx.to_remote_hosts("domain", 7).unwrap();
assert_eq!(hosts.len(), 7);
for host in hosts {
if let RemoteHost::MX(host) = host {
if let NextHop::MX(host) = host {
assert!((*host.as_bytes().last().unwrap() - b'0') <= 8);
}
}

View File

@@ -87,7 +87,7 @@ async fn manage_queue() {
core.queue.config.retry = IfBlock::new(vec![Duration::from_secs(1000)]);
core.queue.config.notify = IfBlock::new(vec![Duration::from_secs(2000)]);
core.queue.config.expire = IfBlock::new(Duration::from_secs(3000));
core.queue.config.management_lookup = Arc::new(Lookup::Local(AHashSet::from_iter([
core.queue.config.management_lookup = Arc::new(Lookup::List(AHashSet::from_iter([
"admin:secret".to_string(),
])));
let local_qr = core.init_test_queue("smtp_manage_queue_local");

View File

@@ -67,7 +67,7 @@ async fn manage_reports() {
config.hash = IfBlock::new(16);
config.dmarc_aggregate.max_size = IfBlock::new(1024);
config.tls.max_size = IfBlock::new(1024);
core.queue.config.management_lookup = Arc::new(Lookup::Local(AHashSet::from_iter([
core.queue.config.management_lookup = Arc::new(Lookup::List(AHashSet::from_iter([
"admin:secret".to_string(),
])));
let (report_tx, report_rx) = mpsc::channel(1024);

View File

@@ -158,6 +158,7 @@ impl TestConfig for SMTP {
mail_auth: MailAuthConfig::test(),
report: ReportCore::test(),
sieve: SieveCore::test(),
delivery_tx: mpsc::channel(1).0,
}
}
}
@@ -313,7 +314,7 @@ impl TestConfig for QueueConfig {
rcpt: vec![],
rcpt_domain: vec![],
},
management_lookup: Arc::new(Lookup::Local(AHashSet::default())),
management_lookup: Arc::new(Lookup::List(AHashSet::default())),
}
}
}