Database schema optimization - part 10

This commit is contained in:
mdecimus
2025-11-11 18:47:29 +01:00
parent 836bc5b7fd
commit 3e181ae467
71 changed files with 1134 additions and 773 deletions

345
Cargo.lock generated
View File

@@ -378,18 +378,17 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0"
[[package]]
name = "attohttpc"
version = "0.30.1"
version = "0.28.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16e2cdb6d5ed835199484bb92bb8b3edd526effe995c61732580439c1a67e2e9"
checksum = "07a9b245ba0739fc90935094c29adbaee3f977218b5fb95e822e261cda7f56a3"
dependencies = [
"base64 0.22.1",
"http 1.3.1",
"log",
"rustls 0.23.35",
"serde",
"serde_json",
"url",
"webpki-roots 1.0.4",
"webpki-roots 0.26.11",
]
[[package]]
@@ -400,51 +399,28 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8"
[[package]]
name = "aws-creds"
version = "0.39.0"
version = "0.37.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b13804829a843b3f26e151c97acbb315ee1177a2724690edfcd28f1894146200"
checksum = "7f84143206b9c72b3c5cb65415de60c7539c79cd1559290fddec657939131be0"
dependencies = [
"attohttpc",
"home",
"log",
"quick-xml 0.38.3",
"quick-xml 0.32.0",
"rust-ini",
"serde",
"thiserror 2.0.17",
"thiserror 1.0.69",
"time",
"url",
]
[[package]]
name = "aws-lc-rs"
version = "1.14.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "879b6c89592deb404ba4dc0ae6b58ffd1795c78991cbb5b8bc441c48a070440d"
dependencies = [
"aws-lc-sys",
"zeroize",
]
[[package]]
name = "aws-lc-sys"
version = "0.32.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "107a4e9d9cab9963e04e84bb8dee0e25f2a987f9a8bad5ed054abd439caa8f8c"
dependencies = [
"bindgen 0.72.1",
"cc",
"cmake",
"dunce",
"fs_extra",
]
[[package]]
name = "aws-region"
version = "0.28.0"
version = "0.25.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5532f65342f789f9c1b7078ea9c9cd9293cd62dcc284fa99adc4a1c9ba43469c"
checksum = "e9aed3f9c7eac9be28662fdb3b0f4d1951e812f7c64fed4f0327ba702f459b3b"
dependencies = [
"thiserror 2.0.17",
"thiserror 1.0.69",
]
[[package]]
@@ -643,8 +619,6 @@ dependencies = [
"cexpr",
"clang-sys",
"itertools 0.13.0",
"log",
"prettyplease",
"proc-macro2",
"quote",
"regex",
@@ -1190,7 +1164,7 @@ dependencies = [
"futures",
"hashify",
"hostname",
"hyper",
"hyper 1.7.0",
"idna",
"imagesize",
"imap_proto",
@@ -1202,7 +1176,7 @@ dependencies = [
"mail-builder",
"mail-parser",
"mail-send",
"md5",
"md5 0.8.0",
"nlp",
"num_cpus",
"opentelemetry",
@@ -1673,7 +1647,7 @@ dependencies = [
"groupware",
"hashify",
"http_proto",
"hyper",
"hyper 1.7.0",
"percent-encoding",
"rkyv",
"store",
@@ -1690,7 +1664,7 @@ dependencies = [
"chrono",
"compact_str",
"hashify",
"hyper",
"hyper 1.7.0",
"mail-parser",
"quick-xml 0.38.3",
"rkyv",
@@ -1863,7 +1837,7 @@ dependencies = [
"mail-builder",
"mail-parser",
"mail-send",
"md5",
"md5 0.8.0",
"nlp",
"password-hash",
"pbkdf2",
@@ -1988,12 +1962,6 @@ dependencies = [
"zeroize",
]
[[package]]
name = "dunce"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92773504d58c093f6de2459af4af33faa518c13451eb8f2b5698ed3d36e7c813"
[[package]]
name = "dyn-clone"
version = "1.0.20"
@@ -2489,12 +2457,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "fs_extra"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "funty"
version = "2.0.0"
@@ -3048,7 +3010,7 @@ dependencies = [
"groupware",
"http-body-util",
"http_proto",
"hyper",
"hyper 1.7.0",
"hyper-util",
"jmap",
"jmap_proto",
@@ -3089,6 +3051,17 @@ dependencies = [
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2"
dependencies = [
"bytes",
"http 0.2.12",
"pin-project-lite",
]
[[package]]
name = "http-body"
version = "1.0.1"
@@ -3108,7 +3081,7 @@ dependencies = [
"bytes",
"futures-core",
"http 1.3.1",
"http-body",
"http-body 1.0.1",
"pin-project-lite",
]
@@ -3140,7 +3113,7 @@ dependencies = [
"compact_str",
"form_urlencoded",
"http-body-util",
"hyper",
"hyper 1.7.0",
"hyper-util",
"percent-encoding",
"serde",
@@ -3172,6 +3145,29 @@ version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "135b12329e5e3ce057a9f972339ea52bc954fe1e9358ef27f95e89716fbc5424"
[[package]]
name = "hyper"
version = "0.14.32"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7"
dependencies = [
"bytes",
"futures-channel",
"futures-core",
"futures-util",
"http 0.2.12",
"http-body 0.4.6",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]]
name = "hyper"
version = "1.7.0"
@@ -3184,7 +3180,7 @@ dependencies = [
"futures-core",
"h2 0.4.12",
"http 1.3.1",
"http-body",
"http-body 1.0.1",
"httparse",
"httpdate",
"itoa",
@@ -3195,6 +3191,20 @@ dependencies = [
"want",
]
[[package]]
name = "hyper-rustls"
version = "0.24.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590"
dependencies = [
"futures-util",
"http 0.2.12",
"hyper 0.14.32",
"rustls 0.21.12",
"tokio",
"tokio-rustls 0.24.1",
]
[[package]]
name = "hyper-rustls"
version = "0.27.7"
@@ -3202,7 +3212,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58"
dependencies = [
"http 1.3.1",
"hyper",
"hyper 1.7.0",
"hyper-util",
"rustls 0.23.35",
"rustls-pki-types",
@@ -3218,7 +3228,7 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0"
dependencies = [
"hyper",
"hyper 1.7.0",
"hyper-util",
"pin-project-lite",
"tokio",
@@ -3237,8 +3247,8 @@ dependencies = [
"futures-core",
"futures-util",
"http 1.3.1",
"http-body",
"hyper",
"http-body 1.0.1",
"hyper 1.7.0",
"ipnet",
"libc",
"percent-encoding",
@@ -3261,7 +3271,7 @@ dependencies = [
"js-sys",
"log",
"wasm-bindgen",
"windows-core 0.62.2",
"windows-core",
]
[[package]]
@@ -3409,7 +3419,7 @@ dependencies = [
"indexmap 2.12.0",
"mail-parser",
"mail-send",
"md5",
"md5 0.8.0",
"nlp",
"parking_lot",
"rand 0.9.2",
@@ -3703,7 +3713,7 @@ dependencies = [
"hkdf",
"http-body-util",
"http_proto",
"hyper",
"hyper 1.7.0",
"hyper-util",
"jmap-tools",
"jmap_proto",
@@ -4235,7 +4245,7 @@ checksum = "114a4e27f3cfaf8918783e8fa4149b820c813b1bedc7755e20e12eff4518331e"
dependencies = [
"base64 0.22.1",
"gethostname",
"md5",
"md5 0.8.0",
"rustls 0.23.35",
"rustls-pki-types",
"smtp-proto",
@@ -4258,7 +4268,7 @@ dependencies = [
"jmap_proto",
"mail-parser",
"mail-send",
"md5",
"md5 0.8.0",
"parking_lot",
"rkyv",
"rustls 0.23.35",
@@ -4319,6 +4329,12 @@ dependencies = [
"digest 0.10.7",
]
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "md5"
version = "0.8.0"
@@ -4648,15 +4664,6 @@ dependencies = [
"serde",
]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.50.3"
@@ -4775,25 +4782,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
[[package]]
name = "objc2-core-foundation"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a180dd8642fa45cdb7dd721cd4c11b1cadd4929ce112ebd8b9f5803cc79d536"
dependencies = [
"bitflags",
]
[[package]]
name = "objc2-io-kit"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33fafba39597d6dc1fb709123dfa8289d39406734be322956a69f0931c73bb15"
dependencies = [
"libc",
"objc2-core-foundation",
]
[[package]]
name = "object"
version = "0.37.3"
@@ -5680,6 +5668,16 @@ dependencies = [
"serde",
]
[[package]]
name = "quick-xml"
version = "0.32.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d3a6e5838b60e0e8fa7a43f22ade549a37d61f8bdbe636d0d7816191de969c2"
dependencies = [
"memchr",
"serde",
]
[[package]]
name = "quick-xml"
version = "0.38.3"
@@ -5687,7 +5685,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
dependencies = [
"memchr",
"serde",
]
[[package]]
@@ -6184,10 +6181,10 @@ dependencies = [
"futures-util",
"h2 0.4.12",
"http 1.3.1",
"http-body",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper-rustls",
"hyper 1.7.0",
"hyper-rustls 0.27.7",
"hyper-util",
"js-sys",
"log",
@@ -6415,9 +6412,9 @@ dependencies = [
[[package]]
name = "rust-s3"
version = "0.37.0"
version = "0.35.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94f9b973bd4097f5bb47e5827dcb9fb5dc17e93879e46badc27d2a4e9a4e5588"
checksum = "c3df3f353b1f4209dcf437d777cda90279c397ab15a0cd6fd06bd32c88591533"
dependencies = [
"async-trait",
"aws-creds",
@@ -6425,24 +6422,27 @@ dependencies = [
"base64 0.22.1",
"bytes",
"cfg-if",
"futures-util",
"futures",
"hex",
"hmac 0.12.1",
"http 1.3.1",
"http 0.2.12",
"hyper 0.14.32",
"hyper-rustls 0.24.2",
"log",
"maybe-async",
"md5",
"md5 0.7.0",
"percent-encoding",
"quick-xml 0.38.3",
"reqwest",
"quick-xml 0.32.0",
"rustls 0.21.12",
"rustls-native-certs 0.6.3",
"serde",
"serde_derive",
"serde_json",
"sha2 0.10.9",
"sysinfo",
"thiserror 2.0.17",
"thiserror 1.0.69",
"time",
"tokio",
"tokio-rustls 0.24.1",
"tokio-stream",
"url",
]
@@ -6524,7 +6524,6 @@ version = "0.23.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f"
dependencies = [
"aws-lc-rs",
"log",
"once_cell",
"ring",
@@ -6534,6 +6533,18 @@ dependencies = [
"zeroize",
]
[[package]]
name = "rustls-native-certs"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [
"openssl-probe",
"rustls-pemfile 1.0.4",
"schannel",
"security-framework 2.11.1",
]
[[package]]
name = "rustls-native-certs"
version = "0.7.3"
@@ -6641,7 +6652,6 @@ version = "0.103.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffdfa2f5286e2247234e03f680868ac2815974dc39e00ea15adc445d0aafe52"
dependencies = [
"aws-lc-rs",
"ring",
"rustls-pki-types",
"untrusted",
@@ -7324,14 +7334,14 @@ dependencies = [
"email",
"form_urlencoded",
"http-body-util",
"hyper",
"hyper 1.7.0",
"hyper-util",
"lru-cache",
"mail-auth",
"mail-builder",
"mail-parser",
"mail-send",
"md5",
"md5 0.8.0",
"nlp",
"num_cpus",
"parking_lot",
@@ -7420,7 +7430,7 @@ dependencies = [
"common",
"compact_str",
"decancer",
"hyper",
"hyper 1.7.0",
"idna",
"infer 0.19.0",
"mail-auth",
@@ -7603,6 +7613,7 @@ dependencies = [
"rocksdb",
"rusqlite",
"rust-s3",
"rustls 0.21.12",
"rustls 0.23.35",
"rustls-pki-types",
"serde",
@@ -7694,20 +7705,6 @@ dependencies = [
"syn 2.0.108",
]
[[package]]
name = "sysinfo"
version = "0.37.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16607d5caffd1c07ce073528f9ed972d88db15dd44023fa57142963be3feb11f"
dependencies = [
"libc",
"memchr",
"ntapi",
"objc2-core-foundation",
"objc2-io-kit",
"windows",
]
[[package]]
name = "tagptr"
version = "0.2.0"
@@ -7766,7 +7763,7 @@ dependencies = [
"http 0.14.1",
"http-body-util",
"http_proto",
"hyper",
"hyper 1.7.0",
"hyper-util",
"imap",
"imap_proto",
@@ -7775,6 +7772,7 @@ dependencies = [
"jmap-client 0.4.0",
"jmap_proto",
"mail-auth",
"mail-builder",
"mail-parser",
"mail-send",
"managesieve",
@@ -8119,9 +8117,9 @@ dependencies = [
"base64 0.22.1",
"bytes",
"http 1.3.1",
"http-body",
"http-body 1.0.1",
"http-body-util",
"hyper",
"hyper 1.7.0",
"hyper-timeout",
"hyper-util",
"percent-encoding",
@@ -8198,7 +8196,7 @@ dependencies = [
"bytes",
"futures-util",
"http 1.3.1",
"http-body",
"http-body 1.0.1",
"iri-string",
"pin-project-lite",
"tower 0.5.2",
@@ -8901,41 +8899,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows"
version = "0.61.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9babd3a767a4c1aef6900409f85f5d53ce2544ccdfaa86dad48c91782c6d6893"
dependencies = [
"windows-collections",
"windows-core 0.61.2",
"windows-future",
"windows-link 0.1.3",
"windows-numerics",
]
[[package]]
name = "windows-collections"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3beeceb5e5cfd9eb1d76b381630e82c4241ccd0d27f1a39ed41b2760b255c5e8"
dependencies = [
"windows-core 0.61.2",
]
[[package]]
name = "windows-core"
version = "0.61.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3"
dependencies = [
"windows-implement",
"windows-interface",
"windows-link 0.1.3",
"windows-result 0.3.4",
"windows-strings 0.4.2",
]
[[package]]
name = "windows-core"
version = "0.62.2"
@@ -8945,19 +8908,8 @@ dependencies = [
"windows-implement",
"windows-interface",
"windows-link 0.2.1",
"windows-result 0.4.1",
"windows-strings 0.5.1",
]
[[package]]
name = "windows-future"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc6a41e98427b19fe4b73c550f060b59fa592d7d686537eebf9385621bfbad8e"
dependencies = [
"windows-core 0.61.2",
"windows-link 0.1.3",
"windows-threading",
"windows-result",
"windows-strings",
]
[[package]]
@@ -8994,25 +8946,6 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5"
[[package]]
name = "windows-numerics"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9150af68066c4c5c07ddc0ce30421554771e528bde427614c61038bc2c92c2b1"
dependencies = [
"windows-core 0.61.2",
"windows-link 0.1.3",
]
[[package]]
name = "windows-result"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6"
dependencies = [
"windows-link 0.1.3",
]
[[package]]
name = "windows-result"
version = "0.4.1"
@@ -9022,15 +8955,6 @@ dependencies = [
"windows-link 0.2.1",
]
[[package]]
name = "windows-strings"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57"
dependencies = [
"windows-link 0.1.3",
]
[[package]]
name = "windows-strings"
version = "0.5.1"
@@ -9157,15 +9081,6 @@ dependencies = [
"windows_x86_64_msvc 0.53.1",
]
[[package]]
name = "windows-threading"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b66463ad2e0ea3bbf808b7f1d371311c80e115c0b71d60efc142cafbcfb057a6"
dependencies = [
"windows-link 0.1.3",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"

View File

@@ -243,9 +243,7 @@ pub fn build_span_document(
events: Vec<Event<EventDetails>>,
index_fields: &AHashSet<SearchField>,
) -> IndexDocument {
let mut document = IndexDocument::new(SearchIndex::Tracing);
document.index_unsigned(SearchField::Id, span_id);
let mut document = IndexDocument::new(SearchIndex::Tracing).with_id(span_id);
for event in events {
for (idx, (key, value)) in event.keys.into_iter().enumerate() {

View File

@@ -553,6 +553,7 @@ async fn copy_event(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
response
}
@@ -712,6 +713,7 @@ async fn move_event(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
response
}
@@ -756,6 +758,7 @@ async fn rename_event(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
Ok(HttpResponse::new(StatusCode::CREATED))
}
@@ -960,6 +963,7 @@ async fn copy_container(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
if !is_overwrite {
Ok(HttpResponse::new(StatusCode::CREATED))
@@ -1000,6 +1004,7 @@ async fn rename_container(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
Ok(HttpResponse::new(StatusCode::CREATED))
}

View File

@@ -22,11 +22,15 @@ use groupware::{
};
use http_proto::HttpResponse;
use hyper::StatusCode;
use store::write::BatchBuilder;
use store::{
ValueKey,
write::{BatchBuilder, ValueClass},
};
use trc::AddContext;
use types::{
acl::Acl,
collection::{Collection, SyncCollection},
field::PrincipalField,
};
pub(crate) trait CalendarDeleteRequestHandler: Sync + Send {
@@ -124,7 +128,7 @@ impl CalendarDeleteRequestHandler for Server {
)
.await?;
// Delete addresscalendar and events
// Delete calendar and events
DestroyArchive(calendar)
.delete_with_events(
self,
@@ -142,6 +146,25 @@ impl CalendarDeleteRequestHandler for Server {
)
.await
.caused_by(trc::location!())?;
// Reset default calendar id
let default_calendar_id = self
.store()
.get_value::<u32>(ValueKey {
account_id,
collection: Collection::Principal.into(),
document_id: 0,
class: ValueClass::Property(PrincipalField::DefaultCalendarId.into()),
})
.await
.caused_by(trc::location!())?;
if default_calendar_id.is_some_and(|id| id == document_id) {
batch
.with_account_id(account_id)
.with_collection(Collection::Principal)
.with_document(0)
.clear(PrincipalField::DefaultCalendarId);
}
} else {
// Validate ACL
let calendar_id = delete_resource.parent_id().unwrap();
@@ -200,10 +223,7 @@ impl CalendarDeleteRequestHandler for Server {
}
self.commit_batch(batch).await.caused_by(trc::location!())?;
if send_itip {
self.notify_task_queue();
}
self.notify_task_queue();
Ok(HttpResponse::new(StatusCode::NO_CONTENT))
}

View File

@@ -266,8 +266,6 @@ impl CalendarUpdateRequestHandler for Server {
}
}
}
let nudge_queue = next_email_alarm.is_some() || itip_messages.is_some();
// Validate quota
let extra_bytes =
(bytes.len() as u64).saturating_sub(u32::from(event.inner.size) as u64);
@@ -300,9 +298,7 @@ impl CalendarUpdateRequestHandler for Server {
.caused_by(trc::location!())?;
}
self.commit_batch(batch).await.caused_by(trc::location!())?;
if nudge_queue {
self.notify_task_queue();
}
self.notify_task_queue();
Ok(HttpResponse::new(StatusCode::NO_CONTENT)
.with_etag_opt(etag)
@@ -400,7 +396,6 @@ impl CalendarUpdateRequestHandler for Server {
}
}
}
let nudge_queue = next_email_alarm.is_some() || itip_messages.is_some();
// Validate quota
if !bytes.is_empty() {
@@ -435,10 +430,7 @@ impl CalendarUpdateRequestHandler for Server {
.caused_by(trc::location!())?;
}
self.commit_batch(batch).await.caused_by(trc::location!())?;
if nudge_queue {
self.notify_task_queue();
}
self.notify_task_queue();
Ok(HttpResponse::new(StatusCode::CREATED)
.with_etag_opt(etag)

View File

@@ -527,6 +527,7 @@ async fn copy_card(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
response
}
@@ -669,6 +670,7 @@ async fn move_card(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
response
}
@@ -713,6 +715,7 @@ async fn rename_card(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
Ok(HttpResponse::new(StatusCode::CREATED))
}
@@ -905,6 +908,7 @@ async fn copy_container(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
if !is_overwrite {
Ok(HttpResponse::new(StatusCode::CREATED))
@@ -945,6 +949,7 @@ async fn rename_container(
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
server.notify_task_queue();
Ok(HttpResponse::new(StatusCode::CREATED))
}

View File

@@ -21,11 +21,15 @@ use groupware::{
};
use http_proto::HttpResponse;
use hyper::StatusCode;
use store::write::BatchBuilder;
use store::{
ValueKey,
write::{BatchBuilder, ValueClass},
};
use trc::AddContext;
use types::{
acl::Acl,
collection::{Collection, SyncCollection},
field::PrincipalField,
};
pub(crate) trait CardDeleteRequestHandler: Sync + Send {
@@ -121,6 +125,25 @@ impl CardDeleteRequestHandler for Server {
)
.await
.caused_by(trc::location!())?;
// Reset default address book id
let default_book_id = self
.store()
.get_value::<u32>(ValueKey {
account_id,
collection: Collection::Principal.into(),
document_id: 0,
class: ValueClass::Property(PrincipalField::DefaultAddressBookId.into()),
})
.await
.caused_by(trc::location!())?;
if default_book_id.is_some_and(|id| id == document_id) {
batch
.with_account_id(account_id)
.with_collection(Collection::Principal)
.with_document(0)
.clear(PrincipalField::DefaultAddressBookId);
}
} else {
// Validate ACL
let addressbook_id = delete_resource.parent_id().unwrap();
@@ -175,6 +198,7 @@ impl CardDeleteRequestHandler for Server {
}
self.commit_batch(batch).await.caused_by(trc::location!())?;
self.notify_task_queue();
Ok(HttpResponse::new(StatusCode::NO_CONTENT))
}

View File

@@ -189,6 +189,7 @@ impl CardUpdateRequestHandler for Server {
.caused_by(trc::location!())?
.etag();
self.commit_batch(batch).await.caused_by(trc::location!())?;
self.notify_task_queue();
Ok(HttpResponse::new(StatusCode::NO_CONTENT).with_etag_opt(etag))
} else if let Some((Some(parent), name)) = resources.map_parent(resource_name.as_ref()) {
@@ -265,6 +266,7 @@ impl CardUpdateRequestHandler for Server {
.caused_by(trc::location!())?
.etag();
self.commit_batch(batch).await.caused_by(trc::location!())?;
self.notify_task_queue();
Ok(HttpResponse::new(StatusCode::CREATED).with_etag_opt(etag))
} else {

View File

@@ -6,7 +6,6 @@
use super::{PrincipalInfo, manage::ManageDirectory};
use crate::{Principal, PrincipalData, QueryBy, QueryParams, Type, backend::RcptType};
use mail_send::Credentials;
use store::{
Deserialize, IterateParams, Store, ValueKey,

View File

@@ -28,7 +28,7 @@ use store::{
use trc::AddContext;
use types::{
collection::Collection,
field::{self, Field},
field::{self},
};
use utils::{DomainPart, sanitize_email};
@@ -96,6 +96,11 @@ pub trait ManageDirectory: Sized {
typ: Option<Type>,
tenant_id: Option<u32>,
) -> trc::Result<u64>;
async fn principal_ids(
&self,
typ: Option<Type>,
tenant_id: Option<u32>,
) -> trc::Result<RoaringBitmap>;
async fn map_principal(
&self,
principal: Principal,
@@ -154,6 +159,7 @@ impl ManageDirectory for Store {
async fn get_principal_id(&self, name: &str) -> trc::Result<Option<u32>> {
self.get_principal_info(name).await.map(|v| v.map(|v| v.id))
}
async fn get_principal_info(&self, name: &str) -> trc::Result<Option<PrincipalInfo>> {
self.get_value::<PrincipalInfo>(ValueKey::from(ValueClass::Directory(
DirectoryClass::NameToId(name.as_bytes().to_vec()),
@@ -207,8 +213,7 @@ impl ManageDirectory for Store {
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.assert_value(name_key.clone(), ())
.with_document(principal_id)
.tag(Field::DOCUMENT_ID);
.with_document(principal_id);
build_search_index(&mut batch, principal_id, None, Some(&principal));
principal.sort();
batch
@@ -597,7 +602,6 @@ impl ManageDirectory for Store {
.with_account_id(u32::MAX)
.with_collection(Collection::Principal)
.with_document(principal_id)
.tag(Field::DOCUMENT_ID)
.assert_value(
ValueClass::Directory(DirectoryClass::NameToId(
create_principal.name().as_bytes().to_vec(),
@@ -853,7 +857,6 @@ impl ManageDirectory for Store {
// Delete principal
batch
.with_document(principal_id)
.untag(Field::DOCUMENT_ID)
.clear(DirectoryClass::NameToId(principal.name.as_bytes().to_vec()))
.clear(DirectoryClass::Principal(principal_id))
.clear(DirectoryClass::UsedQuota(principal_id));
@@ -2224,6 +2227,34 @@ impl ManageDirectory for Store {
.map(|_| count)
}
async fn principal_ids(
&self,
typ: Option<Type>,
tenant_id: Option<u32>,
) -> trc::Result<RoaringBitmap> {
let mut results = RoaringBitmap::new();
self.iterate(
IterateParams::new(
ValueKey::from(ValueClass::Directory(DirectoryClass::NameToId(vec![0u8]))),
ValueKey::from(ValueClass::Directory(DirectoryClass::NameToId(vec![
u8::MAX;
10
]))),
),
|_, value| {
let pt = PrincipalInfo::deserialize(value).caused_by(trc::location!())?;
if typ.is_none_or(|t| pt.typ == t) && pt.has_tenant_access(tenant_id) {
results.insert(pt.id);
}
Ok(true)
},
)
.await
.caused_by(trc::location!())
.map(|_| results)
}
async fn get_member_of(&self, principal_id: u32) -> trc::Result<Vec<MemberOf>> {
let from_key = ValueKey::from(ValueClass::Directory(DirectoryClass::MemberOf {
principal_id,

View File

@@ -171,7 +171,11 @@ impl MailboxDestroy for Server {
.await
.and_then(|ids| ids.last_change_id(account_id))
{
Ok(change_id) => Ok(Ok(Some(change_id))),
Ok(change_id) => {
self.notify_task_queue();
Ok(Ok(Some(change_id)))
}
Err(err) if err.is_assertion_failure() => {
Ok(Err(MailboxDestroyError::AssertionFailed))
}

View File

@@ -27,7 +27,7 @@ use types::{
field::EmailField,
keyword::Keyword,
};
use utils::cheeky_hash::{CheekyHash, CheekyHashMap};
use utils::cheeky_hash::CheekyHash;
pub enum CopyMessageError {
NotFound,
@@ -102,21 +102,21 @@ impl EmailCopy for Server {
}
// Obtain threadId
let mut message_ids = CheekyHashMap::default();
let mut message_ids = Vec::new();
let mut subject = "";
for header in &metadata.contents[0].parts[0].headers {
match &header.name {
HeaderName::MessageId => {
header.value.visit_text(|id| {
if !id.is_empty() {
message_ids.insert(CheekyHash::new(id.as_bytes()), true);
message_ids.push(CheekyHash::new(id.as_bytes()));
}
});
}
HeaderName::InReplyTo | HeaderName::References | HeaderName::ResentMessageId => {
header.value.visit_text(|id| {
if !id.is_empty() {
message_ids.insert(CheekyHash::new(id.as_bytes()), false);
message_ids.push(CheekyHash::new(id.as_bytes()));
}
});
}
@@ -232,7 +232,7 @@ impl EmailCopy for Server {
.caused_by(trc::location!())?
.last_change_id(account_id)?;
// Request FTS index
// Request indexing
self.notify_task_queue();
// Update response

View File

@@ -7,6 +7,7 @@
use super::metadata::MessageData;
use crate::{cache::MessageCacheFetch, mailbox::*};
use common::{KV_LOCK_PURGE_ACCOUNT, Server, storage::index::ObjectIndexBuilder};
use directory::backend::internal::manage::ManageDirectory;
use groupware::calendar::storage::ItipAutoExpunge;
use std::future::Future;
use store::rand::prelude::SliceRandom;
@@ -19,7 +20,7 @@ use store::{
};
use trc::AddContext;
use types::collection::{Collection, VanishedCollection};
use types::field::{EmailField, EmailSubmissionField, Field};
use types::field::{EmailField, EmailSubmissionField};
pub trait EmailDeletion: Sync + Send {
fn emails_delete(
@@ -105,10 +106,7 @@ impl EmailDeletion for Server {
}
async fn purge_accounts(&self, use_roles: bool) {
if let Ok(account_ids) = self
.document_ids(u32::MAX, Collection::Principal, Field::DOCUMENT_ID)
.await
{
if let Ok(account_ids) = self.store().principal_ids(None, None).await {
let mut account_ids: Vec<u32> = account_ids
.into_iter()
.filter(|id| {
@@ -285,6 +283,7 @@ impl EmailDeletion for Server {
self.emails_delete(account_id, &mut batch, destroy_ids)
.await?;
self.commit_batch(batch).await?;
self.notify_task_queue();
Ok(())
}

View File

@@ -12,13 +12,18 @@ use crate::message::{
},
};
use common::storage::index::ObjectIndexBuilder;
use mail_parser::{decoders::html::html_to_text, parsers::preview::preview_text};
use mail_parser::{
ArchivedHeaderName,
decoders::html::html_to_text,
parsers::{fields::thread::thread_name, preview::preview_text},
};
use store::{
Serialize, SerializeInfallible,
write::{Archiver, BatchBuilder, BlobOp, DirectoryClass, IndexPropertyClass, ValueClass},
};
use trc::AddContext;
use types::{blob_hash::BlobHash, field::EmailField};
use utils::cheeky_hash::CheekyHash;
impl MessageMetadata {
#[inline(always)]
@@ -93,11 +98,34 @@ impl ArchivedMessageMetadata {
pub fn unindex(&self, batch: &mut BatchBuilder, account_id: u32, tenant_id: Option<u32>) {
// Delete metadata
let thread_name = self
.contents
.first()
.and_then(|c| c.parts.first())
.and_then(|p| {
p.headers.iter().rev().find_map(|h| {
if let ArchivedHeaderName::Subject = &h.name {
h.value.as_text()
} else {
None
}
})
})
.map(thread_name)
.unwrap_or_default();
batch
.clear(EmailField::Metadata)
.clear(ValueClass::IndexProperty(IndexPropertyClass::Integer {
property: EmailField::ReceivedToSize.into(),
value: self.received_at.to_native(),
}))
.clear(ValueClass::IndexProperty(IndexPropertyClass::Hash {
property: EmailField::Threading.into(),
hash: CheekyHash::new(if !thread_name.is_empty() {
thread_name
} else {
"!"
}),
}));
// Index properties

View File

@@ -12,12 +12,16 @@ use mail_parser::{
ArchivedHeaderName, ArchivedHeaderValue, DateTime, core::rkyv::ArchivedGetHeader,
decoders::html::html_to_text, parsers::fields::thread::thread_name,
};
use nlp::language::{
Language,
detect::{LanguageDetector, MIN_LANGUAGE_SCORE},
use nlp::{
language::{
Language,
detect::{LanguageDetector, MIN_LANGUAGE_SCORE},
},
tokenizers::word::WordTokenizer,
};
use store::{
ahash::AHashSet,
backend::MAX_TOKEN_LENGTH,
search::{EmailSearchField, IndexDocument, SearchField},
write::SearchIndex,
};
@@ -25,13 +29,18 @@ use store::{
impl ArchivedMessageMetadata {
pub fn index_document(
&self,
account_id: u32,
document_id: u32,
raw_message: &[u8],
index_fields: &AHashSet<SearchField>,
default_language: Language,
) -> IndexDocument {
let mut detector = LanguageDetector::new();
let mut language = Language::Unknown;
let message_contents = &self.contents[0];
let mut document = IndexDocument::new(SearchIndex::Email);
let mut document = IndexDocument::new(SearchIndex::Email)
.with_account_id(account_id)
.with_document_id(document_id);
if index_fields.is_empty()
|| index_fields.contains(&SearchField::Email(EmailSearchField::ReceivedAt))
@@ -147,12 +156,54 @@ impl ArchivedMessageMetadata {
}
}
_ => {
if index_fields.contains(&SearchField::Email(EmailSearchField::Headers))
{
#[cfg(not(feature = "test_mode"))]
let index_headers = index_fields
.contains(&SearchField::Email(EmailSearchField::Headers));
#[cfg(feature = "test_mode")]
let index_headers = true;
if index_headers {
let mut value = String::new();
header.value.visit_text(|text| {
value.push_str(text);
});
match &header.value {
ArchivedHeaderValue::Address(_) => {
header.value.visit_addresses(|_, addr| {
if !value.is_empty() {
value.push(' ');
}
value.push_str(addr);
});
}
ArchivedHeaderValue::Text(_)
| ArchivedHeaderValue::TextList(_) => {
header.value.visit_text(|text| {
if !value.is_empty() {
value.push(' ');
}
value.push_str(text);
});
}
ArchivedHeaderValue::ContentType(_)
| ArchivedHeaderValue::Received(_) => {
if let Some(header) = raw_message
.get(
header.offset_start.to_native() as usize
..header.offset_end.to_native() as usize,
)
.and_then(|bytes| std::str::from_utf8(bytes).ok())
{
for word in WordTokenizer::new(header, MAX_TOKEN_LENGTH)
{
if !value.is_empty() {
value.push(' ');
}
value.push_str(word.word.as_ref());
}
}
}
ArchivedHeaderValue::DateTime(_)
| ArchivedHeaderValue::Empty => (),
}
document.insert_key_value(
EmailSearchField::Headers,
@@ -267,15 +318,20 @@ impl ArchivedMessageMetadata {
}
}
if let Some(detected_language) = detector.most_frequent_language() {
document.set_unknown_language(detected_language);
}
#[cfg(not(feature = "test_mode"))]
document.set_unknown_language(
detector
.most_frequent_language()
.unwrap_or(default_language),
);
#[cfg(feature = "test_mode")]
document.set_unknown_language(default_language);
let has_attachment =
document.has_field(&(SearchField::Email(EmailSearchField::Attachment)));
document.index_bool(EmailSearchField::HasAttachment, has_attachment);
document
}
}

View File

@@ -44,10 +44,7 @@ use types::{
field::{ContactField, EmailField, MailboxField, PrincipalField},
keyword::Keyword,
};
use utils::{
cheeky_hash::{CheekyHash, CheekyHashMap},
sanitize_email,
};
use utils::{cheeky_hash::CheekyHash, sanitize_email};
#[derive(Default)]
pub struct IngestedEmail {
@@ -92,7 +89,7 @@ pub trait EmailIngest: Sync + Send {
&self,
account_id: u32,
thread_name: &str,
message_ids: &CheekyHashMap<bool>,
message_ids: &[CheekyHash],
) -> impl Future<Output = trc::Result<ThreadResult>> + Send;
fn assign_imap_uid(
&self,
@@ -207,7 +204,7 @@ impl EmailIngest for Server {
.and_then(sanitize_email)
&& sender != deliver_to
&& is_sender_authenticated
&& !self
&& self
.document_exists(
account_id,
Collection::ContactCard,
@@ -395,7 +392,7 @@ impl EmailIngest for Server {
// Obtain message references and thread name
let mut message_id = None;
let mut message_ids = CheekyHashMap::default();
let mut message_ids = Vec::new();
let thread_result = {
let mut subject = "";
for header in message.root_part().headers().iter().rev() {
@@ -405,7 +402,7 @@ impl EmailIngest for Server {
if message_id.is_none() {
message_id = id.to_string().into();
}
message_ids.insert(CheekyHash::new(id.as_bytes()), true);
message_ids.push(CheekyHash::new(id.as_bytes()));
}
}),
HeaderName::InReplyTo
@@ -413,7 +410,7 @@ impl EmailIngest for Server {
| HeaderName::ResentMessageId => {
header.value.visit_text(|id| {
if !id.is_empty() {
message_ids.insert(CheekyHash::new(id.as_bytes()), false);
message_ids.push(CheekyHash::new(id.as_bytes()));
}
});
}
@@ -430,6 +427,9 @@ impl EmailIngest for Server {
}
}
message_ids.sort_unstable();
message_ids.dedup();
self.find_thread_id(account_id, subject, &message_ids)
.await?
};
@@ -623,7 +623,6 @@ impl EmailIngest for Server {
.log_container_insert(SyncCollection::Thread);
document_id
};
let due = now();
batch
@@ -736,7 +735,7 @@ impl EmailIngest for Server {
&self,
account_id: u32,
thread_name: &str,
message_ids: &CheekyHashMap<bool>,
message_ids: &[CheekyHash],
) -> trc::Result<ThreadResult> {
let mut result = ThreadResult {
thread_id: None,
@@ -783,25 +782,23 @@ impl EmailIngest for Server {
|key, value| {
if key.len() == key_len {
// Find matching references
let mut from_offset = U32_LEN;
let references = value.get(U32_LEN..).unwrap_or_default();
while let Some(ref_hash) =
value.get(from_offset..).and_then(CheekyHash::deserialize)
{
if let Some(is_message_id) = message_ids.get(&ref_hash) {
let document_id = key.deserialize_be_u32(document_id_pos)?;
let thread_id = value.deserialize_be_u32(0)?;
if has_message_id(message_ids, references) {
let document_id = key.deserialize_be_u32(document_id_pos)?;
let thread_id = value.deserialize_be_u32(0)?;
if *is_message_id && from_offset == U32_LEN {
result.duplicate_ids.push(document_id);
}
thread_merge.add(thread_id, document_id);
return Ok(true);
if message_ids.len() == 1
|| (message_ids.len() == references.len() / CheekyHash::HASH_SIZE
&& references
.chunks_exact(CheekyHash::HASH_SIZE)
.zip(message_ids.iter())
.all(|(a, b)| a == b.as_raw_bytes()))
{
result.duplicate_ids.push(document_id);
}
from_offset += ref_hash.len();
thread_merge.add(thread_id, document_id);
}
}
@@ -851,6 +848,28 @@ impl EmailIngest for Server {
}
}
fn has_message_id(a: &[CheekyHash], b: &[u8]) -> bool {
let mut i = 0;
let mut j = 0;
let a_len = a.len();
let b_len = b.len() / CheekyHash::HASH_SIZE;
while i < a_len && j < b_len {
match a[i]
.as_raw_bytes()
.as_slice()
.cmp(&b[j * CheekyHash::HASH_SIZE..(j + 1) * CheekyHash::HASH_SIZE])
{
std::cmp::Ordering::Equal => return true,
std::cmp::Ordering::Less => i += 1,
std::cmp::Ordering::Greater => j += 1,
}
}
false
}
impl IngestSource<'_> {
pub fn is_smtp(&self) -> bool {
matches!(self, Self::Smtp { .. })
@@ -887,15 +906,13 @@ impl MergeThreadIds<Vec<u32>> {
}
impl MergeThreadIds<AHashSet<u32>> {
pub fn deserialize(document_id: u32, bytes: &[u8]) -> Option<Self> {
pub fn deserialize(bytes: &[u8]) -> Option<Self> {
if !bytes.is_empty() {
let thread_hash = CheekyHash::deserialize(bytes)?;
let mut merge_ids =
AHashSet::with_capacity(((bytes.len() - thread_hash.len()) / U32_LEN) + 1);
let mut start_offset = thread_hash.len();
merge_ids.insert(document_id);
while let Some(id_bytes) = bytes.get(start_offset..start_offset + U32_LEN) {
merge_ids.insert(u32::from_be_bytes(id_bytes.try_into().ok()?));
start_offset += U32_LEN;
@@ -921,20 +938,11 @@ impl std::hash::Hash for MergeThreadIds<AHashSet<u32>> {
pub(crate) struct ThreadInfo;
impl ThreadInfo {
pub fn serialize(thread_id: u32, ref_ids: &CheekyHashMap<bool>) -> Vec<u8> {
let mut buf = Vec::with_capacity(U32_LEN + ref_ids.len() * (1 + 16));
pub fn serialize(thread_id: u32, ref_ids: &[CheekyHash]) -> Vec<u8> {
let mut buf = Vec::with_capacity(U32_LEN + 1 + ref_ids.len() * CheekyHash::HASH_SIZE);
buf.extend_from_slice(&thread_id.to_be_bytes());
for (ref_id, is_message_id) in ref_ids {
if *is_message_id && buf.len() > U32_LEN {
// Place Message-id reference first
let mut new_buf = Vec::with_capacity(U32_LEN + ref_ids.len() * (1 + 16));
new_buf.extend_from_slice(&thread_id.to_be_bytes());
new_buf.extend_from_slice(ref_id.as_bytes());
new_buf.extend_from_slice(&buf[U32_LEN..]);
buf = new_buf;
} else {
buf.extend_from_slice(ref_id.as_bytes());
}
for ref_id in ref_ids {
buf.extend_from_slice(ref_id.as_raw_bytes());
}
buf
}
@@ -942,7 +950,6 @@ impl ThreadInfo {
pub struct ThreadMerge {
entries: AHashMap<u32, Vec<u32>>,
num_ids: usize,
}
pub struct ThreadMergeResult {
@@ -955,17 +962,11 @@ impl ThreadMerge {
pub fn new() -> Self {
Self {
entries: AHashMap::with_capacity(8),
num_ids: 0,
}
}
pub fn add(&mut self, thread_id: u32, document_id: u32) {
self.entries.entry(thread_id).or_default().push(document_id);
self.num_ids += 1;
}
pub fn num_document_ids(&self) -> usize {
self.num_ids
}
pub fn num_thread_ids(&self) -> usize {
@@ -1005,7 +1006,7 @@ impl ThreadMerge {
pub fn merge(self) -> ThreadMergeResult {
let mut max_thread_id = u32::MAX;
let mut max_count = 0;
let mut merge_ids = Vec::with_capacity(self.num_ids);
let mut merge_ids = Vec::with_capacity(self.entries.len());
for (thread_id, ids) in self.entries {
match ids.len().cmp(&max_count) {
@@ -1020,7 +1021,7 @@ impl ThreadMerge {
}
Ordering::Less => (),
}
merge_ids.extend(ids);
merge_ids.push(thread_id);
}
ThreadMergeResult {

View File

@@ -110,7 +110,7 @@ pub(super) async fn build_calcard_resources(
.caused_by(trc::location!())?;
if cache.paths.is_empty() {
if !is_first_check {
if is_first_check {
if is_calendar {
server
.create_default_calendar(access_token, account_id, &name)

View File

@@ -27,7 +27,11 @@ use store::{
write::{IndexPropertyClass, SearchIndex, ValueClass},
xxhash_rust::xxh3,
};
use types::{acl::AclGrant, collection::SyncCollection, field::CalendarNotificationField};
use types::{
acl::AclGrant,
collection::SyncCollection,
field::{CalendarEventField, CalendarNotificationField},
};
impl IndexableObject for Calendar {
fn index_values(&self) -> impl Iterator<Item = IndexValue<'_>> {
@@ -88,6 +92,10 @@ impl IndexableObject for CalendarEvent {
.chain([self.data.event_range_start() as u64])
.fold(0, |acc, hash| acc ^ hash),
},
IndexValue::Index {
field: CalendarEventField::Uid.into(),
value: self.data.event.uids().next().into(),
},
IndexValue::Quota {
used: self.dead_properties.size() as u32
+ self.display_name.as_ref().map_or(0, |n| n.len() as u32)
@@ -114,6 +122,10 @@ impl IndexableObject for &ArchivedCalendarEvent {
.chain([self.data.event_range_start() as u64])
.fold(0, |acc, hash| acc ^ hash),
},
IndexValue::Index {
field: CalendarEventField::Uid.into(),
value: self.data.event.uids().next().into(),
},
IndexValue::Quota {
used: self.dead_properties.size() as u32
+ self.display_name.as_ref().map_or(0, |n| n.len() as u32)
@@ -340,8 +352,16 @@ impl ArchivedCalendarEvent {
}
impl ArchivedCalendarEvent {
pub fn index_document(&self, index_fields: &AHashSet<SearchField>) -> IndexDocument {
let mut document = IndexDocument::new(SearchIndex::Calendar);
pub fn index_document(
&self,
account_id: u32,
document_id: u32,
index_fields: &AHashSet<SearchField>,
default_language: Language,
) -> IndexDocument {
let mut document = IndexDocument::new(SearchIndex::Calendar)
.with_account_id(account_id)
.with_document_id(document_id);
if index_fields.is_empty()
|| index_fields.contains(&SearchField::Calendar(CalendarSearchField::Start))
@@ -400,9 +420,11 @@ impl ArchivedCalendarEvent {
}
}
if let Some(detected_language) = detector.most_frequent_language() {
document.set_unknown_language(detected_language);
}
document.set_unknown_language(
detector
.most_frequent_language()
.unwrap_or(default_language),
);
document
}

View File

@@ -260,8 +260,16 @@ impl ArchivedContactCard {
}
impl ArchivedContactCard {
pub fn index_document(&self, index_fields: &AHashSet<SearchField>) -> IndexDocument {
let mut document = IndexDocument::new(SearchIndex::Contacts);
pub fn index_document(
&self,
account_id: u32,
document_id: u32,
index_fields: &AHashSet<SearchField>,
default_language: Language,
) -> IndexDocument {
let mut document = IndexDocument::new(SearchIndex::Contacts)
.with_account_id(account_id)
.with_document_id(document_id);
let mut detector = LanguageDetector::new();
for entry in self.card.entries.iter() {
@@ -322,9 +330,11 @@ impl ArchivedContactCard {
}
}
if let Some(detected_language) = detector.most_frequent_language() {
document.set_unknown_language(detected_language);
}
document.set_unknown_language(
detector
.most_frequent_language()
.unwrap_or(default_language),
);
document
}

View File

@@ -158,7 +158,9 @@ impl<T: SessionStream> SessionData<T> {
&mut imap_ids,
&mut saved_results,
);
imap_ids.sort_unstable();
if !is_sort {
imap_ids.sort_unstable();
}
// Save results
if let (Some(results_tx), Some(saved_results)) = (results_tx, saved_results) {
@@ -564,7 +566,9 @@ impl<T: SessionStream> SessionData<T> {
SearchComparator::field(EmailSearchField::ReceivedAt, comparator.ascending)
}
search::Sort::Cc => {
SearchComparator::field(EmailSearchField::Cc, comparator.ascending)
return Err(trc::ImapEvent::Error
.into_err()
.details("Sorting by CC is not supported."));
}
search::Sort::Date => {
SearchComparator::field(EmailSearchField::SentAt, comparator.ascending)

View File

@@ -353,6 +353,7 @@ impl AddressBookSet for Server {
.caused_by(trc::location!())?
.last_change_id(account_id)
{
self.notify_task_queue();
response.new_state = State::Exact(change_id).into();
}

View File

@@ -5,6 +5,7 @@
*/
use common::{Server, auth::AccessToken, sharing::EffectiveAcl};
use directory::backend::internal::manage::ManageDirectory;
use jmap_proto::{
error::set::SetError,
object::{JmapRight, JmapSharedObject},
@@ -12,8 +13,6 @@ use jmap_proto::{
use jmap_tools::{JsonPointerIter, Key, Map, Property, Value};
use types::{
acl::{Acl, AclGrant},
collection::Collection,
field::Field,
id::Id,
};
use utils::map::bitmap::Bitmap;
@@ -241,7 +240,8 @@ impl JmapAcl for Server {
}
let principal_ids = self
.document_ids(u32::MAX, Collection::Principal, Field::DOCUMENT_ID)
.store()
.principal_ids(None, None)
.await
.unwrap_or_default();

View File

@@ -345,6 +345,7 @@ impl CalendarSet for Server {
.caused_by(trc::location!())?
.last_change_id(account_id)
{
self.notify_task_queue();
response.new_state = State::Exact(change_id).into();
}

View File

@@ -96,7 +96,6 @@ impl JmapCalendarEventCopy for Server {
// Obtain quota
let mut batch = BatchBuilder::new();
let mut nudge_queue = false;
'create: for (id, create) in request.create.into_valid() {
let from_calendar_event_id = id.document_id();
@@ -158,9 +157,8 @@ impl JmapCalendarEventCopy for Server {
)
.await?
{
Ok(result) => {
response.created(id, result.document_id);
nudge_queue |= result.nudge_queue;
Ok(document_id) => {
response.created(id, document_id);
// Add to destroy list
if on_success_delete {
@@ -181,10 +179,7 @@ impl JmapCalendarEventCopy for Server {
.await
.and_then(|ids| ids.last_change_id(account_id))
.caused_by(trc::location!())?;
if nudge_queue {
self.notify_task_queue();
}
self.notify_task_queue();
response.new_state = State::Exact(change_id);
}

View File

@@ -69,12 +69,7 @@ pub trait CalendarEventSet: Sync + Send {
can_add_calendars: &Option<RoaringBitmap>,
js_calendar_event: JSCalendar<'_, Id, BlobId>,
updates: Value<'_, JSCalendarProperty<Id>, JSCalendarValue<Id, BlobId>>,
) -> impl Future<Output = trc::Result<Result<CalendarCreateResult, SetError<JSCalendarProperty<Id>>>>>;
}
pub struct CalendarCreateResult {
pub document_id: u32,
pub nudge_queue: bool,
) -> impl Future<Output = trc::Result<Result<u32, SetError<JSCalendarProperty<Id>>>>>;
}
impl CalendarEventSet for Server {
@@ -112,7 +107,6 @@ impl CalendarEventSet for Server {
// Process creates
let mut batch = BatchBuilder::new();
let send_scheduling_messages = request.arguments.send_scheduling_messages.unwrap_or(false);
let mut nudge_queue = false;
'create: for (id, object) in request.unwrap_create() {
match self
.create_calendar_event(
@@ -127,9 +121,8 @@ impl CalendarEventSet for Server {
)
.await?
{
Ok(result) => {
response.created(id, result.document_id);
nudge_queue |= result.nudge_queue;
Ok(document_id) => {
response.created(id, document_id);
}
Err(err) => {
response.not_created.append(id, err);
@@ -377,7 +370,6 @@ impl CalendarEventSet for Server {
}
}
}
nudge_queue |= next_email_alarm.is_some() || itip_messages.is_some();
// Validate quota
let extra_bytes = (new_calendar_event.size as u64)
@@ -484,8 +476,6 @@ impl CalendarEventSet for Server {
)
.caused_by(trc::location!())?;
nudge_queue |= send_scheduling_messages;
response.destroyed.push(id);
}
@@ -496,10 +486,7 @@ impl CalendarEventSet for Server {
.await
.and_then(|ids| ids.last_change_id(account_id))
.caused_by(trc::location!())?;
if nudge_queue {
self.notify_task_queue();
}
self.notify_task_queue();
response.new_state = State::Exact(change_id).into();
}
@@ -517,7 +504,7 @@ impl CalendarEventSet for Server {
can_add_calendars: &Option<RoaringBitmap>,
mut js_calendar_group: JSCalendar<'_, Id, BlobId>,
updates: Value<'_, JSCalendarProperty<Id>, JSCalendarValue<Id, BlobId>>,
) -> trc::Result<Result<CalendarCreateResult, SetError<JSCalendarProperty<Id>>>> {
) -> trc::Result<Result<u32, SetError<JSCalendarProperty<Id>>>> {
// Process changes
let mut event = CalendarEvent::default();
let use_default_alerts = match update_calendar_event(
@@ -643,7 +630,6 @@ impl CalendarEventSet for Server {
}
}
}
let nudge_queue = next_email_alarm.is_some() || itip_messages.is_some();
// Validate quota
match self
@@ -680,10 +666,7 @@ impl CalendarEventSet for Server {
itip_messages.queue(batch).caused_by(trc::location!())?;
}
Ok(Ok(CalendarCreateResult {
document_id,
nudge_queue,
}))
Ok(Ok(document_id))
}
}

View File

@@ -326,6 +326,8 @@ impl ContactCardSet for Server {
.and_then(|ids| ids.last_change_id(account_id))
.caused_by(trc::location!())?;
self.notify_task_queue();
response.new_state = State::Exact(change_id).into();
}

View File

@@ -1082,6 +1082,7 @@ impl EmailSet for Server {
.and_then(|ids| ids.last_change_id(account_id))
.caused_by(trc::location!())?
.into();
self.notify_task_queue();
}
// Mark messages that were not found as not destroyed (this should not occur in practice)

View File

@@ -22,7 +22,7 @@ use store::{
use trc::AddContext;
use types::{
collection::{Collection, SyncCollection},
field::Field,
field::IdentityField,
};
use utils::sanitize_email;
@@ -145,7 +145,7 @@ impl IdentityGet for Server {
async fn identity_get_or_create(&self, account_id: u32) -> trc::Result<RoaringBitmap> {
let mut identity_ids = self
.document_ids(account_id, Collection::Identity, Field::DOCUMENT_ID)
.document_ids(account_id, Collection::Identity, IdentityField::DocumentId)
.await?;
if !identity_ids.is_empty() {
return Ok(identity_ids);
@@ -206,7 +206,7 @@ impl IdentityGet for Server {
next_document_id -= 1;
batch
.with_document(document_id)
.tag(Field::DOCUMENT_ID)
.tag(IdentityField::DocumentId)
.custom(ObjectIndexBuilder::<(), _>::new().with_changes(Identity {
name,
email,

View File

@@ -21,7 +21,7 @@ use store::write::BatchBuilder;
use trc::AddContext;
use types::{
collection::{Collection, SyncCollection},
field::Field,
field::{Field, IdentityField},
};
use utils::sanitize_email;
@@ -41,7 +41,7 @@ impl IdentitySet for Server {
) -> trc::Result<SetResponse<identity::Identity>> {
let account_id = request.account_id.document_id();
let identity_ids = self
.document_ids(account_id, Collection::Identity, Field::DOCUMENT_ID)
.document_ids(account_id, Collection::Identity, IdentityField::DocumentId)
.await?;
let mut response = SetResponse::from_request(&request, self.core.jmap.set_max_objects)?;
let will_destroy = request.unwrap_destroy().into_valid().collect::<Vec<_>>();
@@ -111,7 +111,7 @@ impl IdentitySet for Server {
.with_account_id(account_id)
.with_collection(Collection::Identity)
.with_document(document_id)
.tag(Field::DOCUMENT_ID)
.tag(IdentityField::DocumentId)
.custom(ObjectIndexBuilder::<(), _>::new().with_changes(identity))
.caused_by(trc::location!())?
.commit_point();
@@ -177,7 +177,7 @@ impl IdentitySet for Server {
.with_account_id(account_id)
.with_collection(Collection::Identity)
.with_document(document_id)
.untag(Field::DOCUMENT_ID)
.untag(IdentityField::DocumentId)
.clear(Field::ARCHIVE)
.log_item_delete(SyncCollection::Identity, None)
.commit_point();

View File

@@ -11,10 +11,7 @@ use jmap_proto::{
method::query::{Comparator, Filter, QueryRequest, QueryResponse},
object::mailbox::{Mailbox, MailboxComparator, MailboxFilter},
};
use std::{
collections::{BTreeMap, BTreeSet},
future::Future,
};
use std::{collections::BTreeMap, future::Future};
use store::{
ahash::AHashMap,
roaring::RoaringBitmap,
@@ -166,7 +163,7 @@ impl MailboxQuery for Server {
sorted_set
.into_iter()
.enumerate()
.map(|(i, (_, v))| (i as u32, v))
.map(|(i, (_, v))| (v, i as u32))
.collect(),
true,
));
@@ -186,13 +183,13 @@ impl MailboxQuery for Server {
.items
.iter()
.map(|mailbox| (mailbox.name.as_str(), mailbox.document_id))
.collect::<BTreeSet<_>>();
.collect::<BTreeMap<_, _>>();
SearchComparator::sorted_set(
sorted_set
.into_iter()
.enumerate()
.map(|(i, (_, v))| (i as u32, v))
.map(|(i, (_, v))| (v, i as u32))
.collect(),
comparator.is_ascending,
)

View File

@@ -64,7 +64,7 @@ impl SieveScriptQuery for Server {
IndexKeyPrefix {
account_id,
collection: Collection::SieveScript.into(),
field: u8::from(Collection::SieveScript) + 1,
field: u8::from(SieveField::Name) + 1,
},
)
.no_values(),
@@ -101,15 +101,20 @@ impl SieveScriptQuery for Server {
)));
}
SieveFilter::IsActive(is_active) => {
let active_script_id = active_script_id.unwrap();
if is_active {
filters.push(SearchFilter::is_in_set(RoaringBitmap::from_iter([
active_script_id,
])));
if let Some(active_script_id) = active_script_id {
filters.push(SearchFilter::is_in_set(RoaringBitmap::from_iter([
active_script_id,
])));
} else {
// No active script, so no results
filters.push(SearchFilter::is_in_set(RoaringBitmap::new()));
}
} else {
let mut inactive_set = document_ids.clone();
inactive_set.remove(active_script_id);
if let Some(active_script_id) = active_script_id {
inactive_set.remove(active_script_id);
}
filters.push(SearchFilter::is_in_set(inactive_set));
}
}

View File

@@ -57,7 +57,7 @@ impl EmailSubmissionQuery for Server {
IterateParams::new(
ValueKey {
account_id,
collection: Collection::CalendarEventNotification.into(),
collection: Collection::EmailSubmission.into(),
document_id: 0,
class: ValueClass::IndexProperty(IndexPropertyClass::Integer {
property: EmailSubmissionField::Metadata.into(),
@@ -66,7 +66,7 @@ impl EmailSubmissionQuery for Server {
},
ValueKey {
account_id,
collection: Collection::CalendarEventNotification.into(),
collection: Collection::EmailSubmission.into(),
document_id: 0,
class: ValueClass::IndexProperty(IndexPropertyClass::Integer {
property: EmailSubmissionField::Metadata.into(),

View File

@@ -14,9 +14,18 @@ use jmap_proto::{
};
use jmap_tools::Map;
use std::future::Future;
use store::{ahash::AHashMap, roaring::RoaringBitmap};
use store::{
IterateParams, U32_LEN, ValueKey,
ahash::AHashMap,
roaring::RoaringBitmap,
write::{IndexPropertyClass, ValueClass, key::DeserializeBigEndian},
};
use trc::AddContext;
use types::{collection::SyncCollection, id::Id};
use types::{
collection::{Collection, SyncCollection},
field::EmailField,
id::Id,
};
pub trait ThreadGet: Sync + Send {
fn thread_get(
@@ -32,6 +41,7 @@ impl ThreadGet for Server {
) -> trc::Result<GetResponse<Thread>> {
let account_id = request.account_id.document_id();
let mut thread_map: AHashMap<u32, RoaringBitmap> = AHashMap::with_capacity(32);
let mut all_ids = RoaringBitmap::new();
for item in &self
.get_cached_messages(account_id)
.await
@@ -43,6 +53,7 @@ impl ThreadGet for Server {
.entry(item.thread_id)
.or_default()
.insert(item.document_id);
all_ids.insert(item.document_id);
}
let ids = if let Some(ids) = request.unwrap_ids(self.core.jmap.get_max_objects)? {
@@ -69,16 +80,60 @@ impl ThreadGet for Server {
not_found: vec![],
};
let ordered_ids = if add_email_ids && !all_ids.is_empty() {
let mut ordered_id = Vec::with_capacity(all_ids.len() as usize);
self.store()
.iterate(
IterateParams::new(
ValueKey {
account_id,
collection: Collection::Email.into(),
document_id: 0,
class: ValueClass::IndexProperty(IndexPropertyClass::Integer {
property: EmailField::ReceivedToSize.into(),
value: 0,
}),
},
ValueKey {
account_id,
collection: Collection::Email.into(),
document_id: u32::MAX,
class: ValueClass::IndexProperty(IndexPropertyClass::Integer {
property: EmailField::ReceivedToSize.into(),
value: u64::MAX,
}),
},
)
.ascending()
.no_values(),
|key, _| {
let document_id = key.deserialize_be_u32(key.len() - U32_LEN)?;
if all_ids.contains(document_id) {
ordered_id.push(document_id);
}
Ok(true)
},
)
.await
.caused_by(trc::location!())?;
Some(ordered_id)
} else {
None
};
for id in ids {
let thread_id = id.document_id();
if let Some(document_ids) = thread_map.remove(&thread_id) {
let mut thread: Map<'_, ThreadProperty, ThreadValue> =
Map::with_capacity(2).with_key_value(ThreadProperty::Id, id);
if add_email_ids {
if let Some(ordered_ids) = &ordered_ids {
thread.insert_unchecked(
ThreadProperty::EmailIds,
document_ids
.into_iter()
ordered_ids
.iter()
.filter(|id| document_ids.contains(**id))
.copied()
.map(|id| Id::from_parts(thread_id, id))
.collect::<Vec<_>>(),
);

View File

@@ -4,12 +4,11 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use super::Language;
use ahash::AHashMap;
use whatlang::{Lang, detect};
use super::Language;
pub const MIN_LANGUAGE_SCORE: f64 = 0.5;
pub const MIN_LANGUAGE_SCORE: f64 = 0.6;
#[derive(Debug)]
struct WeightedAverage {
@@ -62,6 +61,7 @@ impl LanguageDetector {
pub fn most_frequent_language(&self) -> Option<Language> {
self.lang_detected
.iter()
.filter(|(l, _)| !matches!(l, Language::None))
.max_by(|(_, a), (_, b)| {
((a.confidence / a.weight as f64) * a.occurrences as f64)
.partial_cmp(&((b.confidence / b.weight as f64) * b.occurrences as f64))

View File

@@ -56,9 +56,7 @@ impl<T: SessionStream> Session<T> {
// Obtain message sizes
let mut message_sizes = AHashMap::new();
self.server
.core
.storage
.data
.store()
.iterate(
IterateParams::new(
ValueKey {

View File

@@ -98,6 +98,7 @@ impl<T: SessionStream> Session<T> {
.commit_batch(batch)
.await
.caused_by(trc::location!())?;
self.server.notify_task_queue();
}
if not_deleted.is_empty() {
self.write_ok(format!(

View File

@@ -176,13 +176,13 @@ impl SearchIndexTask for Server {
}
// Commit deletion batch to data store
if !batch.is_empty() {
if let Err(err) = self.store().write(batch.build_all()).await {
trc::error!(
err.caused_by(trc::location!())
.details("Failed to commit index deletions to data store")
);
}
if !batch.is_empty()
&& let Err(err) = self.store().write(batch.build_all()).await
{
trc::error!(
err.caused_by(trc::location!())
.details("Failed to commit index deletions to data store")
);
for r in results.iter_mut() {
if r.task_type == TaskType::Delete
&& r.status == TaskStatus::Success
@@ -461,7 +461,13 @@ async fn build_email_document(
.details("Blob not found")
})?;
Ok(Some(metadata.index_document(&raw_message, index_fields)))
Ok(Some(metadata.index_document(
account_id,
document_id,
&raw_message,
index_fields,
server.core.jmap.default_language,
)))
}
None => Ok(None),
}
@@ -484,7 +490,12 @@ async fn build_calendar_document(
metadata_
.unarchive::<CalendarEvent>()
.caused_by(trc::location!())?
.index_document(index_fields),
.index_document(
account_id,
document_id,
index_fields,
server.core.jmap.default_language,
),
)),
None => Ok(None),
}
@@ -507,7 +518,12 @@ async fn build_contact_document(
metadata_
.unarchive::<ContactCard>()
.caused_by(trc::location!())?
.index_document(index_fields),
.index_document(
account_id,
document_id,
index_fields,
server.core.jmap.default_language,
),
)),
None => Ok(None),
}

View File

@@ -313,11 +313,11 @@ impl Task<TaskAction> {
})
}
Some(4) => TaskAction::SendImip,
Some(9) => TaskAction::MergeThreads(
MergeThreadIds::deserialize(document_id, value).ok_or_else(|| {
trc::Error::corrupted_key(key, value.into(), trc::location!())
})?,
),
Some(9) => {
TaskAction::MergeThreads(MergeThreadIds::deserialize(value).ok_or_else(
|| trc::Error::corrupted_key(key, value.into(), trc::location!()),
)?)
}
_ => return Err(trc::Error::corrupted_key(key, None, trc::location!())),
},
})

View File

@@ -90,16 +90,12 @@ async fn merge_threads(
.ascending(),
|key, value| {
if key.len() == key_len {
let document_id = key.deserialize_be_u32(document_id_pos)?;
if merge_threads.merge_ids.contains(&document_id) {
let thread_id = value.deserialize_be_u32(0)?;
let thread_id = value.deserialize_be_u32(0)?;
if merge_threads.merge_ids.contains(&thread_id) {
let document_id = key.deserialize_be_u32(document_id_pos)?;
thread_merge.add(thread_id, document_id);
thread_index.insert(document_id, value.to_vec());
return Ok(
thread_merge.num_document_ids() != merge_threads.merge_ids.len()
);
}
}

View File

@@ -92,7 +92,7 @@ struct Locked {
}
pub fn spawn_task_manager(inner: Arc<Inner>) {
// Create three mpsc channels for the different task types
// Create mpsc channels for the different task types
let (tx_index_1, mut rx_index_1) = mpsc::channel::<Task<IndexAction>>(IPC_CHANNEL_BUFFER);
let (tx_index_2, mut rx_index_2) = mpsc::channel::<Task<bool>>(IPC_CHANNEL_BUFFER);
let (tx_index_3, mut rx_index_3) = mpsc::channel::<Task<CalendarAlarm>>(IPC_CHANNEL_BUFFER);
@@ -155,6 +155,12 @@ pub fn spawn_task_manager(inner: Arc<Inner>) {
if success.iter().all(|t| t.is_done()) {
delete_tasks(&server, &locked_batch).await;
} else {
trc::event!(
TaskQueue(TaskQueueEvent::TaskFailed),
Total = locked_batch.len(),
Details = "Indexing task failed",
);
// Remove successful entries from queue
let mut to_delete = Vec::with_capacity(locked_batch.len());
for (task, result) in locked_batch.into_iter().zip(success.into_iter()) {
@@ -162,7 +168,9 @@ pub fn spawn_task_manager(inner: Arc<Inner>) {
to_delete.push(task);
}
}
delete_tasks(&server, &to_delete).await;
if !to_delete.is_empty() {
delete_tasks(&server, &to_delete).await;
}
}
}
}
@@ -193,6 +201,13 @@ pub fn spawn_task_manager(inner: Arc<Inner>) {
// Remove entry from queue
if success {
delete_tasks(&server, &[task]).await;
} else {
trc::event!(
TaskQueue(TaskQueueEvent::TaskFailed),
AccountId = task.account_id,
DocumentId = task.document_id,
Details = "Bayes training task failed",
);
}
}
}
@@ -230,6 +245,13 @@ pub fn spawn_task_manager(inner: Arc<Inner>) {
// Remove entry from queue
if success {
delete_tasks(&server, &[task]).await;
} else {
trc::event!(
TaskQueue(TaskQueueEvent::TaskFailed),
AccountId = task.account_id,
DocumentId = task.document_id,
Details = "Sending alarm task failed",
);
}
}
}
@@ -267,6 +289,13 @@ pub fn spawn_task_manager(inner: Arc<Inner>) {
// Remove entry from queue
if success {
delete_tasks(&server, &[task]).await;
} else {
trc::event!(
TaskQueue(TaskQueueEvent::TaskFailed),
AccountId = task.account_id,
DocumentId = task.document_id,
Details = "Sending iMIP task failed",
);
}
}
}
@@ -295,6 +324,13 @@ pub fn spawn_task_manager(inner: Arc<Inner>) {
// Remove entry from queue
if success {
delete_tasks(&server, &[task]).await;
} else {
trc::event!(
TaskQueue(TaskQueueEvent::TaskFailed),
AccountId = task.account_id,
DocumentId = task.document_id,
Details = "Merging threads task failed",
);
}
}
}

View File

@@ -11,7 +11,8 @@ trc = { path = "../trc" }
rocksdb = { version = "0.24", optional = true, features = ["multi-threaded-cf"] }
foundationdb = { version = "0.9.2", features = ["embedded-fdb-include", "fdb-7_3"], optional = true }
rusqlite = { version = "0.37", features = ["bundled"], optional = true }
rust-s3 = { version = "0.37", default-features = false, features = ["tokio-rustls-tls"], optional = true }
#rust-s3 = { version = "0.37", default-features = false, features = ["tokio-rustls-tls"], optional = true }
rust-s3 = { version = "0.35", default-features = false, features = ["tokio-rustls-tls", "no-verify-ssl"], optional = true }
async-nats = { version = "0.44", default-features = false, features = ["server_2_10", "server_2_11", "ring"], optional = true }
azure_core = { version = "0.21.0", optional = true }
azure_storage = { version = "0.21.0", default-features = false, features = ["enable_reqwest_rustls", "hmac_rust"], optional = true }
@@ -52,6 +53,7 @@ rkyv = { version = "0.8.10", features = ["little_endian"] }
compact_str = "0.9.0"
zenoh = { version = "1.3.4", default-features = false, features = ["auth_pubkey", "transport_multilink", "transport_compression", "transport_quic", "transport_tcp", "transport_tls", "transport_udp"], optional = true }
rdkafka = { version = "0.38", features = ["cmake-build"], optional = true }
rustls_021 = { package = "rustls", version = "0.21", default-features = false, features = ["dangerous_configuration"], optional = true }
[dev-dependencies]
tokio = { version = "1.47", features = ["full"] }
@@ -60,13 +62,13 @@ tokio = { version = "1.47", features = ["full"] }
# Data Stores
rocks = ["rocksdb", "rayon", "num_cpus"]
sqlite = ["rusqlite", "rayon", "r2d2", "num_cpus", "lru-cache"]
postgres = ["tokio-postgres", "deadpool-postgres", "tokio-rustls", "rustls", "ring", "rustls-pki-types", "futures", "bytes"]
postgres = ["tokio-postgres", "deadpool", "deadpool-postgres", "tokio-rustls", "rustls", "ring", "rustls-pki-types", "futures", "bytes"]
mysql = ["mysql_async", "futures"]
foundation = ["foundationdb", "futures"]
fdb-chunked-bm = []
# Blob stores
s3 = ["rust-s3"]
s3 = ["rust-s3", "rustls_021"]
azure = ["azure_core", "azure_storage", "azure_storage_blobs"]
# In-memory stores

View File

@@ -40,6 +40,9 @@ impl ElasticSearchStore {
.property_or_default((&prefix, "index.include-source"), "false")
.unwrap_or(false);
#[cfg(feature = "test_mode")]
es.drop_indexes().await.unwrap();
if let Err(err) = es
.create_index::<EmailSearchField>(shards, replicas, with_source)
.await
@@ -77,11 +80,17 @@ impl ElasticSearchStore {
replicas: usize,
with_source: bool,
) -> trc::Result<()> {
let mut mappings = T::primary_keys()
.iter()
.chain(T::all_fields())
.map(|field| (field.es_field().to_string(), field.es_schema()))
.collect::<serde_json::Map<String, Value>>();
let mut mappings = serde_json::Map::new();
mappings.insert(
"properties".to_string(),
Value::Object(
T::primary_keys()
.iter()
.chain(T::all_fields())
.map(|field| (field.es_field().to_string(), field.es_schema()))
.collect::<serde_json::Map<String, Value>>(),
),
);
if !with_source {
mappings.insert("_source".to_string(), json!({ "enabled": false }));
}
@@ -95,7 +104,7 @@ impl ElasticSearchStore {
"default": {
"type": "custom",
"tokenizer": "standard",
"filter": ["lowercase"]
"filter": ["lowercase", "stemmer"]
}
}
}
@@ -103,12 +112,6 @@ impl ElasticSearchStore {
});
let body = serde_json::to_string(&body).unwrap_or_default();
let c = println!(
"Creating Elasticsearch index {} with body: {}",
T::index().es_index_name(),
body
);
assert_success(
self.client
.put(format!("{}/{}", self.url, T::index().es_index_name()))
@@ -119,6 +122,27 @@ impl ElasticSearchStore {
.await
.map(|_| ())
}
#[cfg(feature = "test_mode")]
pub async fn drop_indexes(&self) -> trc::Result<()> {
for index in &[
SearchIndex::Email,
SearchIndex::Calendar,
SearchIndex::Contacts,
SearchIndex::Tracing,
] {
assert_success(
self.client
.delete(format!("{}/{}", self.url, index.es_index_name()))
.send()
.await,
)
.await
.map(|_| ())?;
}
Ok(())
}
}
pub(crate) async fn assert_success(response: Result<Response, Error>) -> trc::Result<Response> {
@@ -153,8 +177,8 @@ impl SearchIndex {
impl SearchField {
pub fn es_field(&self) -> &'static str {
match self {
SearchField::AccountId => "doc_id",
SearchField::DocumentId => "acc_id",
SearchField::AccountId => "acc_id",
SearchField::DocumentId => "doc_id",
SearchField::Id => "id",
SearchField::Email(field) => match field {
EmailSearchField::From => "from",
@@ -164,7 +188,7 @@ impl SearchField {
EmailSearchField::Subject => "subj",
EmailSearchField::Body => "body",
EmailSearchField::Attachment => "attach",
EmailSearchField::ReceivedAt => "received",
EmailSearchField::ReceivedAt => "rcvd",
EmailSearchField::SentAt => "sent",
EmailSearchField::Size => "size",
EmailSearchField::HasAttachment => "has_att",
@@ -242,12 +266,24 @@ impl SearchField {
"enabled": true
})
}
SearchField::Email(
EmailSearchField::Cc
| EmailSearchField::Bcc
| EmailSearchField::Body
| EmailSearchField::Attachment,
)
#[cfg(feature = "test_mode")]
SearchField::Email(EmailSearchField::Bcc | EmailSearchField::Cc) => {
json!({
"type": "text",
"fields": {
"keyword": {
"type": "keyword"
}
}
})
}
#[cfg(not(feature = "test_mode"))]
SearchField::Email(EmailSearchField::Bcc | EmailSearchField::Cc) => {
json!({
"type": "text"
})
}
SearchField::Email(EmailSearchField::Body | EmailSearchField::Attachment)
| SearchField::Calendar(
CalendarSearchField::Title
| CalendarSearchField::Description

View File

@@ -96,14 +96,11 @@ impl ElasticSearchStore {
.into_iter()
.flatten(),
);
let request = serde_json::to_string(&query).unwrap_or_default();
let c = println!("Elasticsearch query: {}", request);
let response = assert_success(
self.client
.post(format!("{}/{}/_search", self.url, index.es_index_name()))
.body(request)
.body(serde_json::to_string(&query).unwrap_or_default())
.send()
.await,
)
@@ -136,29 +133,10 @@ impl ElasticSearchStore {
.reason("Unindex operation requires at least one filter"));
}
#[cfg(feature = "test_mode")]
{
assert_success(
self.client
.get(format!(
"{}/{}/_refresh",
self.url,
filter.index.es_index_name()
))
.send()
.await,
)
.await?;
}
let query = json!({
"query": build_query(&filter.filters),
});
let request = serde_json::to_string(&query).unwrap_or_default();
let c = println!("Elasticsearch unindex query: {}", request);
let response = assert_success(
self.client
.post(format!(
@@ -166,7 +144,7 @@ impl ElasticSearchStore {
self.url,
filter.index.es_index_name()
))
.body(request)
.body(serde_json::to_string(&query).unwrap_or_default())
.send()
.await,
)
@@ -181,6 +159,14 @@ impl ElasticSearchStore {
.map(|delete_response| delete_response.deleted)
.map_err(|err| trc::StoreEvent::ElasticsearchError.reason(err))
}
pub async fn refresh_index(&self, index: SearchIndex) -> trc::Result<()> {
let url = format!("{}/{}/_refresh", self.url, index.es_index_name());
assert_success(self.client.post(url).send().await)
.await
.map(|_| ())
}
}
fn build_query(filters: &[SearchFilter]) -> Value {
@@ -195,7 +181,8 @@ fn build_query(filters: &[SearchFilter]) -> Value {
for filter in filters {
match filter {
SearchFilter::Operator { field, op, value } => {
if field.is_text() {
if field.is_text() && matches!(op, SearchOperator::Equal | SearchOperator::Contains)
{
let SearchValue::Text { value, .. } = value else {
debug_assert!(false, "Invalid value type for text field");
continue;
@@ -321,9 +308,17 @@ fn build_sort(sort: &[SearchComparator]) -> Value {
Value::Array(
sort.iter()
.filter_map(|comp| match comp {
SearchComparator::Field { field, ascending } => Some(json!({
field.es_field(): if *ascending { "asc" } else { "desc" }
})),
SearchComparator::Field { field, ascending } => {
let field = if field.is_text() {
format!("{}.keyword", field.es_field())
} else {
field.es_field().to_string()
};
Some(json!({
field: if *ascending { "asc" } else { "desc" }
}))
}
_ => None,
})
.collect(),

View File

@@ -124,6 +124,8 @@ impl PostgresStore {
let conn = self.conn_pool.get().await.map_err(into_error)?;
let s = conn.prepare_cached(&query).await.map_err(into_error)?;
let c = println!("Executing search query: {}", query);
conn.query(&s, params.as_slice())
.await
.and_then(|rows| {

View File

@@ -4,9 +4,8 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use std::{fmt::Display, io::Write, ops::Range, time::Duration};
use s3::{Bucket, Region, creds::Credentials};
use std::{fmt::Display, io::Write, ops::Range, time::Duration};
use utils::{
codec::base32_custom::Base32Writer,
config::{Config, utils::AsKey},
@@ -48,9 +47,9 @@ impl S3Store {
let timeout = config
.property_or_default::<Duration>((&prefix, "timeout"), "30s")
.unwrap_or_else(|| Duration::from_secs(30));
let allow_invalid = config
.property_or_default::<bool>((&prefix, "tls.allow-invalid"), "false")
.unwrap_or_default();
/*let allow_invalid = config
.property_or_default::<bool>((&prefix, "tls.allow-invalid"), "false")
.unwrap_or_default();*/
Some(S3Store {
bucket: Bucket::new(
@@ -63,11 +62,11 @@ impl S3Store {
})
.ok()?
.with_path_style()
.set_dangereous_config(allow_invalid, allow_invalid)
/*.set_dangereous_config(allow_invalid, allow_invalid)
.map_err(|err| {
config.new_build_error(prefix.as_str(), format!("Failed to create bucket: {err:?}"))
})
.ok()?
.ok()?*/
.with_request_timeout(timeout)
.map_err(|err| {
config.new_build_error(prefix.as_str(), format!("Failed to create bucket: {err:?}"))

View File

@@ -46,10 +46,10 @@ impl SearchStore {
} => {
account_id = *id as u32;
}
SearchFilter::Operator { .. } => {}
_ => {
SearchFilter::DocumentSet(_) => {
has_local_filters = true;
}
_ => (),
}
}
if account_id == u32::MAX {
@@ -190,7 +190,21 @@ impl SearchStore {
Ordering::Less => Ok(vec![]),
Ordering::Greater => {
if !query.comparators.is_empty() {
if query.comparators[0].is_external() {
let mut local = Vec::with_capacity(query.comparators.len());
let mut external = Vec::with_capacity(query.comparators.len());
let mut external_first = false;
for (pos, comparator) in query.comparators.into_iter().enumerate() {
if comparator.is_external() {
external.push(comparator);
if pos == 0 {
external_first = true;
}
} else {
local.push(comparator);
}
}
if !external.is_empty() {
let results = results.results();
let filters = vec![
SearchFilter::Operator {
@@ -209,23 +223,34 @@ impl SearchStore {
value: SearchValue::Uint(results.max().unwrap() as u64),
},
];
let comparators = query
.comparators
.into_iter()
.filter(|c| c.is_external())
.collect::<Vec<_>>();
self.sub_query(query.index, &filters, &comparators)
.await
.map(|items| {
items
.into_iter()
.filter(|id| results.contains(*id))
.collect()
})
} else {
Ok(results.with_comparators(query.comparators).into_sorted())
let ordered_results =
self.sub_query(query.index, &filters, &external).await?;
if local.is_empty() {
return Ok(ordered_results
.into_iter()
.filter(|id| results.contains(*id))
.collect());
}
let comparator = SearchComparator::SortedSet {
set: ordered_results
.into_iter()
.enumerate()
.map(|(pos, id)| (id, pos as u32))
.collect(),
ascending: true,
};
if external_first {
local.insert(0, comparator);
} else {
local.push(comparator);
}
}
Ok(results.with_comparators(local).into_sorted())
} else {
Ok(results.results().iter().collect())
}

View File

@@ -457,7 +457,8 @@ impl Store {
pub async fn destroy(&self) {
use crate::*;
if self.is_pg_or_mysql() {
#[cfg(any(feature = "postgres", feature = "mysql"))]
{
use crate::write::SearchIndex;
for index in [

View File

@@ -11,8 +11,9 @@ use crate::{
term::{TermIndex, TermIndexBuilder},
},
write::{
AlignedBytes, Archive, BatchBuilder, SEARCH_INDEX_MAX_FIELD_LEN, SearchIndexClass,
SearchIndexField, SearchIndexId, SearchIndexType, ValueClass, key::DeserializeBigEndian,
AlignedBytes, Archive, BatchBuilder, SEARCH_INDEX_MAX_FIELD_LEN, SearchIndex,
SearchIndexClass, SearchIndexField, SearchIndexId, SearchIndexType, ValueClass,
key::DeserializeBigEndian,
},
};
use ahash::AHashMap;
@@ -24,11 +25,60 @@ impl Store {
for document in documents {
let mut batch = BatchBuilder::new();
let index = document.index;
let mut old_term_index = None;
if matches!(index, SearchIndex::Calendar | SearchIndex::Contacts) {
let mut account_id = None;
let mut document_id = None;
for (field, value) in &document.fields {
if let SearchValue::Uint(id) = value {
match field {
SearchField::AccountId => {
account_id = Some(*id as u32);
}
SearchField::DocumentId => {
document_id = Some(*id as u32);
}
_ => {}
}
}
}
if let (Some(account_id), Some(document_id)) = (account_id, document_id)
&& let Some(archive) = self
.get_value::<Archive<AlignedBytes>>(ValueKey::from(
ValueClass::SearchIndex(SearchIndexClass {
index,
id: SearchIndexId::Account {
account_id,
document_id,
},
typ: SearchIndexType::Document,
}),
))
.await
.caused_by(trc::location!())?
{
old_term_index = Some(archive);
}
}
let term_index_builder = TermIndexBuilder::build(document);
term_index_builder
.index
.write_index(&mut batch, index, term_index_builder.id)
.caused_by(trc::location!())?;
if let Some(old_term_index) = old_term_index {
let old_term_index = old_term_index
.unarchive::<TermIndex>()
.caused_by(trc::location!())?;
term_index_builder
.index
.merge_index(&mut batch, index, term_index_builder.id, old_term_index)
.caused_by(trc::location!())?;
} else {
term_index_builder
.index
.write_index(&mut batch, index, term_index_builder.id)
.caused_by(trc::location!())?;
}
self.write(batch.build_all())
.await
.caused_by(trc::location!())?;

View File

@@ -199,9 +199,11 @@ impl QueryResults {
results.sort_by(|a, b| {
for comparator in &comparators {
let (a, b, is_ascending) = match comparator {
SearchComparator::DocumentSet { set, ascending } => {
(set.contains(*a) as u32, set.contains(*b) as u32, *ascending)
}
SearchComparator::DocumentSet { set, ascending } => (
!set.contains(*a) as u32,
!set.contains(*b) as u32,
*ascending,
),
SearchComparator::SortedSet { set, ascending } => (
*set.get(a).unwrap_or(&u32::MAX),
*set.get(b).unwrap_or(&u32::MAX),

View File

@@ -158,6 +158,7 @@ pub struct IndexDocument {
pub(crate) fields: AHashMap<SearchField, SearchValue>,
}
#[derive(Debug)]
pub struct QueryResults {
results: RoaringBitmap,
comparators: Vec<SearchComparator>,

View File

@@ -13,6 +13,7 @@ use crate::{
SearchIndexId, SearchIndexType, ValueClass,
},
};
use ahash::AHashSet;
use nlp::{
language::stemmer::Stemmer,
tokenizers::{space::SpaceTokenizer, word::WordTokenizer},
@@ -197,7 +198,7 @@ impl TermIndexBuilder {
_ => {
debug_assert!(
false,
"Invalid combination of AccountId, DocumentId and Id fields"
"Invalid combination of AccountId {account_id:?}, DocumentId {document_id:?} and Id {id:?} fields"
);
SearchIndexId::Global { id: 0 }
}
@@ -253,6 +254,87 @@ impl TermIndex {
Ok(())
}
pub fn merge_index(
self,
batch: &mut BatchBuilder,
index: SearchIndex,
id: SearchIndexId,
old_term: &ArchivedTermIndex,
) -> trc::Result<()> {
let archive = Archiver::new(self);
batch.set(
ValueClass::SearchIndex(SearchIndexClass {
index,
id,
typ: SearchIndexType::Document,
}),
archive.serialize()?,
);
let mut old_terms = AHashSet::with_capacity(old_term.terms.len());
let mut old_fields = AHashSet::with_capacity(old_term.fields.len());
for term in old_term.terms.iter() {
let mut fields = term.fields.to_native();
while let Some(field) = fields.bit_pop() {
old_terms.insert(SearchIndexType::Term {
hash: term.hash.to_native(),
field,
});
}
}
for field in old_term.fields.iter() {
old_fields.insert(SearchIndexField {
field_id: field.field_id,
len: field.len,
data: field.data,
});
}
for term in archive.inner.terms {
let mut fields = term.fields;
while let Some(field) = fields.bit_pop() {
let typ = SearchIndexType::Term {
hash: term.hash,
field,
};
if !old_terms.remove(&typ) {
batch.set(
ValueClass::SearchIndex(SearchIndexClass { index, id, typ }),
vec![],
);
}
}
}
for field in archive.inner.fields {
if !old_fields.remove(&field) {
batch.set(
ValueClass::SearchIndex(SearchIndexClass {
index,
id,
typ: SearchIndexType::Index { field },
}),
vec![],
);
}
}
for typ in old_terms {
batch.clear(ValueClass::SearchIndex(SearchIndexClass { index, id, typ }));
}
for field in old_fields {
batch.clear(ValueClass::SearchIndex(SearchIndexClass {
index,
id,
typ: SearchIndexType::Index { field },
}));
}
Ok(())
}
}
impl ArchivedTermIndex {

View File

@@ -196,6 +196,7 @@ impl TaskQueueEvent {
TaskQueueEvent::BlobNotFound => "Blob not found for task",
TaskQueueEvent::MetadataNotFound => "Metadata not found for task",
TaskQueueEvent::TaskIgnored => "Task ignored based on current server roles",
TaskQueueEvent::TaskFailed => "Task failed during processing",
}
}
@@ -206,6 +207,7 @@ impl TaskQueueEvent {
TaskQueueEvent::BlobNotFound => "The requested blob was not found for task",
TaskQueueEvent::MetadataNotFound => "The metadata was not found for task",
TaskQueueEvent::TaskIgnored => "The task was ignored based on the current server roles",
TaskQueueEvent::TaskFailed => "The task failed during processing",
}
}
}

View File

@@ -382,6 +382,7 @@ impl EventType {
| TaskQueueEvent::TaskLocked
| TaskQueueEvent::TaskIgnored
| TaskQueueEvent::MetadataNotFound => Level::Debug,
TaskQueueEvent::TaskFailed => Level::Warn,
},
EventType::Dmarc(_) => Level::Debug,
EventType::Spf(_) => Level::Debug,

View File

@@ -240,6 +240,7 @@ pub enum TaskQueueEvent {
TaskAcquired,
TaskLocked,
TaskIgnored,
TaskFailed,
BlobNotFound,
MetadataNotFound,
}

View File

@@ -894,6 +894,7 @@ impl EventType {
EventType::Calendar(CalendarEvent::ItipMessageReceived) => 584,
EventType::Calendar(CalendarEvent::ItipMessageError) => 585,
EventType::TaskQueue(TaskQueueEvent::TaskIgnored) => 586,
EventType::TaskQueue(TaskQueueEvent::TaskFailed) => 587,
}
}
@@ -1526,6 +1527,7 @@ impl EventType {
584 => Some(EventType::Calendar(CalendarEvent::ItipMessageReceived)),
585 => Some(EventType::Calendar(CalendarEvent::ItipMessageError)),
586 => Some(EventType::TaskQueue(TaskQueueEvent::TaskIgnored)),
587 => Some(EventType::TaskQueue(TaskQueueEvent::TaskFailed)),
_ => None,
}
}

View File

@@ -5,7 +5,6 @@
*/
const ARCHIVE_FIELD: u8 = 50;
const DOCUMENT_ID_FIELD: u8 = 51;
pub trait FieldType: Into<u8> + Copy + std::fmt::Debug + PartialEq + Eq {}
@@ -67,6 +66,13 @@ pub enum EmailSubmissionField {
Metadata,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum IdentityField {
Archive,
DocumentId,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
#[repr(u8)]
pub enum PrincipalField {
@@ -161,6 +167,15 @@ impl From<PrincipalField> for u8 {
}
}
impl From<IdentityField> for u8 {
fn from(value: IdentityField) -> Self {
match value {
IdentityField::Archive => ARCHIVE_FIELD,
IdentityField::DocumentId => 51,
}
}
}
impl From<Field> for u8 {
fn from(value: Field) -> Self {
value.0
@@ -215,9 +230,14 @@ impl From<EmailSubmissionField> for Field {
}
}
impl From<IdentityField> for Field {
fn from(value: IdentityField) -> Self {
Field(u8::from(value))
}
}
impl Field {
pub const ARCHIVE: Field = Field(ARCHIVE_FIELD);
pub const DOCUMENT_ID: Field = Field(DOCUMENT_ID_FIELD);
pub fn new(value: u8) -> Self {
Field(value)
@@ -237,3 +257,4 @@ impl FieldType for MailboxField {}
impl FieldType for PrincipalField {}
impl FieldType for SieveField {}
impl FieldType for EmailSubmissionField {}
impl FieldType for IdentityField {}

View File

@@ -7,21 +7,13 @@
use nohash_hasher::IsEnabled;
use std::{
collections::{BTreeMap, HashMap, HashSet},
fmt::Debug,
hash::Hash,
};
// A hash that can cheekily store small inputs directly without hashing them.
#[derive(
Debug,
Copy,
Clone,
PartialEq,
Eq,
PartialOrd,
Ord,
rkyv::Serialize,
rkyv::Deserialize,
rkyv::Archive,
Copy, Clone, PartialEq, Eq, PartialOrd, Ord, rkyv::Serialize, rkyv::Deserialize, rkyv::Archive,
)]
#[repr(transparent)]
pub struct CheekyHash([u8; HASH_SIZE]);
@@ -34,6 +26,7 @@ pub type CheekyHashMap<V> = HashMap<CheekyHash, V, nohash_hasher::BuildNoHashHas
pub type CheekyBTreeMap<V> = BTreeMap<CheekyHash, V>;
impl CheekyHash {
pub const HASH_SIZE: usize = HASH_SIZE;
pub const NULL: CheekyHash = CheekyHash([0u8; HASH_SIZE]);
pub const FULL: CheekyHash = CheekyHash([u8::MAX; HASH_SIZE]);
@@ -58,9 +51,10 @@ impl CheekyHash {
pub fn deserialize(bytes: &[u8]) -> Option<Self> {
let len = *bytes.first()?;
let mut hash = [len; HASH_SIZE];
let mut hash = [0u8; HASH_SIZE];
let hash_len = 1 + (len as usize).min(HASH_PAYLOAD);
hash[0] = len;
hash[1..hash_len].copy_from_slice(bytes.get(1..hash_len)?);
Some(CheekyHash(hash))
}
@@ -140,6 +134,23 @@ impl ArchivedCheekyHash {
}
}
impl Debug for CheekyHash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let len = self.payload_len();
let payload = self.payload();
let payload_str = if len <= HASH_PAYLOAD as u8 {
std::str::from_utf8(payload).unwrap_or("<non-utf8>")
} else {
"<hashed data>"
};
f.debug_struct("CheekyHash")
.field("length", &len)
.field("bytes", &payload_str)
.finish()
}
}
#[cfg(test)]
mod tests {
use super::*;
@@ -273,6 +284,17 @@ mod tests {
"Debug output should contain type name"
);
// Test 13: CheekyHashSet and CheekyHashMap
let mut cheeky_set: CheekyHashSet = CheekyHashSet::default();
cheeky_set.insert(CheekyHash::new(b"set_item"));
assert!(cheeky_set.contains(&CheekyHash::new(b"set_item")));
let mut cheeky_map: CheekyHashMap<&str> = CheekyHashMap::default();
cheeky_map.insert(CheekyHash::new(b"map_key"), "map_value");
assert_eq!(
cheeky_map.get(&CheekyHash::new(b"map_key")),
Some(&"map_value")
);
println!("All CheekyHash tests passed!");
}
}

View File

@@ -4,9 +4,8 @@ version = "0.14.1"
edition = "2024"
[features]
default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis", "nats", "azure", "foundationdb"]
#default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis"]
#default = ["rocks", "redis", "s3"]
#default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis", "nats", "azure", "foundationdb", "enterprise"]
default = ["rocks"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres"]
@@ -44,10 +43,11 @@ managesieve = { path = "../crates/managesieve", features = ["test_mode", "enterp
smtp-proto = { version = "0.2" }
mail-send = { version = "0.5", default-features = false, features = ["cram-md5", "ring", "tls12"] }
mail-auth = { version = "0.7.1", features = ["test"] }
mail-parser = { version = "0.11", features = ["full_encoding", "rkyv"] }
mail-builder = "0.4.4"
sieve-rs = { version = "0.7", features = ["rkyv"] }
utils = { path = "../crates/utils", features = ["test_mode"] }
jmap-client = { version = "0.4", features = ["websockets", "debug", "async"] }
mail-parser = { version = "0.11", features = ["full_encoding", "rkyv"] }
tokio = { version = "1.47", features = ["full"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["ring", "tls12"] }
rustls = { version = "0.23.5", default-features = false, features = ["std", "ring", "tls12"] }
@@ -78,6 +78,5 @@ rkyv = { version = "0.8.10", features = ["little_endian"] }
compact_str = "0.9.0"
quick-xml = "0.38"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.5.0"

View File

@@ -7,6 +7,7 @@ To: Heinz =?iso-8859-1?Q?M=FCller?= <mueller@example.com>
From: Doug Sauder <dwsauder@example.com>
Subject: =?iso-8859-1?Q?Die_Hasen_und_die_Fr=F6sche?=
Mime-Version: 1.0
Content-Language: de
Content-Type: text/html; charset="iso-8859-1"
Content-Transfer-Encoding: quoted-printable

View File

@@ -2,6 +2,7 @@ From: Al Gore <vice-president@whitehouse.gov>
To: White House Transportation Coordinator
<transport@whitehouse.gov>
Subject: [Fwd: Map of Argentina with Description]
Content-Language: en
Content-Type: multipart/mixed;
boundary="D7F------------D7FD5A0B8AB9C65CCDBFA872"

View File

@@ -1,5 +1,6 @@
From: Abidjan Prince <your_prince@gmail.com>
To: Bill Foobar <foobar@example.com>
Content-Language: en
Subject: Help a friend from Abidjan Côte d'Ivoire
When my mother died when she was given birth to me, my father took me so

View File

@@ -1,5 +1,6 @@
From: "孫子" <sun.tzu@wu.state>
To: "Bill Foobar" <foobar@example.com>
Content-Language: zh
Subject: 孫子兵法
<"孫子兵法:">

View File

@@ -21,7 +21,9 @@ pub mod store;
pub mod thread;
use crate::{
AssertConfig, add_test_certs, directory::internal::TestInternalDirectory, store::TempDir,
AssertConfig, add_test_certs,
directory::internal::TestInternalDirectory,
store::{TempDir, build_store_config},
};
use ::managesieve::core::ManageSieveSessionManager;
use ::store::Stores;
@@ -59,12 +61,7 @@ pub async fn imap_tests() {
// Prepare settings
let start_time = Instant::now();
let delete = true;
let handle = init_imap_tests(
&std::env::var("STORE")
.expect("Missing store type. Try running `STORE=<store_type> cargo test`"),
delete,
)
.await;
let handle = init_imap_tests(delete).await;
// Connect to IMAP server
let mut imap_check = ImapConnection::connect(b"_y ").await;
@@ -139,12 +136,11 @@ pub struct IMAPTest {
shutdown_tx: watch::Sender<bool>,
}
async fn init_imap_tests(store_id: &str, delete_if_exists: bool) -> IMAPTest {
async fn init_imap_tests(delete_if_exists: bool) -> IMAPTest {
// Load and parse config
let temp_dir = TempDir::new("imap_tests", delete_if_exists);
let mut config = Config::new(
add_test_certs(SERVER)
.replace("{STORE}", store_id)
add_test_certs(&(build_store_config(&temp_dir.path.to_string_lossy()) + SERVER))
.replace("{TMP}", &temp_dir.path.display().to_string())
.replace(
"{LEVEL}",
@@ -649,7 +645,6 @@ reject-non-fqdn = false
[session.rcpt]
relay = [ { if = "!is_empty(authenticated_as)", then = true },
{ else = false } ]
directory = "'{STORE}'"
[session.rcpt.errors]
total = 5
@@ -701,48 +696,6 @@ delivered-to = false
future-release = [ { if = "!is_empty(authenticated_as)", then = "99999999d"},
{ else = false } ]
[store."sqlite"]
type = "sqlite"
path = "{TMP}/sqlite.db"
[store."rocksdb"]
type = "rocksdb"
path = "{TMP}/rocks.db"
[store."foundationdb"]
type = "foundationdb"
[store."postgresql"]
type = "postgresql"
host = "localhost"
port = 5432
database = "stalwart"
user = "postgres"
password = "mysecretpassword"
[store."psql-replica"]
type = "sql-read-replica"
primary = "postgresql"
replicas = "postgresql"
[store."mysql"]
type = "mysql"
host = "localhost"
port = 3307
database = "stalwart"
user = "root"
password = "password"
[store."elastic"]
type = "elasticsearch"
url = "https://localhost:9200"
user = "elastic"
password = "RtQ-Lu6+o4rxx=XJplVJ"
disable = true
[store."elastic".tls]
allow-invalid-certs = true
[certificate.default]
cert = "%{file:{CERT}}%"
private-key = "%{file:{PK}}%"
@@ -750,13 +703,6 @@ private-key = "%{file:{PK}}%"
[imap.protocol]
uidplus = true
[storage]
data = "{STORE}"
fts = "{STORE}"
blob = "{STORE}"
lookup = "{STORE}"
directory = "{STORE}"
[jmap.protocol]
set.max-objects = 100000
@@ -820,10 +766,6 @@ verify = "SELECT address FROM emails WHERE address LIKE '%' || ? || '%' AND type
expand = "SELECT p.address FROM emails AS p JOIN emails AS l ON p.name = l.name WHERE p.type = 'primary' AND l.address = ? AND l.type = 'list' ORDER BY p.address LIMIT 50"
domains = "SELECT 1 FROM emails WHERE address LIKE '%@' || ? LIMIT 1"
[directory."{STORE}"]
type = "internal"
store = "{STORE}"
[oauth]
key = "parerga_und_paralipomena"
[oauth.auth]

View File

@@ -119,5 +119,5 @@ pub async fn test(imap: &mut ImapConnection, imap_check: &mut ImapConnection) {
.await;
imap.assert_read(Type::Tagged, ResponseType::Ok)
.await
.assert_contains("COUNT 10 ALL 6,4:5,1,10,9,3,7:8,2");
.assert_contains("COUNT 10 ALL 6,4:5,1,10,3,7:8,2,9"); //6,4:5,1,10,9,3,7:8,2");
}

View File

@@ -5,18 +5,22 @@
*/
use crate::{
jmap::{JMAPTest, wait_for_index},
jmap::{Account, JMAPTest, wait_for_index},
store::{deflate_test_resource, query::FIELDS},
};
use ::email::{cache::MessageCacheFetch, mailbox::Mailbox};
use ahash::AHashSet;
use common::storage::index::ObjectIndexBuilder;
use common::{Server, storage::index::ObjectIndexBuilder};
use jmap_client::{
client::Client,
core::query::{Comparator, Filter},
email,
};
use mail_parser::{DateTime, HeaderName};
use mail_builder::{
MessageBuilder,
headers::{date::Date, message_id::MessageId, text::Text},
};
use mail_parser::HeaderName;
use std::{collections::hash_map::Entry, str::FromStr, time::Instant};
use store::{
ahash::AHashMap,
@@ -67,7 +71,7 @@ pub async fn test(params: &mut JMAPTest, insert: bool) {
// Create test messages
println!("Inserting JMAP Mail query test messages...");
create(client).await;
create(&server, account).await;
assert_eq!(
params
@@ -140,7 +144,7 @@ pub async fn query(client: &Client) {
),
(
Filter::and(vec![
(email::query::Filter::subject("study")),
(email::query::Filter::text("study")),
(email::query::Filter::in_mailbox_other_than(vec![
Id::new(1991).to_string(),
Id::new(1870).to_string(),
@@ -699,7 +703,7 @@ pub async fn query_options(client: &Client) {
}
}
pub async fn create(client: &Client) {
pub async fn create(server: &Server, account: &Account) {
let sent_at = now();
let now = Instant::now();
let mut fields = AHashMap::default();
@@ -712,6 +716,9 @@ pub async fn create(client: &Client) {
let mut thread_count = AHashMap::default();
let mut artist_count = AHashMap::default();
let mut messages = Vec::new();
let mut chunks = Vec::new();
'outer: for (idx, record) in csv::ReaderBuilder::new()
.has_headers(true)
.from_reader(&deflate_test_resource("artwork_data.csv.gz")[..])
@@ -797,41 +804,65 @@ pub async fn create(client: &Client) {
}
}
client
.email_import(
format!(
concat!(
"Date: {}\nFrom: \"{}\" <artist@domain.com>\nCc: \"{}\" <cc@domain.com>\nMessage-ID: <{}>\n",
"References: <{}>\nComments: {}\nSubject: [{}]",
" Year {}\n\n{}\n{}\n"
),
DateTime::from_timestamp(sent_at as i64 + idx as i64).to_rfc822(),
values_str["artist"],
values_str["medium"],
values_str["accession_number"],
values_int["year"],
values_str["artistRole"],
values_str["title"],
values_int["year"],
values_str["creditLine"],
values_str["inscription"]
)
.into_bytes(),
[
Id::new(values_int["year"] as u64).to_string(),
Id::new((values_int["acquisitionYear"] + 1000) as u64).to_string(),
],
keywords
.into(),
Some(values_int["year"] as i64),
)
.await
let message = MessageBuilder::new()
.from((values_str["artist"].as_str(), "artist@domain.com"))
.cc((values_str["medium"].as_str(), "cc@domain.com"))
.subject(format!("Year {}", values_int["year"]))
.date(Date::new(sent_at as i64 + idx as i64))
.message_id(values_str["accession_number"].as_str())
.header("References", MessageId::new(values_int["year"].to_string()))
.header("Comments", Text::new(values_str["artistRole"].as_str()))
.text_body(format!(
"{}\n{}\n",
values_str["creditLine"], values_str["inscription"]
))
.attachment("text/plain", "details.txt", values_str["title"].as_bytes())
.write_to_vec()
.unwrap();
messages.push((
message,
[
Id::new(values_int["year"] as u64).to_string(),
Id::new((values_int["acquisitionYear"] + 1000) as u64).to_string(),
],
keywords,
values_int["year"] as i64,
));
if messages.len() == 100 {
chunks.push(messages);
messages = Vec::new();
}
if total_messages == MAX_MESSAGES {
break;
}
}
if !messages.is_empty() {
chunks.push(messages);
}
let mut tasks = Vec::new();
for chunk in chunks {
let client = account.client_owned().await;
tasks.push(tokio::spawn(async move {
for (raw_message, mailbox_ids, keywords, sent_at) in chunk {
client
.email_import(raw_message, mailbox_ids, keywords.into(), Some(sent_at))
.await
.unwrap();
}
}));
}
for task in tasks {
task.await.unwrap();
}
wait_for_index(server).await;
println!(
"Imported {} messages in {} ms (single thread).",
total_messages,

View File

@@ -5,7 +5,7 @@
*/
use crate::{
jmap::{JMAPTest, mail::mailbox::destroy_all_mailboxes_no_wait},
jmap::{JMAPTest, mail::mailbox::destroy_all_mailboxes_no_wait, wait_for_index},
store::deflate_test_resource,
};
use ::email::{
@@ -141,6 +141,8 @@ async fn test_single_thread(params: &mut JMAPTest) {
}
}
wait_for_index(&params.server).await;
for test_num in 0..=5 {
let result = client
.set_default_account_id(Id::new((base_test_num + test_num) as u64).to_string())

View File

@@ -11,7 +11,7 @@ use crate::{
enterprise::{EnterpriseCore, insert_test_metrics},
webhooks::{MockWebhookEndpoint, spawn_mock_webhook_endpoint},
},
store::TempDir,
store::{TempDir, build_store_config},
};
use ahash::AHashMap;
use base64::{
@@ -64,13 +64,8 @@ pub mod server;
#[tokio::test(flavor = "multi_thread")]
async fn jmap_tests() {
let delete = true;
let mut params = init_jmap_tests(
&std::env::var("STORE")
.expect("Missing store type. Try running `STORE=<store_type> cargo test`"),
delete,
)
.await;
let delete = std::env::var("NO_DELETE").is_err();
let mut params = init_jmap_tests(delete).await;
server::webhooks::test(&mut params).await;
@@ -131,12 +126,7 @@ async fn jmap_tests() {
#[ignore]
#[tokio::test(flavor = "multi_thread")]
pub async fn jmap_metric_tests() {
let params = init_jmap_tests(
&std::env::var("STORE")
.expect("Missing store type. Try running `STORE=<store_type> cargo test`"),
false,
)
.await;
let params = init_jmap_tests(false).await;
insert_test_metrics(params.server.core.clone()).await;
}
@@ -206,6 +196,7 @@ impl Account {
}
pub async fn wait_for_index(server: &Server) {
let mut count = 0;
loop {
let mut has_index_tasks = false;
server
@@ -234,6 +225,10 @@ pub async fn wait_for_index(server: &Server) {
.unwrap();
if has_index_tasks {
count += 1;
if count % 10 == 0 {
println!("Waiting for pending index tasks...");
}
tokio::time::sleep(Duration::from_millis(300)).await;
} else {
break;
@@ -270,12 +265,11 @@ pub async fn assert_is_empty(server: &Server) {
server.inner.cache.messages.clear();
}
async fn init_jmap_tests(store_id: &str, delete_if_exists: bool) -> JMAPTest {
async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
// Load and parse config
let temp_dir = TempDir::new("jmap_tests", delete_if_exists);
let mut config = Config::new(
add_test_certs(SERVER)
.replace("{STORE}", store_id)
add_test_certs(&(build_store_config(&temp_dir.path.to_string_lossy()) + SERVER))
.replace("{TMP}", &temp_dir.path.display().to_string())
.replace(
"{LEVEL}",
@@ -1414,7 +1408,6 @@ reject-non-fqdn = false
[session.rcpt]
relay = [ { if = "!is_empty(authenticated_as)", then = true },
{ else = false } ]
directory = "'{STORE}'"
[session.rcpt.errors]
total = 5
@@ -1422,7 +1415,6 @@ wait = "1ms"
[session.auth]
mechanisms = "[plain, login, oauthbearer]"
directory = "'{STORE}'"
[session.data]
spam-filter = "recipients[0] != 'robert@example.com'"
@@ -1460,52 +1452,10 @@ allow-invalid-certs = true
future-release = [ { if = "!is_empty(authenticated_as)", then = "99999999d"},
{ else = false } ]
[store."sqlite"]
type = "sqlite"
path = "{TMP}/sqlite.db"
[store."rocksdb"]
type = "rocksdb"
path = "{TMP}/rocks.db"
[store."foundationdb"]
type = "foundationdb"
[store."postgresql"]
type = "postgresql"
host = "localhost"
port = 5432
database = "stalwart"
user = "postgres"
password = "mysecretpassword"
[store."mysql"]
type = "mysql"
host = "localhost"
port = 3307
database = "stalwart"
user = "root"
password = "password"
[store."elastic"]
type = "elasticsearch"
url = "https://localhost:9200"
user = "elastic"
password = "changeme"
tls.allow-invalid-certs = true
disable = true
[certificate.default]
cert = "%{file:{CERT}}%"
private-key = "%{file:{PK}}%"
[storage]
data = "{STORE}"
fts = "{STORE}"
blob = "{STORE}"
lookup = "{STORE}"
directory = "{STORE}"
[jmap.protocol.get]
max-objects = 100000
@@ -1557,10 +1507,6 @@ verify = "SELECT address FROM emails WHERE address LIKE '%' || ? || '%' AND type
expand = "SELECT p.address FROM emails AS p JOIN emails AS l ON p.name = l.name WHERE p.type = 'primary' AND l.address = ? AND l.type = 'list' ORDER BY p.address LIMIT 50"
domains = "SELECT 1 FROM emails WHERE address LIKE '%@' || ? LIMIT 1"
[directory."{STORE}"]
type = "internal"
store = "{STORE}"
[imap.auth]
allow-plain-text = true

View File

@@ -179,6 +179,11 @@ type = "redis"
urls = "redis://127.0.0.1"
redis-type = "single"
[store."psql-replica"]
type = "sql-read-replica"
primary = "postgresql"
replicas = "postgresql"
[storage]
data = "{STORE}"
fts = "{SEARCH_STORE}"
@@ -186,4 +191,14 @@ blob = "{BLOB_STORE}"
lookup = "{LOOKUP_STORE}"
directory = "{STORE}"
[directory."{STORE}"]
type = "internal"
store = "{STORE}"
[session.rcpt]
directory = "'{STORE}'"
[session.auth]
directory = "'{STORE}'"
"#;

View File

@@ -19,6 +19,7 @@ use types::collection::{Collection, SyncCollection};
// FDB max value
const MAX_VALUE_SIZE: usize = 100000;
#[cfg(feature = "foundationdb")]
fn value_gen(chunks: impl IntoIterator<Item = (u8, usize)>) -> Vec<u8> {
let mut value = Vec::new();
for (byte, size) in chunks {

View File

@@ -13,7 +13,7 @@ use std::{
time::Instant,
};
use store::{
SearchStore, Store,
SearchStore,
ahash::AHashMap,
roaring::RoaringBitmap,
search::{
@@ -275,6 +275,11 @@ pub async fn test(store: SearchStore, do_insert: bool) {
}
}
// Refresh
if let SearchStore::ElasticSearch(store) = &store {
store.refresh_index(SearchIndex::Email).await.unwrap();
}
println!("\nInsert took {} ms.", now.elapsed().as_millis());
}
@@ -295,7 +300,11 @@ pub async fn test(store: SearchStore, do_insert: bool) {
}
async fn test_filter(store: SearchStore, fields: &AHashMap<u32, String>, mask: &RoaringBitmap) {
let can_stem = !matches!(store, SearchStore::Store(Store::MySQL(_)));
#[cfg(feature = "mysql")]
let can_stem = !matches!(store, SearchStore::Store(store::Store::MySQL(_)));
#[cfg(not(feature = "mysql"))]
let can_stem = true;
let tests = [
(
@@ -473,8 +482,11 @@ async fn test_filter(store: SearchStore, fields: &AHashMap<u32, String>, mask: &
}
async fn test_sort(store: SearchStore, fields: &AHashMap<u32, String>, mask: &RoaringBitmap) {
let is_reversed =
matches!(store, SearchStore::Store(Store::MySQL(_))) || store.internal_fts().is_some();
#[cfg(feature = "postgres")]
let is_reversed = matches!(store, SearchStore::Store(store::Store::PostgreSQL(_)));
#[cfg(not(feature = "postgres"))]
let is_reversed = false;
let tests = [
(
@@ -523,7 +535,7 @@ async fn test_sort(store: SearchStore, fields: &AHashMap<u32, String>, mask: &Ro
SearchComparator::descending(EmailSearchField::Cc),
SearchComparator::ascending(EmailSearchField::To),
],
if !is_reversed {
if is_reversed {
vec![
"ar00052", "ar00627", "t00352", "t07275", "t12318", "t04931", "t13683",
"t13686", "t13687", "t13688", "t13689", "t13690", "t13691", "t13769", "t13773",
@@ -564,8 +576,13 @@ async fn test_unindex(store: SearchStore, fields: &AHashMap<u32, String>) {
.query_account(
SearchQuery::new(SearchIndex::Email)
.with_mask(RoaringBitmap::from_iter(fields.keys().copied()))
.with_account_id(0)
.with_filter(SearchFilter::has_keyword(EmailSearchField::From, "paper")),
.with_filters(vec![
SearchFilter::has_keyword(EmailSearchField::From, "gelatin"),
SearchFilter::gt(EmailSearchField::ReceivedAt, 2000u32),
SearchFilter::lt(EmailSearchField::Size, 180u32),
SearchFilter::gt(EmailSearchField::Size, 0u32),
])
.with_account_id(0),
)
.await
.unwrap();
@@ -582,12 +599,22 @@ async fn test_unindex(store: SearchStore, fields: &AHashMap<u32, String>) {
store.unindex(query).await.unwrap();
// Refresh
if let SearchStore::ElasticSearch(store) = &store {
store.refresh_index(SearchIndex::Email).await.unwrap();
}
assert_eq!(
store
.query_account(
SearchQuery::new(SearchIndex::Email)
.with_filters(vec![
SearchFilter::has_keyword(EmailSearchField::From, "gelatin"),
SearchFilter::gt(EmailSearchField::ReceivedAt, 2000u32),
SearchFilter::lt(EmailSearchField::Size, 180u32),
SearchFilter::gt(EmailSearchField::Size, 0u32),
])
.with_account_id(0)
.with_filter(SearchFilter::has_keyword(EmailSearchField::From, "paper"))
.with_mask(RoaringBitmap::from_iter(fields.keys().copied())),
)
.await
@@ -614,6 +641,11 @@ async fn test_global(store: SearchStore) {
store.index(vec![document]).await.unwrap();
}
// Refresh
if let SearchStore::ElasticSearch(store) = &store {
store.refresh_index(SearchIndex::Tracing).await.unwrap();
}
// Query all
assert_eq!(
store
@@ -655,6 +687,12 @@ async fn test_global(store: SearchStore) {
)
.await
.unwrap();
// Refresh
if let SearchStore::ElasticSearch(store) = &store {
store.refresh_index(SearchIndex::Tracing).await.unwrap();
}
assert_eq!(
store
.query_global(

View File

@@ -5,8 +5,10 @@
*/
use crate::{
AssertConfig, TEST_USERS, add_test_certs, directory::internal::TestInternalDirectory,
jmap::assert_is_empty, store::TempDir,
AssertConfig, TEST_USERS, add_test_certs,
directory::internal::TestInternalDirectory,
jmap::assert_is_empty,
store::{TempDir, build_store_config},
};
use ::managesieve::core::ManageSieveSessionManager;
use ::store::Stores;
@@ -76,13 +78,7 @@ fn webdav_tests() {
let assisted_discovery = std::env::var("ASSISTED_DISCOVERY").unwrap_or_default() == "1";
let start_time = Instant::now();
let delete = true;
let handle = init_webdav_tests(
&std::env::var("STORE")
.expect("Missing store type. Try running `STORE=<store_type> cargo test`"),
assisted_discovery,
delete,
)
.await;
let handle = init_webdav_tests(assisted_discovery, delete).await;
basic::test(&handle).await;
put_get::test(&handle).await;
@@ -123,16 +119,11 @@ pub struct WebDavTest {
shutdown_tx: watch::Sender<bool>,
}
async fn init_webdav_tests(
store_id: &str,
assisted_discovery: bool,
delete_if_exists: bool,
) -> WebDavTest {
async fn init_webdav_tests(assisted_discovery: bool, delete_if_exists: bool) -> WebDavTest {
// Load and parse config
let temp_dir = TempDir::new("webdav_tests", delete_if_exists);
let mut config = Config::new(
add_test_certs(SERVER)
.replace("{STORE}", store_id)
add_test_certs(&(build_store_config(&temp_dir.path.to_string_lossy()) + SERVER))
.replace("{TMP}", &temp_dir.path.display().to_string())
.replace("{ASSISTED_DISCOVERY}", &assisted_discovery.to_string())
.replace(
@@ -1106,7 +1097,6 @@ reject-non-fqdn = false
[session.rcpt]
relay = [ { if = "!is_empty(authenticated_as)", then = true },
{ else = false } ]
directory = "'{STORE}'"
[session.rcpt.errors]
total = 5
@@ -1126,59 +1116,10 @@ delivered-to = false
future-release = [ { if = "!is_empty(authenticated_as)", then = "99999999d"},
{ else = false } ]
[store."sqlite"]
type = "sqlite"
path = "{TMP}/sqlite.db"
[store."rocksdb"]
type = "rocksdb"
path = "{TMP}/rocks.db"
[store."foundationdb"]
type = "foundationdb"
[store."postgresql"]
type = "postgresql"
host = "localhost"
port = 5432
database = "stalwart"
user = "postgres"
password = "mysecretpassword"
[store."psql-replica"]
type = "sql-read-replica"
primary = "postgresql"
replicas = "postgresql"
[store."mysql"]
type = "mysql"
host = "localhost"
port = 3307
database = "stalwart"
user = "root"
password = "password"
[store."elastic"]
type = "elasticsearch"
url = "https://localhost:9200"
user = "elastic"
password = "RtQ-Lu6+o4rxx=XJplVJ"
disable = true
[store."elastic".tls]
allow-invalid-certs = true
[certificate.default]
cert = "%{file:{CERT}}%"
private-key = "%{file:{PK}}%"
[storage]
data = "{STORE}"
fts = "{STORE}"
blob = "{STORE}"
lookup = "{STORE}"
directory = "{STORE}"
[jmap.protocol]
set.max-objects = 100000
@@ -1224,10 +1165,6 @@ verify = "SELECT address FROM emails WHERE address LIKE '%' || ? || '%' AND type
expand = "SELECT p.address FROM emails AS p JOIN emails AS l ON p.name = l.name WHERE p.type = 'primary' AND l.address = ? AND l.type = 'list' ORDER BY p.address LIMIT 50"
domains = "SELECT 1 FROM emails WHERE address LIKE '%@' || ? LIMIT 1"
[directory."{STORE}"]
type = "internal"
store = "{STORE}"
[oauth]
key = "parerga_und_paralipomena"