Meilisearch FTS backend implementation (closes #1482)

This commit is contained in:
mdecimus
2025-12-14 15:21:27 +01:00
parent 8e85a616ff
commit 89f0229191
18 changed files with 1046 additions and 136 deletions

2
Cargo.lock generated
View File

@@ -7730,7 +7730,7 @@ dependencies = [
[[package]]
name = "tests"
version = "0.14.1"
version = "0.15.0"
dependencies = [
"ahash",
"async-trait",

View File

@@ -32,8 +32,6 @@ use std::{
hash::{Hash, RandomState},
sync::Arc,
};
use store::rand::SeedableRng;
use store::rand::rngs::StdRng;
use store::rand::seq::SliceRandom;
use store::write::{BlobLink, now};
use store::{
@@ -303,13 +301,18 @@ impl SpamClassifier for Server {
}
let num_samples = samples.len();
samples.shuffle(&mut StdRng::seed_from_u64(42));
samples.shuffle(&mut store::rand::rng());
// Spawn training task
let epochs = match trainer.reservoir.ham.total_seen + trainer.reservoir.spam.total_seen {
0..=2500 => 3, // Bootstrap
2_501..=10_000 => 2, // Refinement
_ => 1, // Full online training
let epochs = match trainer
.reservoir
.ham
.total_seen
.min(trainer.reservoir.spam.total_seen)
{
0..=50 => 3, // Bootstrap
51..=200 => 2, // Refinement
_ => 1, // Full online training
};
let task = trainer.trainer.spawn(epochs)?;
let is_fh = matches!(task, TrainTask::Fh { .. });

View File

@@ -0,0 +1,267 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use crate::{
backend::meili::{MeiliSearchStore, Task, TaskStatus, TaskUid},
search::{
CalendarSearchField, ContactSearchField, EmailSearchField, SearchableField,
TracingSearchField,
},
write::SearchIndex,
};
use reqwest::{Error, Response, Url};
use serde_json::{Value, json};
use std::time::Duration;
use utils::config::{Config, http::build_http_client, utils::AsKey};
impl MeiliSearchStore {
    /// Builds a Meilisearch store from configuration.
    ///
    /// Reads `url`, `task.poll-interval` and `task.poll-retries` under
    /// `prefix`, validates the URL and eagerly creates the search indexes.
    /// Configuration problems are recorded on `config`; `None` is returned
    /// only when the store cannot be constructed at all.
    pub async fn open(config: &mut Config, prefix: impl AsKey) -> Option<Self> {
        let client = build_http_client(config, prefix.clone(), "application/json".into())?;
        let prefix = prefix.as_key();
        let url = config
            .value_require((&prefix, "url"))?
            .trim_end_matches("/")
            .to_string();
        // Validate the URL up front so a typo surfaces as a configuration
        // parse error instead of failing on the first request.
        Url::parse(&url)
            .map_err(|e| config.new_parse_error((&prefix, "url"), format!("Invalid URL: {e}",)))
            .ok()?;
        let task_poll_interval = config
            .property_or_default::<Duration>((&prefix, "task.poll-interval"), "500ms")
            .unwrap_or(Duration::from_millis(500));
        let task_poll_retries = config
            .property_or_default::<usize>((&prefix, "task.poll-retries"), "60")
            .unwrap_or(60);
        let ms = Self {
            client,
            url,
            task_poll_interval,
            task_poll_retries,
        };
        // Index creation failures are reported as build errors but do not
        // prevent the store from being returned; the indexes may already
        // exist or the server may become reachable later.
        if let Err(err) = ms.create_indexes().await {
            config.new_build_error(prefix.as_str(), err.to_string());
        }
        Some(ms)
    }

    /// Creates every index used by the server; indexes that already exist
    /// are left untouched.
    pub async fn create_indexes(&self) -> trc::Result<()> {
        self.create_index::<EmailSearchField>().await?;
        self.create_index::<CalendarSearchField>().await?;
        self.create_index::<ContactSearchField>().await?;
        self.create_index::<TracingSearchField>().await?;
        Ok(())
    }

    /// Creates the index backing `T` and configures its searchable,
    /// filterable and sortable attributes. If the index already exists the
    /// attribute settings are assumed to be in place and are not re-applied.
    async fn create_index<T: SearchableField>(&self) -> trc::Result<()> {
        let index_name = T::index().index_name();
        // Index creation is asynchronous in Meilisearch: the server replies
        // with a task uid that has to be polled for completion.
        let response = assert_success(
            self.client
                .post(format!("{}/indexes", self.url))
                .body(
                    json!({
                        "uid": index_name,
                        "primaryKey": "id",
                    })
                    .to_string(),
                )
                .send()
                .await,
        )
        .await?;
        if !self.wait_for_task(response).await? {
            // Index already exists
            return Ok(());
        }
        let mut searchable = Vec::new();
        let mut filterable = Vec::new();
        let mut sortable = Vec::new();
        for field in T::all_fields() {
            if field.is_indexed() {
                sortable.push(Value::String(field.field_name().to_string()));
            }
            // Text fields participate in full-text search; all other fields
            // are exposed as structured filters.
            if field.is_text() {
                searchable.push(Value::String(field.field_name().to_string()));
            } else {
                filterable.push(Value::String(field.field_name().to_string()));
            }
        }
        // Primary key fields are always filterable so documents can be
        // addressed by account/document id.
        for key in T::primary_keys() {
            filterable.push(Value::String(key.field_name().to_string()));
        }
        #[cfg(feature = "test_mode")]
        filterable.push(Value::String("bcc".into()));
        if !searchable.is_empty() {
            self.update_index_settings(
                index_name,
                "searchable-attributes",
                Value::Array(searchable),
            )
            .await?;
        }
        if !filterable.is_empty() {
            self.update_index_settings(
                index_name,
                "filterable-attributes",
                Value::Array(filterable),
            )
            .await?;
        }
        if !sortable.is_empty() {
            self.update_index_settings(index_name, "sortable-attributes", Value::Array(sortable))
                .await?;
        }
        Ok(())
    }

    /// Updates one settings section (`setting`) of the given index and waits
    /// for the resulting task to complete. Returns the value reported by
    /// `wait_for_task`.
    async fn update_index_settings(
        &self,
        index_uid: &str,
        setting: &str,
        value: Value,
    ) -> trc::Result<bool> {
        let response = assert_success(
            self.client
                .put(format!(
                    "{}/indexes/{}/settings/{}",
                    self.url, index_uid, setting
                ))
                .body(value.to_string())
                .send()
                .await,
        )
        .await?;
        self.wait_for_task(response).await
    }

    /// Drops every index created by `create_indexes`. Missing indexes are
    /// skipped so the remaining ones are still removed.
    #[cfg(feature = "test_mode")]
    pub async fn drop_indexes(&self) -> trc::Result<()> {
        for index in &[
            SearchIndex::Email,
            SearchIndex::Calendar,
            SearchIndex::Contacts,
            SearchIndex::Tracing,
        ] {
            let response = self
                .client
                .delete(format!("{}/indexes/{}", self.url, index.index_name()))
                .send()
                .await
                .map_err(|err| trc::StoreEvent::MeilisearchError.reason(err))?;
            match response.status().as_u16() {
                200..=299 => {
                    self.wait_for_task(response).await?;
                }
                400..=499 => {
                    // Index does not exist; keep going so the remaining
                    // indexes are still dropped.
                    continue;
                }
                _ => {
                    let status = response.status();
                    let msg = response.text().await.unwrap_or_default();
                    return Err(trc::StoreEvent::MeilisearchError
                        .reason(msg)
                        .ctx(trc::Key::Code, status.as_u16()));
                }
            }
        }
        Ok(())
    }

    /// Extracts the task uid from `response` and polls `GET /tasks/{uid}`
    /// until the task reaches a terminal state or `task_poll_retries` is
    /// exhausted.
    ///
    /// Returns `Ok(true)` when the task succeeded and `Ok(false)` when it
    /// failed with `index_already_exists`, which callers treat as benign.
    pub(crate) async fn wait_for_task(&self, response: Response) -> trc::Result<bool> {
        let response_body = response.text().await.map_err(|err| {
            trc::StoreEvent::MeilisearchError
                .reason(err)
                .details("Request failed")
        })?;
        let task_uid = serde_json::from_str::<TaskUid>(&response_body)
            .map_err(|err| trc::StoreEvent::MeilisearchError.reason(err))?
            .task_uid;
        let mut loop_count = 0;
        let url = format!("{}/tasks/{}", self.url, task_uid);
        while loop_count < self.task_poll_retries {
            let resp = assert_success(self.client.get(&url).send().await).await?;
            let text = resp
                .text()
                .await
                .map_err(|err| trc::StoreEvent::MeilisearchError.reason(err))?;
            let task = serde_json::from_str::<Task>(&text).map_err(|err| {
                trc::StoreEvent::MeilisearchError
                    .reason(err)
                    .details(text.clone())
            })?;
            match task.status {
                TaskStatus::Succeeded => return Ok(true),
                TaskStatus::Failed => {
                    let (code, message) = task
                        .error
                        .map(|e| (e.code, Some(e.message)))
                        .unwrap_or((None, None));
                    // An "index_already_exists" failure is reported as a
                    // benign `Ok(false)` so callers can skip re-creation.
                    return if matches!(code.as_deref(), Some("index_already_exists")) {
                        Ok(false)
                    } else {
                        Err(trc::StoreEvent::MeilisearchError
                            .reason("Meilisearch task failed.")
                            .id(task_uid)
                            .code(code)
                            .details(message))
                    };
                }
                TaskStatus::Canceled => {
                    return Err(trc::StoreEvent::MeilisearchError
                        .reason("Meilisearch task was canceled")
                        .id(task_uid));
                }
                TaskStatus::Enqueued | TaskStatus::Processing => {
                    loop_count += 1;
                    tokio::time::sleep(self.task_poll_interval).await;
                }
                TaskStatus::Unknown => {
                    return Err(trc::StoreEvent::MeilisearchError
                        .reason("Meilisearch task returned an unknown status")
                        .id(task_uid)
                        .details(text));
                }
            }
        }
        Err(trc::StoreEvent::MeilisearchError
            .reason("Timed out waiting for Meilisearch task")
            .id(task_uid))
    }
}
/// Unwraps a `reqwest` result and verifies the HTTP status is 2xx.
///
/// Transport errors and non-success statuses are both converted into
/// `MeilisearchError` events; for HTTP failures the response body becomes
/// the error reason and the status code is attached as context.
pub(crate) async fn assert_success(response: Result<Response, Error>) -> trc::Result<Response> {
    let response = response.map_err(|err| trc::StoreEvent::MeilisearchError.reason(err))?;
    let status = response.status();
    if status.is_success() {
        return Ok(response);
    }
    // A failed read of the error body is non-fatal; report an empty reason.
    let body = response.text().await.unwrap_or_default();
    Err(trc::StoreEvent::MeilisearchError
        .reason(body)
        .ctx(trc::Key::Code, status.as_u16()))
}

View File

@@ -0,0 +1,66 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use reqwest::Client;
use serde::Deserialize;
use std::time::Duration;
pub mod main;
pub mod search;
/// Connection handle for a Meilisearch full-text search backend.
pub struct MeiliSearchStore {
    /// HTTP client used for all REST calls to the Meilisearch server.
    client: Client,
    /// Base URL of the Meilisearch server, stored without a trailing slash.
    url: String,
    /// Delay between two polls of an asynchronous task's status.
    task_poll_interval: Duration,
    /// Maximum number of status polls before a task is considered timed out.
    task_poll_retries: usize,
}
/// Minimal view of the response body returned by task-producing endpoints;
/// only the task uid is needed for subsequent polling.
#[derive(Debug, Deserialize)]
pub(crate) struct TaskUid {
    #[serde(rename = "taskUid")]
    pub task_uid: u64,
}
/// Error payload attached to a failed Meilisearch task.
#[derive(Debug, Deserialize)]
struct TaskError {
    /// Human-readable error description.
    message: String,
    /// Machine-readable error code (e.g. "index_already_exists"), when
    /// provided by the server.
    #[serde(default)]
    code: Option<String>,
}
/// Subset of a Meilisearch task object that is consumed when polling:
/// the status and, on failure, the error details.
#[derive(Debug, Deserialize)]
struct Task {
    //#[serde(rename = "uid")]
    //uid: u64,
    status: TaskStatus,
    /// Present only when the task failed.
    #[serde(default)]
    error: Option<TaskError>,
}
/// Lifecycle states of a Meilisearch task as reported by the tasks endpoint.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "lowercase")]
enum TaskStatus {
    Enqueued,
    Processing,
    Succeeded,
    Failed,
    Canceled,
    /// Any status value not listed above (e.g. one introduced by a newer
    /// server version).
    #[serde(other)]
    Unknown,
}
/// Subset of the search endpoint response that is consumed: only the list
/// of hits is read.
#[derive(Debug, Deserialize)]
struct MeiliSearchResponse {
    hits: Vec<MeiliHit>,
    //#[allow(dead_code)]
    //#[serde(default, rename = "estimatedTotalHits")]
    //estimated_total_hits: Option<u64>,
}
/// A single search hit; documents are stored with a synthesized numeric
/// `id` primary key, which is all that is retrieved.
#[derive(Debug, Deserialize)]
struct MeiliHit {
    id: u64,
}

View File

@@ -0,0 +1,445 @@
/*
* SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
*
* SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-SEL
*/
use ahash::AHashSet;
use serde_json::{Map, Value, json};
use crate::{
backend::meili::{MeiliSearchResponse, MeiliSearchStore, main::assert_success},
search::*,
write::SearchIndex,
};
use std::fmt::{Display, Write};
impl MeiliSearchStore {
    /// Bulk-indexes `documents`, grouping them into one JSON array payload
    /// per search index and submitting each non-empty batch with a single
    /// `PUT /indexes/{uid}/documents` request, waiting for each task.
    pub async fn index(&self, documents: Vec<IndexDocument>) -> trc::Result<()> {
        // One JSON payload per index, addressed via `SearchIndex::array_pos`.
        let mut index_documents: [String; 5] = [
            String::new(),
            String::new(),
            String::new(),
            String::new(),
            String::new(),
        ];
        for document in documents {
            let request = &mut index_documents[document.index.array_pos()];
            // Open the JSON array lazily on the first document; afterwards
            // only a separating comma is needed.
            if !request.is_empty() {
                request.push(',');
            } else {
                request.reserve(1024);
                request.push('[');
            }
            json_serialize(request, &document);
        }
        // The zip order below must match `SearchIndex::array_pos`.
        for (mut payload, index) in index_documents.into_iter().zip([
            SearchIndex::Email,
            SearchIndex::Calendar,
            SearchIndex::Contacts,
            SearchIndex::Tracing,
            SearchIndex::File,
        ]) {
            if payload.is_empty() {
                continue;
            }
            payload.push(']');
            let response = assert_success(
                self.client
                    .put(format!(
                        "{}/indexes/{}/documents",
                        self.url,
                        index.index_name()
                    ))
                    .body(payload)
                    .send()
                    .await,
            )
            .await?;
            self.wait_for_task(response).await?;
        }
        Ok(())
    }

    /// Runs a search against `index` and returns the matching document ids.
    ///
    /// Only the `id` attribute is retrieved; results are capped at 10,000
    /// hits. Text filters become the `q` parameter, structured filters the
    /// `filter` expression, and field comparators are mapped to Meilisearch
    /// `sort` entries (non-field comparators are ignored).
    pub async fn query<R: SearchDocumentId>(
        &self,
        index: SearchIndex,
        filters: &[SearchFilter],
        sort: &[SearchComparator],
    ) -> trc::Result<Vec<R>> {
        let filter_group = build_query(filters);
        let mut body = Map::new();
        body.insert("limit".to_string(), Value::from(10_000));
        body.insert("offset".to_string(), Value::from(0));
        body.insert(
            "attributesToRetrieve".to_string(),
            Value::Array(vec![Value::String("id".to_string())]),
        );
        if !filter_group.filter.is_empty() {
            body.insert("filter".to_string(), Value::String(filter_group.filter));
        }
        if !filter_group.q.is_empty() {
            body.insert("q".to_string(), Value::String(filter_group.q));
        }
        if !sort.is_empty() {
            let sort_arr: Vec<Value> = sort
                .iter()
                .filter_map(|comp| match comp {
                    SearchComparator::Field { field, ascending } => Some(Value::String(format!(
                        "{}:{}",
                        field.field_name(),
                        if *ascending { "asc" } else { "desc" }
                    ))),
                    _ => None,
                })
                .collect();
            if !sort_arr.is_empty() {
                body.insert("sort".to_string(), Value::Array(sort_arr));
            }
        }
        let resp = assert_success(
            self.client
                .post(format!(
                    "{}/indexes/{}/search",
                    self.url,
                    index.index_name()
                ))
                .body(Value::Object(body).to_string())
                .send()
                .await,
        )
        .await?;
        let text = resp
            .text()
            .await
            .map_err(|err| trc::StoreEvent::MeilisearchError.reason(err))?;
        serde_json::from_str::<MeiliSearchResponse>(&text)
            .map(|results| {
                results
                    .hits
                    .into_iter()
                    .map(|hit| R::from_u64(hit.id))
                    .collect()
            })
            .map_err(|err| trc::StoreEvent::MeilisearchError.reason(err).details(text))
    }

    /// Deletes all documents matching the structured filters in `filter`
    /// via the delete-by-filter endpoint.
    ///
    /// Full-text (`q`) criteria cannot be expressed as a delete filter, so
    /// queries that yield no structured filter expression are rejected.
    /// The number of deleted documents is not tracked; 0 is always returned.
    pub async fn unindex(&self, filter: SearchQuery) -> trc::Result<u64> {
        let filter_group = build_query(&filter.filters);
        if filter_group.filter.is_empty() {
            return Err(trc::StoreEvent::MeilisearchError.reason(
                "Meilisearch delete-by-filter requires structured (non-text) filters only",
            ));
        }
        let url = format!(
            "{}/indexes/{}/documents/delete",
            self.url,
            filter.index.index_name()
        );
        let response = assert_success(
            self.client
                .post(url)
                .body(json!({ "filter": filter_group.filter }).to_string())
                .send()
                .await,
        )
        .await?;
        self.wait_for_task(response).await?;
        Ok(0)
    }
}
/// Result of translating search filters into Meilisearch query parts.
#[derive(Default, Debug)]
struct FilterGroup {
    /// Full-text search terms for the `q` parameter (space separated).
    q: String,
    /// Structured `filter` expression; empty when no structured filters
    /// were present.
    filter: String,
}
/// Translates a flat list of [`SearchFilter`] tokens into a Meilisearch
/// query.
///
/// Text `Equal`/`Contains` filters become full-text search terms (`q`);
/// every other operator is rendered into a `filter` expression.
/// `And`/`Or`/`Not` open a parenthesized group closed by the matching
/// `End` token; `operator_stack` remembers the enclosing group's operator
/// while a nested group is being emitted.
fn build_query(filters: &[SearchFilter]) -> FilterGroup {
    if filters.is_empty() {
        return FilterGroup::default();
    }
    let mut operator_stack = Vec::new();
    // Conditions at the top level are implicitly AND-joined.
    let mut operator = &SearchFilter::And;
    // NOTE(review): `is_first` is maintained here and in `End` handling but
    // never read when joining conditions (the `ends_with('(')` check is used
    // instead) — looks vestigial; confirm before removing.
    let mut is_first = true;
    let mut filter = String::new();
    // A set so duplicate search terms are only sent once.
    let mut queries = AHashSet::new();
    for f in filters {
        match f {
            SearchFilter::Operator { field, op, value } => {
                if field.is_text() && matches!(op, SearchOperator::Equal | SearchOperator::Contains)
                {
                    let value = match value {
                        SearchValue::Text { value, .. } => value,
                        _ => {
                            debug_assert!(
                                false,
                                "Text field search with non-text value is not supported"
                            );
                            ""
                        }
                    };
                    if matches!(op, SearchOperator::Equal) {
                        // Exact match: quote the whole phrase ({value:?}
                        // produces a quoted, escaped string).
                        queries.insert(format!("{value:?}"));
                    } else {
                        // Contains: every whitespace-separated token becomes
                        // an individual search term.
                        for token in value.split_whitespace() {
                            queries.insert(token.to_string());
                        }
                    }
                } else {
                    // Join with the current group's operator, except for the
                    // first condition right after an opening parenthesis.
                    if !filter.is_empty() && !filter.ends_with('(') {
                        match operator {
                            SearchFilter::And => filter.push_str(" AND "),
                            SearchFilter::Or => filter.push_str(" OR "),
                            _ => (),
                        }
                    }
                    match value {
                        SearchValue::Text { value, .. } => {
                            filter.push_str(field.field_name());
                            filter.push(' ');
                            op.write_meli_op(&mut filter, format!("{value:?}"));
                        }
                        SearchValue::KeyValues(kv) => {
                            // Nested attribute filter: `field.key <op> value`.
                            let (key, value) = kv.iter().next().unwrap();
                            filter.push_str(field.field_name());
                            filter.push('.');
                            filter.push_str(key);
                            filter.push(' ');
                            op.write_meli_op(&mut filter, format!("{value:?}"));
                        }
                        SearchValue::Int(v) => {
                            filter.push_str(field.field_name());
                            filter.push(' ');
                            op.write_meli_op(&mut filter, v);
                        }
                        SearchValue::Uint(v) => {
                            filter.push_str(field.field_name());
                            filter.push(' ');
                            op.write_meli_op(&mut filter, v);
                        }
                        SearchValue::Boolean(v) => {
                            filter.push_str(field.field_name());
                            filter.push(' ');
                            op.write_meli_op(&mut filter, v);
                        }
                    }
                }
            }
            SearchFilter::And | SearchFilter::Or => {
                // Open a nested group; the new operator applies until the
                // matching `End`.
                if !filter.is_empty() && !filter.ends_with('(') {
                    match operator {
                        SearchFilter::And => filter.push_str(" AND "),
                        SearchFilter::Or => filter.push_str(" OR "),
                        _ => (),
                    }
                }
                operator_stack.push((operator, is_first));
                operator = f;
                is_first = true;
                filter.push('(');
            }
            SearchFilter::Not => {
                // Negated group: conditions inside are AND-joined.
                if !filter.is_empty() && !filter.ends_with('(') {
                    match operator {
                        SearchFilter::And => filter.push_str(" AND "),
                        SearchFilter::Or => filter.push_str(" OR "),
                        _ => (),
                    }
                }
                operator_stack.push((operator, is_first));
                operator = &SearchFilter::And;
                is_first = true;
                filter.push_str("NOT (");
            }
            SearchFilter::End => {
                // Close the current group and restore the enclosing one.
                let p = operator_stack.pop().unwrap_or((&SearchFilter::And, true));
                operator = p.0;
                is_first = p.1;
                if !filter.ends_with('(') {
                    filter.push(')');
                } else {
                    // The group turned out empty (e.g. all its filters were
                    // text terms routed to `q`): remove the dangling "(",
                    // a preceding "NOT ", and any now-trailing connector.
                    filter.pop();
                    if filter.ends_with("NOT ") {
                        let len = filter.len();
                        filter.truncate(len - 4);
                    }
                    if filter.ends_with(" AND ") {
                        let len = filter.len();
                        filter.truncate(len - 5);
                        is_first = true;
                    } else if filter.ends_with(" OR ") {
                        let len = filter.len();
                        filter.truncate(len - 4);
                        is_first = true;
                    }
                }
            }
            SearchFilter::DocumentSet(_) => {
                debug_assert!(false, "DocumentSet filters are not supported")
            }
        }
    }
    // Join the collected full-text terms with single spaces.
    let mut q = String::new();
    if !queries.is_empty() {
        for (idx, term) in queries.into_iter().enumerate() {
            if idx > 0 {
                q.push(' ');
            }
            q.push_str(&term);
        }
    }
    FilterGroup { q, filter }
}
impl SearchOperator {
    /// Appends this operator and its right-hand `value` to a Meilisearch
    /// filter expression, e.g. `>= 42`. `Contains` is rendered as equality,
    /// matching how non-text contains filters are expressed.
    fn write_meli_op(&self, query: &mut String, value: impl Display) {
        let op = match self {
            SearchOperator::LowerThan => "<",
            SearchOperator::LowerEqualThan => "<=",
            SearchOperator::GreaterThan => ">",
            SearchOperator::GreaterEqualThan => ">=",
            SearchOperator::Equal | SearchOperator::Contains => "=",
        };
        let _ = write!(query, "{op} {value}");
    }
}
/// Serializes `document` as a JSON object appended to `request`.
///
/// The numeric primary key `id` is synthesized while iterating the fields:
/// `AccountId` occupies the upper 32 bits and `DocumentId` the lower 32,
/// unless an explicit `Id` field is present, which replaces the whole value.
/// The `Id` field itself is not emitted as an attribute; `AccountId` and
/// `DocumentId` are emitted in addition to being folded into `id`.
fn json_serialize(request: &mut String, document: &IndexDocument) {
    let mut id = 0u64;
    let mut is_first = true;
    request.push('{');
    for (k, v) in document.fields.iter() {
        match k {
            SearchField::AccountId => {
                if let SearchValue::Uint(account_id) = v {
                    id |= account_id << 32;
                }
            }
            SearchField::DocumentId => {
                if let SearchValue::Uint(doc_id) = v {
                    id |= doc_id;
                }
            }
            SearchField::Id => {
                if let SearchValue::Uint(doc_id) = v {
                    id = *doc_id;
                }
                continue;
            }
            _ => {}
        }
        if !is_first {
            request.push(',');
        } else {
            is_first = false;
        }
        // `{:?}` on the field name yields a quoted, escaped JSON key.
        let _ = write!(request, "{:?}:", k.field_name());
        match v {
            SearchValue::Text { value, .. } => {
                json_serialize_str(request, value);
            }
            SearchValue::KeyValues(map) => {
                request.push('{');
                for (i, (key, value)) in map.iter().enumerate() {
                    if i > 0 {
                        request.push(',');
                    }
                    json_serialize_str(request, key);
                    request.push(':');
                    json_serialize_str(request, value);
                }
                request.push('}');
            }
            SearchValue::Int(v) => {
                let _ = write!(request, "{}", v);
            }
            SearchValue::Uint(v) => {
                let _ = write!(request, "{}", v);
            }
            SearchValue::Boolean(v) => {
                let _ = write!(request, "{}", v);
            }
        }
    }
    // Append the synthesized primary key. The separating comma must be
    // omitted when no other attribute was written (e.g. a document whose
    // only field is `Id`, which is skipped above); an unconditional comma
    // would produce invalid JSON such as `{,"id":5}`.
    if !is_first {
        request.push(',');
    }
    let _ = write!(request, "\"id\":{id}}}");
}
/// Appends `value` to `request` as a quoted JSON string, escaping the
/// characters that JSON requires: the quote, backslash, the common control
/// characters with short escapes, and all remaining control characters as
/// `\uXXXX` sequences. Non-control characters pass through unchanged.
fn json_serialize_str(request: &mut String, value: &str) {
    request.push('"');
    for ch in value.chars() {
        // Characters with a dedicated two-character escape sequence.
        let escape = match ch {
            '"' => Some("\\\""),
            '\\' => Some("\\\\"),
            '\n' => Some("\\n"),
            '\r' => Some("\\r"),
            '\t' => Some("\\t"),
            '\u{0008}' => Some("\\b"), // backspace
            '\u{000C}' => Some("\\f"), // form feed
            _ => None,
        };
        match escape {
            Some(seq) => request.push_str(seq),
            None if ch.is_control() => {
                // Remaining control characters use the generic \uXXXX form.
                let _ = write!(request, "\\u{:04x}", ch as u32);
            }
            None => request.push(ch),
        }
    }
    request.push('"');
}
impl SearchIndex {
#[inline(always)]
fn array_pos(&self) -> usize {
match self {
SearchIndex::Email => 0,
SearchIndex::Calendar => 1,
SearchIndex::Contacts => 2,
SearchIndex::Tracing => 3,
SearchIndex::File => 4,
SearchIndex::InMemory => unreachable!(),
}
}
}

View File

@@ -13,6 +13,7 @@ pub mod fs;
pub mod http;
#[cfg(feature = "kafka")]
pub mod kafka;
pub mod meili;
pub mod memory;
#[cfg(feature = "mysql")]
pub mod mysql;

View File

@@ -6,7 +6,7 @@
use crate::{
BlobStore, CompressionAlgo, InMemoryStore, PurgeSchedule, PurgeStore, Store, Stores,
backend::{elastic::ElasticSearchStore, fs::FsStore},
backend::{elastic::ElasticSearchStore, fs::FsStore, meili::MeiliSearchStore},
};
use utils::config::{Config, cron::SimpleCron, utils::ParseValue};
@@ -211,6 +211,14 @@ impl Stores {
self.search_stores.insert(store_id, db);
}
}
"meilisearch" => {
if let Some(db) = MeiliSearchStore::open(config, prefix)
.await
.map(crate::SearchStore::from)
{
self.search_stores.insert(store_id, db);
}
}
#[cfg(feature = "redis")]
"redis" => {
if let Some(db) = crate::backend::redis::RedisStore::open(config, prefix)

View File

@@ -207,6 +207,7 @@ impl SearchStore {
_ => unreachable!(),
},
SearchStore::ElasticSearch(store) => store.query(index, filters, sort).await,
SearchStore::MeiliSearch(store) => store.query(index, filters, sort).await,
}
}
@@ -242,6 +243,11 @@ impl SearchStore {
.query(query.index, &query.filters, &query.comparators)
.await
}
SearchStore::MeiliSearch(store) => {
store
.query(query.index, &query.filters, &query.comparators)
.await
}
}
}
@@ -261,6 +267,7 @@ impl SearchStore {
store => store.index(documents).await,
},
SearchStore::ElasticSearch(store) => store.index(documents).await,
SearchStore::MeiliSearch(store) => store.index(documents).await,
}
}
@@ -280,6 +287,7 @@ impl SearchStore {
store => store.unindex(query).await.map(|_| 0),
},
SearchStore::ElasticSearch(store) => store.unindex(query).await,
SearchStore::MeiliSearch(store) => store.unindex(query).await,
}
}
@@ -321,6 +329,10 @@ impl SearchStore {
pub fn is_elasticsearch(&self) -> bool {
matches!(self, SearchStore::ElasticSearch(_))
}
pub fn is_meilisearch(&self) -> bool {
matches!(self, SearchStore::MeiliSearch(_))
}
}
impl SearchFilter {

View File

@@ -25,6 +25,8 @@ use std::{borrow::Cow, sync::Arc};
use utils::config::cron::SimpleCron;
use write::ValueClass;
use crate::backend::{elastic::ElasticSearchStore, meili::MeiliSearchStore};
pub trait Deserialize: Sized + Sync + Send {
fn deserialize(bytes: &[u8]) -> trc::Result<Self>;
fn deserialize_owned(bytes: Vec<u8>) -> trc::Result<Self> {
@@ -184,7 +186,8 @@ pub enum BlobBackend {
#[derive(Clone)]
pub enum SearchStore {
Store(Store),
ElasticSearch(Arc<backend::elastic::ElasticSearchStore>),
ElasticSearch(Arc<ElasticSearchStore>),
MeiliSearch(Arc<MeiliSearchStore>),
}
#[derive(Clone, Debug)]
@@ -280,12 +283,18 @@ impl From<backend::azure::AzureStore> for BlobStore {
}
}
impl From<backend::elastic::ElasticSearchStore> for SearchStore {
fn from(store: backend::elastic::ElasticSearchStore) -> Self {
impl From<ElasticSearchStore> for SearchStore {
fn from(store: ElasticSearchStore) -> Self {
Self::ElasticSearch(Arc::new(store))
}
}
impl From<MeiliSearchStore> for SearchStore {
fn from(store: MeiliSearchStore) -> Self {
Self::MeiliSearch(Arc::new(store))
}
}
#[cfg(feature = "redis")]
impl From<backend::redis::RedisStore> for InMemoryStore {
fn from(store: backend::redis::RedisStore) -> Self {

View File

@@ -323,3 +323,71 @@ impl ParseValue for SearchField {
}
impl Eq for SearchFilter {}
impl SearchIndex {
pub fn index_name(&self) -> &'static str {
match self {
SearchIndex::Email => "st_email",
SearchIndex::Calendar => "st_calendar",
SearchIndex::Contacts => "st_contact",
SearchIndex::File => "st_file",
SearchIndex::Tracing => "st_tracing",
SearchIndex::InMemory => unreachable!(),
}
}
}
impl SearchField {
    /// Short, stable attribute name used for this field in the external
    /// search index. These names are part of the stored index schema, so
    /// changing one requires reindexing.
    pub fn field_name(&self) -> &'static str {
        match self {
            SearchField::AccountId => "acc_id",
            SearchField::DocumentId => "doc_id",
            SearchField::Id => "id",
            SearchField::Email(field) => match field {
                EmailSearchField::From => "from",
                EmailSearchField::To => "to",
                EmailSearchField::Cc => "cc",
                EmailSearchField::Bcc => "bcc",
                EmailSearchField::Subject => "subj",
                EmailSearchField::Body => "body",
                EmailSearchField::Attachment => "attach",
                EmailSearchField::ReceivedAt => "rcvd",
                EmailSearchField::SentAt => "sent",
                EmailSearchField::Size => "size",
                EmailSearchField::HasAttachment => "has_att",
                EmailSearchField::Headers => "headers",
            },
            SearchField::Calendar(field) => match field {
                CalendarSearchField::Title => "title",
                CalendarSearchField::Description => "desc",
                CalendarSearchField::Location => "loc",
                CalendarSearchField::Owner => "owner",
                CalendarSearchField::Attendee => "attendee",
                CalendarSearchField::Start => "start",
                CalendarSearchField::Uid => "uid",
            },
            SearchField::Contact(field) => match field {
                ContactSearchField::Member => "member",
                ContactSearchField::Kind => "kind",
                ContactSearchField::Name => "name",
                ContactSearchField::Nickname => "nick",
                ContactSearchField::Organization => "org",
                ContactSearchField::Email => "email",
                ContactSearchField::Phone => "phone",
                ContactSearchField::OnlineService => "online",
                ContactSearchField::Address => "addr",
                ContactSearchField::Note => "note",
                ContactSearchField::Uid => "uid",
            },
            SearchField::File(field) => match field {
                FileSearchField::Name => "name",
                FileSearchField::Content => "content",
            },
            SearchField::Tracing(field) => match field {
                TracingSearchField::EventType => "ev_type",
                TracingSearchField::QueueId => "queue_id",
                TracingSearchField::Keywords => "keywords",
            },
        }
    }
}

View File

@@ -1567,6 +1567,7 @@ impl StoreEvent {
StoreEvent::CacheHit => "Cache hit",
StoreEvent::CacheStale => "Cache is stale",
StoreEvent::CacheUpdate => "Cache update",
StoreEvent::MeilisearchError => "Meilisearch error",
}
}
@@ -1608,6 +1609,7 @@ impl StoreEvent {
StoreEvent::CacheHit => "Cache entry found for the account, no update needed",
StoreEvent::CacheStale => "Cache is too old, rebuilding",
StoreEvent::CacheUpdate => "Cache updated with latest database changes",
StoreEvent::MeilisearchError => "A Meilisearch error occurred",
}
}
}

View File

@@ -34,6 +34,7 @@ impl EventType {
| StoreEvent::SqliteError
| StoreEvent::LdapError
| StoreEvent::ElasticsearchError
| StoreEvent::MeilisearchError
| StoreEvent::RedisError
| StoreEvent::S3Error
| StoreEvent::AzureError

View File

@@ -830,6 +830,7 @@ pub enum StoreEvent {
SqliteError,
LdapError,
ElasticsearchError,
MeilisearchError,
RedisError,
S3Error,
AzureError,

View File

@@ -897,6 +897,7 @@ impl EventType {
EventType::TaskQueue(TaskQueueEvent::TaskFailed) => 587,
EventType::Spam(SpamEvent::TrainStarted) => 588,
EventType::Spam(SpamEvent::ModelLoaded) => 589,
EventType::Store(StoreEvent::MeilisearchError) => 590,
}
}
@@ -1532,6 +1533,7 @@ impl EventType {
587 => Some(EventType::TaskQueue(TaskQueueEvent::TaskFailed)),
588 => Some(EventType::Spam(SpamEvent::TrainStarted)),
589 => Some(EventType::Spam(SpamEvent::ModelLoaded)),
590 => Some(EventType::Store(StoreEvent::MeilisearchError)),
_ => None,
}
}

View File

@@ -1,6 +1,6 @@
[package]
name = "tests"
version = "0.14.1"
version = "0.15.0"
edition = "2024"
[features]

View File

@@ -69,6 +69,12 @@ pub async fn search_store_destroy(store: &SearchStore) {
}
store.create_indexes(3, 0, false).await.unwrap();
}
SearchStore::MeiliSearch(store) => {
if let Err(err) = store.drop_indexes().await {
eprintln!("Failed to drop meilisearch indexes: {}", err);
}
store.create_indexes().await.unwrap();
}
}
}

