Database schema optimization - part 12

This commit is contained in:
mdecimus
2025-11-15 12:44:04 +01:00
parent 8bbcb999d1
commit dbe40829da
49 changed files with 1364 additions and 694 deletions

View File

@@ -4,8 +4,8 @@ version = "0.14.1"
edition = "2024"
[features]
#default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis", "nats", "azure", "foundationdb", "enterprise"]
default = ["postgres"]
#default = ["sqlite", "postgres", "mysql", "rocks", "s3", "redis", "nats", "azure", "foundationdb"]
default = ["rocks", "foundationdb"]
sqlite = ["store/sqlite"]
foundationdb = ["store/foundation", "common/foundation"]
postgres = ["store/postgres"]

View File

@@ -9,6 +9,7 @@ use crate::{
directory::internal::TestInternalDirectory,
imap::{ImapConnection, Type},
jmap::server::enterprise::EnterpriseCore,
store::cleanup::store_destroy,
};
use ahash::AHashMap;
use common::{
@@ -103,7 +104,7 @@ async fn init_cluster_tests(delete_if_exists: bool) -> ClusterTest {
let store = servers.first().unwrap().store().clone();
if delete_if_exists {
store.destroy().await;
store_destroy(&store).await;
}
// Create test users

View File

@@ -4,7 +4,10 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::directory::{DirectoryTest, IntoTestPrincipal, TestPrincipal};
use crate::{
directory::{DirectoryTest, IntoTestPrincipal, TestPrincipal},
store::cleanup::store_destroy,
};
use ahash::AHashSet;
use directory::{
Permission, QueryBy, QueryParams, Type,
@@ -30,7 +33,7 @@ async fn internal_directory() {
for (store_id, store) in config.stores.stores {
println!("Testing internal directory with store {:?}", store_id);
store.destroy().await;
store_destroy(&store).await;
// A principal without name should fail
assert_eq!(

View File

@@ -13,8 +13,9 @@ use mail_send::Credentials;
#[allow(unused_imports)]
use store::{InMemoryStore, Store};
use crate::directory::{
DirectoryTest, IntoTestPrincipal, TestPrincipal, map_account_id, map_account_ids,
use crate::{
directory::{DirectoryTest, IntoTestPrincipal, TestPrincipal, map_account_id, map_account_ids},
store::cleanup::store_destroy,
};
use super::DirectoryStore;
@@ -43,7 +44,7 @@ async fn sql_directory() {
let core = config.server;
// Create tables
base_store.destroy().await;
store_destroy(base_store).await;
store.create_test_directory().await;
// Create test users

View File

@@ -23,7 +23,10 @@ pub mod thread;
use crate::{
AssertConfig, add_test_certs,
directory::internal::TestInternalDirectory,
store::{TempDir, build_store_config},
store::{
TempDir, build_store_config,
cleanup::{search_store_destroy, store_destroy},
},
};
use ::managesieve::core::ManageSieveSessionManager;
use ::store::Stores;
@@ -88,11 +91,11 @@ pub async fn imap_tests() {
mailbox::test(&mut imap, &mut imap_check).await;
append::test(&mut imap, &mut imap_check, &handle).await;
search::test(&mut imap, &mut imap_check).await;
search::test(&mut imap, &mut imap_check, &handle).await;
fetch::test(&mut imap, &mut imap_check).await;
store::test(&mut imap, &mut imap_check, &handle).await;
copy_move::test(&mut imap, &mut imap_check).await;
thread::test(&mut imap, &mut imap_check).await;
thread::test(&mut imap, &mut imap_check, &handle).await;
idle::test(&mut imap, &mut imap_check, false).await;
condstore::test(&mut imap, &mut imap_check).await;
acl::test(&mut imap, &mut imap_check).await;
@@ -166,6 +169,7 @@ async fn init_imap_tests(delete_if_exists: bool) -> IMAPTest {
let cache = Caches::parse(&mut config);
let store = core.storage.data.clone();
let search_store = core.storage.fts.clone();
let (ipc, mut ipc_rxs) = build_ipc(false);
let inner = Arc::new(Inner {
shared_core: core.into_shared(),
@@ -222,7 +226,8 @@ async fn init_imap_tests(delete_if_exists: bool) -> IMAPTest {
});
if delete_if_exists {
store.destroy().await;
store_destroy(&store).await;
search_store_destroy(&search_store).await;
}
// Create tables and test accounts

View File

@@ -4,11 +4,11 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use super::{AssertResult, ImapConnection, Type};
use crate::imap::IMAPTest;
use imap_proto::ResponseType;
use super::{AssertResult, ImapConnection, Type};
pub async fn test(imap: &mut ImapConnection, imap_check: &mut ImapConnection) {
pub async fn test(imap: &mut ImapConnection, imap_check: &mut ImapConnection, handle: &IMAPTest) {
println!("Running SEARCH tests...");
// Searches without selecting a mailbox should fail.
@@ -119,5 +119,9 @@ pub async fn test(imap: &mut ImapConnection, imap_check: &mut ImapConnection) {
.await;
imap.assert_read(Type::Tagged, ResponseType::Ok)
.await
.assert_contains("COUNT 10 ALL 6,4:5,1,10,3,7:8,2,9"); //6,4:5,1,10,9,3,7:8,2");
.assert_contains(if !handle.server.search_store().is_mysql() {
"COUNT 10 ALL 6,4:5,1,10,3,7:8,2,9"
} else {
"COUNT 10 ALL 9,3,7:8,2,6,4:5,1,10"
}); //6,4:5,1,10,9,3,7:8,2");
}

View File

@@ -6,11 +6,11 @@
use imap_proto::ResponseType;
use crate::imap::{AssertResult, expand_uid_list};
use crate::imap::{AssertResult, IMAPTest, expand_uid_list};
use super::{ImapConnection, Type, append::build_messages};
pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection) {
pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection, handle: &IMAPTest) {
println!("Running THREAD tests...");
// Create test messages
@@ -80,12 +80,15 @@ pub async fn test(imap: &mut ImapConnection, _imap_check: &mut ImapConnection) {
.assert_contains("(5 6 7 8)")
.assert_contains("(9 10 11 12)");
imap.send("THREAD REFERENCES UTF-8 SUBJECT T1").await;
imap.assert_read(Type::Tagged, ResponseType::Ok)
.await
.assert_contains("(5 6 7 8)")
.assert_count("(1 2 3 4)", 0)
.assert_count("(9 10 11 12)", 0);
// Filter by subject (mySQL does not support searching for short keywords)
if !handle.server.search_store().is_mysql() {
imap.send("THREAD REFERENCES UTF-8 SUBJECT T1").await;
imap.assert_read(Type::Tagged, ResponseType::Ok)
.await
.assert_contains("(5 6 7 8)")
.assert_count("(1 2 3 4)", 0)
.assert_count("(9 10 11 12)", 0);
}
// Filter by threadId and messageId
imap.send(&format!(

View File

@@ -8,6 +8,7 @@ use crate::{
directory::internal::TestInternalDirectory,
jmap::{JMAPTest, mail::delivery::SmtpConnection, wait_for_index},
smtp::queue::QueuedEvents,
store::cleanup::store_blob_expire_all,
};
use common::config::smtp::queue::QueueName;
use email::{cache::MessageCacheFetch, mailbox::INBOX_ID};
@@ -42,7 +43,7 @@ pub async fn test(params: &mut JMAPTest) {
server.inner.cache.access_tokens.clear();
// Delete temporary blobs from previous tests
server.core.storage.data.blob_expire_all().await;
store_blob_expire_all(&server.core.storage.data).await;
// Test temporary blob quota (3 files)
DISABLE_UPLOAD_QUOTA.store(false, std::sync::atomic::Ordering::Relaxed);
@@ -65,7 +66,7 @@ pub async fn test(params: &mut JMAPTest) {
jmap_client::Error::Problem(err) if err.detail().unwrap().contains("quota") => (),
other => panic!("Unexpected error: {:?}", other),
}
server.core.storage.data.blob_expire_all().await;
store_blob_expire_all(&server.core.storage.data).await;
// Test temporary blob quota (50000 bytes)
for i in 0..2 {
@@ -86,7 +87,7 @@ pub async fn test(params: &mut JMAPTest) {
jmap_client::Error::Problem(err) if err.detail().unwrap().contains("quota") => (),
other => panic!("Unexpected error: {:?}", other),
}
server.core.storage.data.blob_expire_all().await;
store_blob_expire_all(&server.core.storage.data).await;
// Test JMAP Quotas extension
let response = account

View File

@@ -55,7 +55,9 @@ pub async fn test(params: &mut JMAPTest) {
let (stream_tx, mut stream_rx) = mpsc::channel::<WebSocketMessage>(100);
tokio::spawn(async move {
while let Some(change) = ws_stream.next().await {
stream_tx.send(change.unwrap()).await.unwrap();
if stream_tx.send(change.unwrap()).await.is_err() {
break;
}
}
});
client_ws

View File

@@ -5,7 +5,7 @@
*/
use crate::{
jmap::{ChangeType, IntoJmapSet, JMAPTest, JmapUtils},
jmap::{ChangeType, IntoJmapSet, JMAPTest, JmapUtils, wait_for_index},
webdav::DummyWebDavClient,
};
use ahash::AHashSet;
@@ -452,6 +452,7 @@ pub async fn test(params: &mut JMAPTest) {
}));
// Query tests
wait_for_index(&params.server).await;
assert_eq!(
account
.jmap_query(

View File

@@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::jmap::{IntoJmapSet, JMAPTest, JmapUtils};
use crate::jmap::{IntoJmapSet, JMAPTest, JmapUtils, wait_for_index};
use calcard::jscalendar::JSCalendarProperty;
use jmap_proto::{
object::calendar_event_notification::CalendarEventNotificationProperty,
@@ -73,6 +73,7 @@ pub async fn test(params: &mut JMAPTest) {
let john_event_id = response.created(0).id().to_string();
tokio::time::sleep(std::time::Duration::from_millis(600)).await;
wait_for_index(&params.server).await;
// Verify Jane and Bill received the share notification
let mut jane_event_id = String::new();

View File

@@ -4,7 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::jmap::JMAPTest;
use crate::{jmap::JMAPTest, store::cleanup::store_blob_expire_all};
use email::mailbox::INBOX_ID;
use serde_json::{Value, json};
use types::id::Id;
@@ -13,7 +13,7 @@ pub async fn test(params: &mut JMAPTest) {
println!("Running blob tests...");
let server = params.server.clone();
let account = params.account("jdoe@example.com");
server.core.storage.data.blob_expire_all().await;
store_blob_expire_all(&server.core.storage.data).await;
// Blob/set simple test
let response = account.jmap_method_call("Blob/upload", json!({
@@ -139,7 +139,7 @@ pub async fn test(params: &mut JMAPTest) {
);
}
server.core.storage.data.blob_expire_all().await;
store_blob_expire_all(&server.core.storage.data).await;
// Blob/upload Complex Example
let response = account
@@ -226,7 +226,7 @@ pub async fn test(params: &mut JMAPTest) {
"Pointer {pointer:?} Response: {response:?}",
);
}
server.core.storage.data.blob_expire_all().await;
store_blob_expire_all(&server.core.storage.data).await;
// Blob/get Example with Range and Encoding Errors
let response = account.jmap_method_calls(json!([
@@ -353,7 +353,7 @@ pub async fn test(params: &mut JMAPTest) {
"Pointer {pointer:?} Response: {response:?}",
);
}
server.core.storage.data.blob_expire_all().await;
store_blob_expire_all(&server.core.storage.data).await;
// Blob/lookup
let client = account.client();

View File

@@ -4,10 +4,7 @@
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::{
directory::internal::TestInternalDirectory,
jmap::{JMAPTest},
};
use crate::{directory::internal::TestInternalDirectory, jmap::JMAPTest};
use ::email::mailbox::{INBOX_ID, TRASH_ID};
use jmap_client::{
core::{

View File

@@ -92,8 +92,10 @@ pub async fn test(params: &mut JMAPTest, insert: bool) {
wait_for_index(&server).await;
}
let can_stem = !params.server.search_store().is_mysql();
println!("Running JMAP Mail query tests...");
query(client).await;
query(client, can_stem).await;
println!("Running JMAP Mail query options tests...");
query_options(client).await;
@@ -114,7 +116,7 @@ pub async fn test(params: &mut JMAPTest, insert: bool) {
params.assert_is_empty().await;
}
pub async fn query(client: &Client) {
pub async fn query(client: &Client, can_stem: bool) {
for (filter, sort, expected_results) in [
(
Filter::and(vec![
@@ -144,7 +146,7 @@ pub async fn query(client: &Client) {
),
(
Filter::and(vec![
(email::query::Filter::text("study")),
(email::query::Filter::text(if can_stem { "study" } else { "studies" })),
(email::query::Filter::in_mailbox_other_than(vec![
Id::new(1991).to_string(),
Id::new(1870).to_string(),

View File

@@ -49,6 +49,8 @@ pub async fn test(params: &mut JMAPTest) {
}
wait_for_index(&server).await;
let can_stem = params.server.search_store().internal_fts().is_some();
// Run tests
for (filter, email_name, snippet_subject, snippet_preview) in [
(
@@ -121,7 +123,12 @@ pub async fn test(params: &mut JMAPTest) {
)),
),
(
Filter::text("es:galería vasto biblioteca").into(),
Filter::text(if can_stem {
"es:galería vasto biblioteca"
} else {
"es:galería vastos biblioteca"
})
.into(),
"mixed",
Some("<mark>Biblioteca</mark> de Babel"),
Some(concat!(

View File

@@ -11,7 +11,10 @@ use crate::{
enterprise::{EnterpriseCore, insert_test_metrics},
webhooks::{MockWebhookEndpoint, spawn_mock_webhook_endpoint},
},
store::{TempDir, build_store_config},
store::{
TempDir, build_store_config,
cleanup::{search_store_destroy, store_assert_is_empty, store_destroy},
},
};
use ahash::AHashMap;
use base64::{
@@ -75,9 +78,9 @@ async fn jmap_tests() {
server::webhooks::test(&mut params).await;
/*mail::get::test(&mut params).await;
mail::get::test(&mut params).await;
mail::set::test(&mut params).await;
mail::parse::test(&mut params).await;*/
mail::parse::test(&mut params).await;
mail::query::test(&mut params, delete).await;
mail::search_snippet::test(&mut params).await;
mail::changes::test(&mut params).await;
@@ -263,10 +266,8 @@ pub async fn assert_is_empty(server: &Server) {
.unwrap();
// Assert is empty
server
.store()
.assert_is_empty(server.core.storage.blob.clone())
.await;
store_assert_is_empty(server.store(), server.core.storage.blob.clone()).await;
search_store_destroy(server.search_store()).await;
// Clean caches
for cache in [
@@ -321,6 +322,7 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
let data = Data::parse(&mut config);
let cache = Caches::parse(&mut config);
let store = core.storage.data.clone();
let search_store = core.storage.fts.clone();
let (ipc, mut ipc_rxs) = build_ipc(false);
let inner = Arc::new(Inner {
shared_core: core.into_shared(),
@@ -330,7 +332,8 @@ async fn init_jmap_tests(delete_if_exists: bool) -> JMAPTest {
});
if delete_if_exists {
store.destroy().await;
store_destroy(&store).await;
search_store_destroy(&search_store).await;
}
// Parse acceptors

View File

@@ -17,7 +17,11 @@ use email::{
message::delete::EmailDeletion,
};
use imap_proto::ResponseType;
use store::{IterateParams, LogKey, U32_LEN, U64_LEN, write::key::DeserializeBigEndian};
use store::{
IterateParams, LogKey, U32_LEN, U64_LEN,
search::SearchQuery,
write::{SearchIndex, key::DeserializeBigEndian},
};
use types::id::Id;
pub async fn test(params: &mut JMAPTest) {
@@ -155,6 +159,19 @@ pub async fn test(params: &mut JMAPTest) {
.delete_principal(QueryBy::Id(account.id().document_id()))
.await
.unwrap();
for index in [
SearchIndex::Email,
SearchIndex::Contacts,
SearchIndex::Calendar,
] {
server
.core
.storage
.fts
.unindex(SearchQuery::new(index).with_account_id(account.id().document_id()))
.await
.unwrap();
}
params.assert_is_empty().await;
}

View File

@@ -15,6 +15,7 @@ use crate::{
inbound::TestMessage,
session::{TestSession, VerifyResponse, load_test_message},
},
store::cleanup::store_assert_is_empty,
};
use smtp::core::Session;
@@ -233,8 +234,5 @@ async fn data() {
// Make sure store is empty
qr.clear_queue(&test.server).await;
test.server
.store()
.assert_is_empty(test.server.blob_store().clone())
.await;
store_assert_is_empty(test.server.store(), test.server.blob_store().clone()).await;
}

View File

@@ -29,7 +29,7 @@ use store::{BlobStore, Store, Stores};
use tokio::sync::{mpsc, watch};
use utils::config::Config;
use crate::AssertConfig;
use crate::{AssertConfig, store::cleanup::store_destroy};
pub mod config;
pub mod inbound;
@@ -236,7 +236,7 @@ impl TestSMTP {
let stores = Stores::parse_all(&mut config, false).await;
let core = Core::parse(&mut config, stores, Default::default()).await;
let data = Data::parse(&mut config);
core.storage.data.destroy().await;
store_destroy(&core.storage.data).await;
Self::from_core_and_tempdir(core, data, Some(temp_dir))
}

View File

@@ -9,7 +9,10 @@ use std::time::{Duration, Instant};
use common::{config::server::ServerProtocol, core::BuildServer, ipc::QueueEvent};
use mail_auth::MX;
use crate::smtp::{DnsCache, TestSMTP, session::TestSession};
use crate::{
smtp::{DnsCache, TestSMTP, session::TestSession},
store::cleanup::store_assert_is_empty,
};
use smtp::queue::manager::Queue;
const LOCAL: &str = r#"
@@ -149,9 +152,5 @@ async fn concurrent_queue() {
assert_eq!(remote_messages.len(), NUM_MESSAGES);
// Make sure local store is queue
core.core
.storage
.data
.assert_is_empty(core.core.storage.blob.clone())
.await;
store_assert_is_empty(&core.core.storage.data, core.core.storage.blob.clone()).await;
}

View File

@@ -13,7 +13,10 @@ use common::{
};
use mail_auth::MX;
use crate::smtp::{DnsCache, TestSMTP, session::TestSession};
use crate::{
smtp::{DnsCache, TestSMTP, session::TestSession},
store::cleanup::store_assert_is_empty,
};
use smtp::queue::manager::Queue;
const LOCAL: &str = r#"
@@ -205,9 +208,5 @@ async fn virtual_queue() {
assert_eq!(remote_messages.len(), NUM_MESSAGES * 2);
// Make sure local store is queue
core.core
.storage
.data
.assert_is_empty(core.core.storage.blob.clone())
.await;
store_assert_is_empty(&core.core.storage.data, core.core.storage.blob.clone()).await;
}

View File

@@ -12,7 +12,7 @@ use store::{
use types::{blob::BlobClass, blob_hash::BlobHash, collection::Collection};
use utils::config::Config;
use crate::store::{CONFIG, TempDir};
use crate::store::{CONFIG, TempDir, cleanup::store_destroy};
#[tokio::test]
pub async fn blob_tests() {
@@ -30,7 +30,7 @@ pub async fn blob_tests() {
println!("Testing blob management on store {}...", store_id);
// Init store
store.destroy().await;
store_destroy(&store).await;
// Test internal blob store
let blob_store: BlobStore = store.clone().into();

342
tests/src/store/cleanup.rs Normal file
View File

@@ -0,0 +1,342 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use store::{
ValueKey,
write::{key::DeserializeBigEndian, *},
*,
};
use trc::AddContext;
pub async fn store_destroy(store: &Store) {
store_destroy_sql_indexes(store).await;
for subspace in [
SUBSPACE_ACL,
SUBSPACE_DIRECTORY,
SUBSPACE_TASK_QUEUE,
SUBSPACE_INDEXES,
SUBSPACE_BLOB_RESERVE,
SUBSPACE_BLOB_LINK,
SUBSPACE_LOGS,
SUBSPACE_IN_MEMORY_COUNTER,
SUBSPACE_IN_MEMORY_VALUE,
SUBSPACE_COUNTER,
SUBSPACE_PROPERTY,
SUBSPACE_SETTINGS,
SUBSPACE_BLOBS,
SUBSPACE_QUEUE_MESSAGE,
SUBSPACE_QUEUE_EVENT,
SUBSPACE_QUOTA,
SUBSPACE_REPORT_OUT,
SUBSPACE_REPORT_IN,
SUBSPACE_TELEMETRY_SPAN,
SUBSPACE_TELEMETRY_METRIC,
SUBSPACE_SEARCH_INDEX,
] {
if subspace == SUBSPACE_SEARCH_INDEX && store.is_pg_or_mysql() {
continue;
}
store
.delete_range(
AnyKey {
subspace,
key: vec![0u8],
},
AnyKey {
subspace,
key: vec![u8::MAX; 16],
},
)
.await
.unwrap();
}
}
pub async fn search_store_destroy(store: &SearchStore) {
match &store {
SearchStore::Store(store) => {
store_destroy_sql_indexes(store).await;
}
SearchStore::ElasticSearch(store) => {
if let Err(err) = store.drop_indexes().await {
eprintln!("Failed to drop elasticsearch indexes: {}", err);
}
store.create_indexes(3, 0, false).await.unwrap();
}
}
}
#[allow(unused_variables)]
async fn store_destroy_sql_indexes(store: &Store) {
#[cfg(any(feature = "postgres", feature = "mysql"))]
{
if store.is_pg_or_mysql() {
for index in [
SearchIndex::Email,
SearchIndex::Calendar,
SearchIndex::Contacts,
SearchIndex::Tracing,
] {
#[cfg(feature = "postgres")]
let table = index.psql_table();
#[cfg(feature = "mysql")]
let table = index.mysql_table();
store
.sql_query::<usize>(&format!("TRUNCATE TABLE {table}"), vec![])
.await
.unwrap();
}
}
}
}
pub async fn store_blob_expire_all(store: &Store) {
// Delete all temporary hashes
let from_key = ValueKey {
account_id: 0,
collection: 0,
document_id: 0,
class: ValueClass::Blob(BlobOp::Reserve {
hash: types::blob_hash::BlobHash::default(),
until: 0,
}),
};
let to_key = ValueKey {
account_id: u32::MAX,
collection: 0,
document_id: 0,
class: ValueClass::Blob(BlobOp::Reserve {
hash: types::blob_hash::BlobHash::default(),
until: 0,
}),
};
let mut batch = BatchBuilder::new();
let mut last_account_id = u32::MAX;
store
.iterate(
IterateParams::new(from_key, to_key).ascending().no_values(),
|key, _| {
let account_id = key.deserialize_be_u32(0).caused_by(trc::location!())?;
if account_id != last_account_id {
last_account_id = account_id;
batch.with_account_id(account_id);
}
batch.any_op(Operation::Value {
class: ValueClass::Blob(BlobOp::Reserve {
hash: types::blob_hash::BlobHash::try_from_hash_slice(
key.get(U32_LEN..U32_LEN + types::blob_hash::BLOB_HASH_LEN)
.unwrap(),
)
.unwrap(),
until: key
.deserialize_be_u64(key.len() - U64_LEN)
.caused_by(trc::location!())?,
}),
op: ValueOp::Clear,
});
Ok(true)
},
)
.await
.unwrap();
store.write(batch.build_all()).await.unwrap();
}
pub async fn store_lookup_expire_all(store: &Store) {
// Delete all temporary counters
let from_key = ValueKey::from(ValueClass::InMemory(InMemoryClass::Key(vec![0u8])));
let to_key = ValueKey::from(ValueClass::InMemory(InMemoryClass::Key(vec![u8::MAX; 10])));
let mut expired_keys = Vec::new();
let mut expired_counters = Vec::new();
store
.iterate(IterateParams::new(from_key, to_key), |key, value| {
let expiry = value.deserialize_be_u64(0).caused_by(trc::location!())?;
if expiry == 0 {
expired_counters.push(key.to_vec());
} else if expiry != u64::MAX {
expired_keys.push(key.to_vec());
}
Ok(true)
})
.await
.unwrap();
if !expired_keys.is_empty() {
let mut batch = BatchBuilder::new();
for key in expired_keys {
batch.any_op(Operation::Value {
class: ValueClass::InMemory(InMemoryClass::Key(key)),
op: ValueOp::Clear,
});
if batch.is_large_batch() {
store.write(batch.build_all()).await.unwrap();
batch = BatchBuilder::new();
}
}
if !batch.is_empty() {
store.write(batch.build_all()).await.unwrap();
}
}
if !expired_counters.is_empty() {
let mut batch = BatchBuilder::new();
for key in expired_counters {
batch.any_op(Operation::Value {
class: ValueClass::InMemory(InMemoryClass::Counter(key.clone())),
op: ValueOp::Clear,
});
batch.any_op(Operation::Value {
class: ValueClass::InMemory(InMemoryClass::Key(key)),
op: ValueOp::Clear,
});
if batch.is_large_batch() {
store.write(batch.build_all()).await.unwrap();
batch = BatchBuilder::new();
}
}
if !batch.is_empty() {
store.write(batch.build_all()).await.unwrap();
}
}
}
#[allow(unused_variables)]
pub async fn store_assert_is_empty(store: &Store, blob_store: BlobStore) {
store_blob_expire_all(store).await;
store_lookup_expire_all(store).await;
store.purge_blobs(blob_store).await.unwrap();
store.purge_store().await.unwrap();
let store = store.clone();
let mut failed = false;
for (subspace, with_values) in [
(SUBSPACE_ACL, true),
//(SUBSPACE_DIRECTORY, true),
(SUBSPACE_TASK_QUEUE, true),
(SUBSPACE_IN_MEMORY_VALUE, true),
(SUBSPACE_IN_MEMORY_COUNTER, false),
(SUBSPACE_PROPERTY, true),
(SUBSPACE_SETTINGS, true),
(SUBSPACE_QUEUE_MESSAGE, true),
(SUBSPACE_QUEUE_EVENT, true),
(SUBSPACE_REPORT_OUT, true),
(SUBSPACE_REPORT_IN, true),
(SUBSPACE_BLOB_RESERVE, true),
(SUBSPACE_BLOB_LINK, true),
(SUBSPACE_BLOBS, true),
(SUBSPACE_COUNTER, false),
(SUBSPACE_QUOTA, false),
(SUBSPACE_BLOBS, true),
(SUBSPACE_INDEXES, false),
(SUBSPACE_TELEMETRY_SPAN, true),
(SUBSPACE_TELEMETRY_METRIC, true),
(SUBSPACE_SEARCH_INDEX, true),
] {
if subspace == SUBSPACE_SEARCH_INDEX && store.is_pg_or_mysql() {
continue;
}
let from_key = AnyKey {
subspace,
key: vec![0u8],
};
let to_key = AnyKey {
subspace,
key: vec![u8::MAX; 10],
};
store
.iterate(
IterateParams::new(from_key, to_key).set_values(with_values),
|key, value| {
match subspace {
SUBSPACE_COUNTER if key.len() == U32_LEN + 1 || key.len() == U32_LEN => {
// Message ID and change ID counters
return Ok(true);
}
SUBSPACE_INDEXES => {
println!(
concat!(
"Found index key, account {}, collection {}, ",
"document {}, property {}, value {:?}: {:?}"
),
u32::from_be_bytes(key[0..4].try_into().unwrap()),
key[4],
u32::from_be_bytes(key[key.len() - 4..].try_into().unwrap()),
key[5],
String::from_utf8_lossy(&key[6..key.len() - 4]),
key
);
}
_ => {
println!(
"Found key in {:?}: {:?} ({:?}) = {:?} ({:?})",
char::from(subspace),
key,
String::from_utf8_lossy(key),
value,
String::from_utf8_lossy(value)
);
}
}
failed = true;
Ok(true)
},
)
.await
.unwrap();
}
// Delete logs and counters
store
.delete_range(
AnyKey {
subspace: SUBSPACE_LOGS,
key: &[0u8],
},
AnyKey {
subspace: SUBSPACE_LOGS,
key: &[
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
u8::MAX,
],
},
)
.await
.unwrap();
store
.delete_range(
AnyKey {
subspace: SUBSPACE_COUNTER,
key: &[0u8],
},
AnyKey {
subspace: SUBSPACE_COUNTER,
key: (u32::MAX / 2).to_be_bytes().as_slice(),
},
)
.await
.unwrap();
if failed {
panic!("Store is not empty.");
}
}

View File

@@ -11,7 +11,10 @@ use utils::config::{Config, Rate};
use crate::{
AssertConfig,
store::{CONFIG, TempDir},
store::{
CONFIG, TempDir,
cleanup::{store_assert_is_empty, store_destroy},
},
};
#[tokio::test]
@@ -30,7 +33,7 @@ pub async fn lookup_tests() {
for (store_id, store) in stores.in_memory_stores {
println!("Testing in-memory store {}...", store_id);
if let InMemoryStore::Store(store) = &store {
store.destroy().await;
store_destroy(store).await;
} else {
// Reset redis counter
store
@@ -65,7 +68,7 @@ pub async fn lookup_tests() {
store.purge_in_memory_store().await.unwrap();
if let InMemoryStore::Store(store) = &store {
store.assert_is_empty(store.clone().into()).await;
store_assert_is_empty(store, store.clone().into()).await;
}
// Test counter
@@ -123,7 +126,7 @@ pub async fn lookup_tests() {
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
store.purge_in_memory_store().await.unwrap();
if let InMemoryStore::Store(store) = &store {
store.assert_is_empty(store.clone().into()).await;
store_assert_is_empty(store, store.clone().into()).await;
}
// Test locking
@@ -149,7 +152,7 @@ pub async fn lookup_tests() {
}
store.purge_in_memory_store().await.unwrap();
if let InMemoryStore::Store(store) = &store {
store.assert_is_empty(store.clone().into()).await;
store_assert_is_empty(store, store.clone().into()).await;
}
// Test prefix delete
@@ -281,7 +284,7 @@ pub async fn lookup_tests() {
);
if let InMemoryStore::Store(store) = &store {
store.assert_is_empty(store.clone().into()).await;
store_assert_is_empty(store, store.clone().into()).await;
}
}
}

View File

@@ -6,13 +6,17 @@
pub mod blob;
//pub mod import_export;
pub mod cleanup;
pub mod lookup;
pub mod ops;
pub mod query;
use crate::AssertConfig;
use crate::{
AssertConfig,
store::cleanup::{search_store_destroy, store_destroy},
};
use std::io::Read;
use store::{SearchStore, Stores};
use store::Stores;
use utils::config::Config;
pub struct TempDir {
@@ -38,7 +42,7 @@ pub async fn store_tests() {
println!("Testing store {}...", store_id);
if insert {
store.destroy().await;
store_destroy(&store).await;
}
//import_export::test(store.clone()).await;
@@ -68,10 +72,7 @@ pub async fn search_tests() {
println!("Testing store {}...", store_id);
if insert {
match &store {
SearchStore::Store(store) => store.destroy().await,
SearchStore::ElasticSearch(_) => (),
}
search_store_destroy(&store).await;
}
query::test(store, insert).await;
@@ -179,10 +180,10 @@ type = "redis"
urls = "redis://127.0.0.1"
redis-type = "single"
[store."psql-replica"]
type = "sql-read-replica"
primary = "postgresql"
replicas = "postgresql"
#[store."psql-replica"]
#type = "sql-read-replica"
#primary = "postgresql"
#replicas = "postgresql"
[storage]
data = "{STORE}"

View File

@@ -16,6 +16,8 @@ use store::{
};
use types::collection::{Collection, SyncCollection};
use crate::store::cleanup::store_assert_is_empty;
// FDB max value
const MAX_VALUE_SIZE: usize = 100000;
@@ -471,6 +473,6 @@ pub async fn test(db: Store) {
db.write(batch.build_all()).await.unwrap();
// Make sure everything is deleted
db.assert_is_empty(db.clone().into()).await;
store_assert_is_empty(&db, db.clone().into()).await;
}
}

View File

@@ -300,11 +300,7 @@ pub async fn test(store: SearchStore, do_insert: bool) {
}
async fn test_filter(store: SearchStore, fields: &AHashMap<u32, String>, mask: &RoaringBitmap) {
#[cfg(feature = "mysql")]
let can_stem = !matches!(store, SearchStore::Store(store::Store::MySQL(_)));
#[cfg(not(feature = "mysql"))]
let can_stem = true;
let can_stem = !store.is_mysql();
let tests = [
(
@@ -482,11 +478,7 @@ async fn test_filter(store: SearchStore, fields: &AHashMap<u32, String>, mask: &
}
async fn test_sort(store: SearchStore, fields: &AHashMap<u32, String>, mask: &RoaringBitmap) {
#[cfg(feature = "postgres")]
let is_reversed = matches!(store, SearchStore::Store(store::Store::PostgreSQL(_)));
#[cfg(not(feature = "postgres"))]
let is_reversed = false;
let is_reversed = store.is_postgres();
let tests = [
(

View File

@@ -259,6 +259,8 @@ pub async fn test(test: &WebDavTest) {
);
// Check that John received the RSVP
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
test.wait_for_index().await;
let itips = fetch_and_remove_itips(john_client).await;
assert_eq!(itips.len(), 1);
assert!(
@@ -448,6 +450,8 @@ pub async fn test(test: &WebDavTest) {
let main_event_href = cal.href;
// Check that Bill received the update
tokio::time::sleep(std::time::Duration::from_millis(200)).await;
test.wait_for_index().await;
let mut itips = fetch_and_remove_itips(bill_client).await;
itips.sort_unstable_by(|a, _| {
if a.contains("Lunch") {

View File

@@ -7,8 +7,11 @@
use crate::{
AssertConfig, TEST_USERS, add_test_certs,
directory::internal::TestInternalDirectory,
jmap::assert_is_empty,
store::{TempDir, build_store_config},
jmap::{assert_is_empty, wait_for_index},
store::{
TempDir, build_store_config,
cleanup::{search_store_destroy, store_destroy},
},
};
use ::managesieve::core::ManageSieveSessionManager;
use ::store::Stores;
@@ -150,6 +153,7 @@ async fn init_webdav_tests(assisted_discovery: bool, delete_if_exists: bool) ->
let cache = Caches::parse(&mut config);
let store = core.storage.data.clone();
let search_store = core.storage.fts.clone();
let (ipc, mut ipc_rxs) = build_ipc(false);
let inner = Arc::new(Inner {
shared_core: core.into_shared(),
@@ -206,7 +210,8 @@ async fn init_webdav_tests(assisted_discovery: bool, delete_if_exists: bool) ->
});
if delete_if_exists {
store.destroy().await;
store_destroy(&store).await;
search_store_destroy(&search_store).await;
}
// Create test accounts
@@ -270,6 +275,10 @@ impl WebDavTest {
assert_is_empty(&self.server).await;
self.clear_cache();
}
pub async fn wait_for_index(&self) {
wait_for_index(&self.server).await;
}
}
#[allow(dead_code)]