mirror of
https://github.com/stalwartlabs/stalwart.git
synced 2026-03-17 14:34:03 +00:00
FoundationDB search: Batch large transactions (fixes #2567)
This commit is contained in:
@@ -683,10 +683,12 @@ impl From<Rows> for Vec<u32> {
|
||||
}
|
||||
|
||||
impl Store {
|
||||
#[inline(always)]
|
||||
pub fn is_none(&self) -> bool {
|
||||
matches!(self, Self::None)
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn is_sql(&self) -> bool {
|
||||
match self {
|
||||
#[cfg(feature = "sqlite")]
|
||||
@@ -705,6 +707,7 @@ impl Store {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn is_pg_or_mysql(&self) -> bool {
|
||||
match self {
|
||||
#[cfg(feature = "mysql")]
|
||||
@@ -715,6 +718,15 @@ impl Store {
|
||||
}
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
pub fn is_foundationdb(&self) -> bool {
|
||||
match self {
|
||||
#[cfg(feature = "foundation")]
|
||||
Store::FoundationDb(_) => true,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
// SPDX-SnippetBegin
|
||||
// SPDX-FileCopyrightText: 2020 Stalwart Labs LLC <hello@stalw.art>
|
||||
// SPDX-License-Identifier: LicenseRef-SEL
|
||||
|
||||
@@ -22,6 +22,8 @@ use utils::cheeky_hash::CheekyHash;
|
||||
|
||||
impl Store {
|
||||
pub(crate) async fn index(&self, documents: Vec<IndexDocument>) -> trc::Result<()> {
|
||||
let truncate_at = if self.is_foundationdb() { 1_048_576 } else { 0 };
|
||||
|
||||
for document in documents {
|
||||
let mut batch = BatchBuilder::new();
|
||||
let index = document.index;
|
||||
@@ -63,7 +65,7 @@ impl Store {
|
||||
}
|
||||
}
|
||||
|
||||
let term_index_builder = TermIndexBuilder::build(document);
|
||||
let term_index_builder = TermIndexBuilder::build(document, truncate_at);
|
||||
if let Some(old_term_index) = old_term_index {
|
||||
let old_term_index = old_term_index
|
||||
.unarchive::<TermIndex>()
|
||||
@@ -79,9 +81,11 @@ impl Store {
|
||||
.caused_by(trc::location!())?;
|
||||
}
|
||||
|
||||
self.write(batch.build_all())
|
||||
.await
|
||||
.caused_by(trc::location!())?;
|
||||
let mut commit_points = batch.commit_points();
|
||||
for commit_point in commit_points.iter() {
|
||||
let batch = batch.build_one(commit_point);
|
||||
self.write(batch).await.caused_by(trc::location!())?;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -41,7 +41,7 @@ pub(crate) struct TermIndexBuilder {
|
||||
}
|
||||
|
||||
impl TermIndexBuilder {
|
||||
pub fn build(document: IndexDocument) -> Self {
|
||||
pub fn build(document: IndexDocument, truncate_at: usize) -> Self {
|
||||
let mut terms: CheekyBTreeMap<u32> = CheekyBTreeMap::new();
|
||||
let mut fields: Vec<SearchIndexField> = Vec::new();
|
||||
let mut account_id = None;
|
||||
@@ -79,9 +79,16 @@ impl TermIndexBuilder {
|
||||
let field = match value {
|
||||
SearchValue::Text { value, language } => {
|
||||
if field.is_text() {
|
||||
let value = if truncate_at > 0 && value.len() > truncate_at {
|
||||
let pos = value.floor_char_boundary(truncate_at);
|
||||
&value[..pos]
|
||||
} else {
|
||||
&value
|
||||
};
|
||||
|
||||
match language {
|
||||
Language::Unknown => {
|
||||
for token in WordTokenizer::new(value.as_str(), MAX_TOKEN_LENGTH) {
|
||||
for token in WordTokenizer::new(value, MAX_TOKEN_LENGTH) {
|
||||
terms
|
||||
.entry(CheekyHash::new(token.word.as_bytes()))
|
||||
.or_default()
|
||||
@@ -89,7 +96,7 @@ impl TermIndexBuilder {
|
||||
}
|
||||
}
|
||||
Language::None => {
|
||||
for token in SpaceTokenizer::new(value.as_str(), MAX_TOKEN_LENGTH) {
|
||||
for token in SpaceTokenizer::new(value, MAX_TOKEN_LENGTH) {
|
||||
terms
|
||||
.entry(CheekyHash::new(token.as_bytes()))
|
||||
.or_default()
|
||||
@@ -97,7 +104,7 @@ impl TermIndexBuilder {
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
for token in Stemmer::new(&value, language, MAX_TOKEN_LENGTH) {
|
||||
for token in Stemmer::new(value, language, MAX_TOKEN_LENGTH) {
|
||||
terms
|
||||
.entry(CheekyHash::new(token.word.as_bytes()))
|
||||
.or_default()
|
||||
@@ -195,41 +202,47 @@ impl TermIndex {
|
||||
id: SearchIndexId,
|
||||
) -> trc::Result<()> {
|
||||
let archive = Archiver::new(self);
|
||||
batch.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Document,
|
||||
}),
|
||||
archive.serialize()?,
|
||||
);
|
||||
batch
|
||||
.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Document,
|
||||
}),
|
||||
archive.serialize()?,
|
||||
)
|
||||
.commit_point();
|
||||
|
||||
for term in archive.inner.terms {
|
||||
let mut fields = term.fields;
|
||||
while let Some(field) = fields.bit_pop() {
|
||||
batch.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Term {
|
||||
hash: term.hash,
|
||||
field,
|
||||
},
|
||||
}),
|
||||
vec![],
|
||||
);
|
||||
batch
|
||||
.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Term {
|
||||
hash: term.hash,
|
||||
field,
|
||||
},
|
||||
}),
|
||||
vec![],
|
||||
)
|
||||
.commit_point();
|
||||
}
|
||||
}
|
||||
|
||||
for field in archive.inner.fields {
|
||||
batch.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Index { field },
|
||||
}),
|
||||
vec![],
|
||||
);
|
||||
batch
|
||||
.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Index { field },
|
||||
}),
|
||||
vec![],
|
||||
)
|
||||
.commit_point();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -243,14 +256,16 @@ impl TermIndex {
|
||||
old_term: &ArchivedTermIndex,
|
||||
) -> trc::Result<()> {
|
||||
let archive = Archiver::new(self);
|
||||
batch.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Document,
|
||||
}),
|
||||
archive.serialize()?,
|
||||
);
|
||||
batch
|
||||
.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Document,
|
||||
}),
|
||||
archive.serialize()?,
|
||||
)
|
||||
.commit_point();
|
||||
|
||||
let mut old_terms = AHashSet::with_capacity(old_term.terms.len());
|
||||
let mut old_fields = AHashSet::with_capacity(old_term.fields.len());
|
||||
@@ -279,37 +294,45 @@ impl TermIndex {
|
||||
};
|
||||
|
||||
if !old_terms.remove(&typ) {
|
||||
batch.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass { index, id, typ }),
|
||||
vec![],
|
||||
);
|
||||
batch
|
||||
.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass { index, id, typ }),
|
||||
vec![],
|
||||
)
|
||||
.commit_point();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for field in archive.inner.fields {
|
||||
if !old_fields.remove(&field) {
|
||||
batch.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Index { field },
|
||||
}),
|
||||
vec![],
|
||||
);
|
||||
batch
|
||||
.set(
|
||||
ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Index { field },
|
||||
}),
|
||||
vec![],
|
||||
)
|
||||
.commit_point();
|
||||
}
|
||||
}
|
||||
|
||||
for typ in old_terms {
|
||||
batch.clear(ValueClass::SearchIndex(SearchIndexClass { index, id, typ }));
|
||||
batch
|
||||
.clear(ValueClass::SearchIndex(SearchIndexClass { index, id, typ }))
|
||||
.commit_point();
|
||||
}
|
||||
|
||||
for field in old_fields {
|
||||
batch.clear(ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Index { field },
|
||||
}));
|
||||
batch
|
||||
.clear(ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Index { field },
|
||||
}))
|
||||
.commit_point();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -318,37 +341,43 @@ impl TermIndex {
|
||||
|
||||
impl ArchivedTermIndex {
|
||||
pub fn delete_index(&self, batch: &mut BatchBuilder, index: SearchIndex, id: SearchIndexId) {
|
||||
batch.clear(ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Document,
|
||||
}));
|
||||
batch
|
||||
.clear(ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Document,
|
||||
}))
|
||||
.commit_point();
|
||||
|
||||
for term in self.terms.iter() {
|
||||
let mut fields = term.fields.to_native();
|
||||
while let Some(field) = fields.bit_pop() {
|
||||
batch.clear(ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Term {
|
||||
hash: term.hash.to_native(),
|
||||
field,
|
||||
},
|
||||
}));
|
||||
batch
|
||||
.clear(ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Term {
|
||||
hash: term.hash.to_native(),
|
||||
field,
|
||||
},
|
||||
}))
|
||||
.commit_point();
|
||||
}
|
||||
}
|
||||
|
||||
for field in self.fields.iter() {
|
||||
batch.clear(ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Index {
|
||||
field: SearchIndexField {
|
||||
field_id: field.field_id,
|
||||
data: field.data.to_vec(),
|
||||
batch
|
||||
.clear(ValueClass::SearchIndex(SearchIndexClass {
|
||||
index,
|
||||
id,
|
||||
typ: SearchIndexType::Index {
|
||||
field: SearchIndexField {
|
||||
field_id: field.field_id,
|
||||
data: field.data.to_vec(),
|
||||
},
|
||||
},
|
||||
},
|
||||
}));
|
||||
}))
|
||||
.commit_point();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user