View File

@@ -132,6 +132,14 @@ pub fn build_store_config(temp_dir: &str) -> String {
"false"
},
)
.replace(
"{MEILI_ENABLED}",
if fts_store != "meili" {
"true"
} else {
"false"
},
)
}
const CONFIG: &str = r#"
@@ -171,6 +179,17 @@ disable = {ELASTIC_ENABLED}
username = "elastic"
secret = "changeme"
[store."meili"]
type = "meilisearch"
url = "http://localhost:7700"
tls.allow-invalid-certs = true
disable = {MEILI_ENABLED}
[store."meili".task]
poll-interval = "100ms"
#[store."meili".auth]
#username = "meili"
#secret = "changeme"
#[store."s3"]
#type = "s3"
#access-key = "minioadmin"

View File

@@ -120,139 +120,139 @@ pub async fn test(store: SearchStore, do_insert: bool) {
println!("Running global id filtering tests...");
test_global(store.clone()).await;
if do_insert {
let filter_ids = std::env::var("QUICK_TEST").is_ok().then(|| {
let mut ids = AHashSet::new();
for &id in ALL_IDS {
ids.insert(id.to_string());
let id = id.as_bytes();
if id.last().unwrap() > &b'0' {
let mut alt_id = id.to_vec();
*alt_id.last_mut().unwrap() -= 1;
ids.insert(String::from_utf8(alt_id).unwrap());
}
if id.last().unwrap() < &b'9' {
let mut alt_id = id.to_vec();
*alt_id.last_mut().unwrap() += 1;
ids.insert(String::from_utf8(alt_id).unwrap());
let filter_ids = std::env::var("QUICK_TEST").is_ok().then(|| {
let mut ids = AHashSet::new();
for &id in ALL_IDS {
ids.insert(id.to_string());
let id = id.as_bytes();
if id.last().unwrap() > &b'0' {
let mut alt_id = id.to_vec();
*alt_id.last_mut().unwrap() -= 1;
ids.insert(String::from_utf8(alt_id).unwrap());
}
if id.last().unwrap() < &b'9' {
let mut alt_id = id.to_vec();
*alt_id.last_mut().unwrap() += 1;
ids.insert(String::from_utf8(alt_id).unwrap());
}
}
ids
});
pool.scope_fifo(|s| {
for (document_id, record) in csv::ReaderBuilder::new()
.has_headers(true)
.from_reader(&deflate_test_resource("artwork_data.csv.gz")[..])
.records()
.enumerate()
{
let record = record.unwrap();
let documents = documents.clone();
if let Some(filter_ids) = &filter_ids {
let id = record.get(1).unwrap().to_lowercase();
if !filter_ids.contains(&id) {
continue;
}
}
ids
});
s.spawn_fifo(move |_| {
let mut document = IndexDocument::new(SearchIndex::Email)
.with_account_id(0)
.with_document_id(document_id as u32);
for (pos, field) in record.iter().enumerate() {
match FIELD_MAPPINGS[pos] {
EmailSearchField::From
| EmailSearchField::To
| EmailSearchField::Cc
| EmailSearchField::Bcc => {
document.index_text(
FIELD_MAPPINGS[pos].clone(),
&field.to_lowercase(),
Language::None,
);
}
EmailSearchField::Subject
| EmailSearchField::Body
| EmailSearchField::Attachment => {
document.index_text(
FIELD_MAPPINGS[pos].clone(),
&field
.replace(|ch: char| !ch.is_alphanumeric(), " ")
.to_lowercase(),
Language::English,
);
}
EmailSearchField::Headers => {
document.insert_key_value(
EmailSearchField::Headers,
"artist",
field.to_lowercase(),
);
}
EmailSearchField::ReceivedAt
| EmailSearchField::SentAt
| EmailSearchField::Size => {
document.index_unsigned(
FIELD_MAPPINGS[pos].clone(),
field.parse::<u64>().unwrap_or(0),
);
}
_ => {
continue;
}
};
}
pool.scope_fifo(|s| {
for (document_id, record) in csv::ReaderBuilder::new()
.has_headers(true)
.from_reader(&deflate_test_resource("artwork_data.csv.gz")[..])
.records()
.enumerate()
documents.lock().unwrap().push(document);
});
}
});
println!(
"Parsed {} entries in {} ms.",
documents.lock().unwrap().len(),
now.elapsed().as_millis()
);
let now = Instant::now();
let batches = documents.lock().unwrap().drain(..).collect::<Vec<_>>();
print!("Inserting... ",);
let mut chunks = Vec::new();
let mut chunk = Vec::new();
for document in batches {
let mut document_id = None;
let mut to_field = None;
for (key, value) in document.fields() {
if key == &SearchField::DocumentId {
if let SearchValue::Uint(id) = value {
document_id = Some(*id as u32);
}
} else if key == &SearchField::Email(EmailSearchField::To)
&& let SearchValue::Text { value, .. } = value
{
let record = record.unwrap();
let documents = documents.clone();
if let Some(filter_ids) = &filter_ids {
let id = record.get(1).unwrap().to_lowercase();
if !filter_ids.contains(&id) {
continue;
}
}
s.spawn_fifo(move |_| {
let mut document = IndexDocument::new(SearchIndex::Email)
.with_account_id(0)
.with_document_id(document_id as u32);
for (pos, field) in record.iter().enumerate() {
match FIELD_MAPPINGS[pos] {
EmailSearchField::From
| EmailSearchField::To
| EmailSearchField::Cc
| EmailSearchField::Bcc => {
document.index_text(
FIELD_MAPPINGS[pos].clone(),
&field.to_lowercase(),
Language::None,
);
}
EmailSearchField::Subject
| EmailSearchField::Body
| EmailSearchField::Attachment => {
document.index_text(
FIELD_MAPPINGS[pos].clone(),
&field
.replace(|ch: char| !ch.is_alphanumeric(), " ")
.to_lowercase(),
Language::English,
);
}
EmailSearchField::Headers => {
document.insert_key_value(
EmailSearchField::Headers,
"artist",
field.to_lowercase(),
);
}
EmailSearchField::ReceivedAt
| EmailSearchField::SentAt
| EmailSearchField::Size => {
document.index_unsigned(
FIELD_MAPPINGS[pos].clone(),
field.parse::<u64>().unwrap_or(0),
);
}
_ => {
continue;
}
};
}
documents.lock().unwrap().push(document);
});
}
});
println!(
"Parsed {} entries in {} ms.",
documents.lock().unwrap().len(),
now.elapsed().as_millis()
);
let now = Instant::now();
let batches = documents.lock().unwrap().drain(..).collect::<Vec<_>>();
print!("Inserting... ",);
let mut chunks = Vec::new();
let mut chunk = Vec::new();
for document in batches {
let mut document_id = None;
let mut to_field = None;
for (key, value) in document.fields() {
if key == &SearchField::DocumentId {
if let SearchValue::Uint(id) = value {
document_id = Some(*id as u32);
}
} else if key == &SearchField::Email(EmailSearchField::To)
&& let SearchValue::Text { value, .. } = value
{
to_field = Some(value.to_string());
}
}
let document_id = document_id.unwrap();
let to_field = to_field.unwrap();
mask.insert(document_id);
fields.insert(document_id, to_field);
chunk.push(document);
if chunk.len() == 10 {
chunks.push(chunk);
chunk = Vec::new();
to_field = Some(value.to_string());
}
}
if !chunk.is_empty() {
let document_id = document_id.unwrap();
let to_field = to_field.unwrap();
mask.insert(document_id);
fields.insert(document_id, to_field);
chunk.push(document);
if chunk.len() == 10 {
chunks.push(chunk);
chunk = Vec::new();
}
}
if !chunk.is_empty() {
chunks.push(chunk);
}
if do_insert {
let mut tasks = Vec::new();
for chunk in chunks {
let chunk_instance = Instant::now();